Using Streaming Data

You can make any data spec into a streaming data frame by passing additional options to the build method.

If the withStreaming option is used when building the data set, it will use a streaming rate source to generate the data. You can control the streaming rate with the option rowsPerSecond.

In this case, the row count is ignored.

In most cases, no further changes are needed to run the data generation as a streaming data generator.

As the generated data frame is a normal Spark Structured Streaming data frame, all the usual caveats and features of streaming data frames apply.

Example 1: site code and technology

from datetime import timedelta, datetime
import math
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, \
                              FloatType, TimestampType
# from dbldatagen.data_generator import DataGenerator,ensure
import dbldatagen as dg

# value range (begin / end) and step (interval) passed to the `last_sync_dt` column below
interval = timedelta(days=1, hours=1)
start = datetime(2017, 10, 1, 0, 0, 0)
end = datetime(2018, 10, 1, 6, 0, 0)

# schema for the generated data; note that `c` and `c1` are declared here but are
# given no explicit column spec in the generator below
schema = StructType([
    StructField("site_id", IntegerType(), True),
    StructField("site_cd", StringType(), True),
    StructField("c", StringType(), True),
    StructField("c1", StringType(), True),
    StructField("sector_technology_desc", StringType(), True),
])

# will have implied column `id` for ordinal of row
ds = (
    dg.DataGenerator(spark, name="association_oss_cell_info", rows=100000, partitions=20)
    .withSchema(schema)
    # withColumnSpec adds specification for existing column
    .withColumnSpec("site_id", minValue=1, maxValue=20, step=1)
    # presumably includes the implied `id` column in the output - confirm against
    # the dbldatagen documentation
    .withIdOutput()
    # base column specifies dependent column: `site_cd` is generated from `site_id`
    .withColumnSpec("site_cd", prefix='site', baseColumn='site_id')
    # withColumn adds specification for new column
    .withColumn("sector_status_desc", "string", minValue=1, maxValue=200, step=1,
                prefix='status', random=True)
    # column computed from a SQL expression
    .withColumn("rand", "float", expr="floor(rand() * 350) * (86400 + 3600)")
    .withColumn("last_sync_dt", "timestamp", begin=start, end=end, interval=interval,
                random=True)
    .withColumnSpec("sector_technology_desc", values=["GSM", "UMTS", "LTE", "UNKNOWN"],
                random=True)
    .withColumn("test_cell_flg", "integer", values=[0, 1], random=True)
)

# build as a streaming data frame; `rowsPerSecond` controls the streaming rate and
# the `rows` value given to the generator is ignored in this mode
df = ds.build(withStreaming=True, options={'rowsPerSecond': 500})

display(df)

Example 2: IoT-style data

The following example shows how to control how long the streaming data generation runs.

import time
# how long to let the stream run, in seconds (passed to time.sleep before the
# active streams are stopped)
time_to_run = 180

from pyspark.sql.types import LongType, IntegerType, StringType

import dbldatagen as dg

# number of unique device ids to generate (used as `uniqueValues` below)
device_population = 10000
# row count passed to the generator
data_rows = 20 * 100000
partitions_requested = 8

# one weight per country code, in the same order; presumably relative
# frequencies - confirm against the dbldatagen documentation
country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
                 'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
country_weights = [1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 126, 109, 58,
                   8, 17]

manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Embanks Devices']

lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']

testDataSpec = (
    dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
                     partitions=partitions_requested,
                     verbose=True)
    .withIdOutput()
    # we'll use hash of the base field to generate the ids to
    # avoid a simple incrementing sequence
    .withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
                uniqueValues=device_population, omit=True, baseColumnType="hash")

    # render the long-valued internal id as a zero-padded hex string
    # NOTE(review): an earlier comment here said "%lx" must be used instead of "%x"
    # because the underlying value is a long, but the format string below uses
    # "%x" - confirm which form is actually required
    .withColumn("device_id", StringType(), format="0x%013x",
                baseColumn="internal_device_id")

    # the device / user attributes will be the same for the same device id
    # so let's use the internal device id as the base column for these attributes
    .withColumn("country", StringType(), values=country_codes,
                weights=country_weights, baseColumn="internal_device_id")
    .withColumn("manufacturer", StringType(), values=manufacturers,
                baseColumn="internal_device_id")

    # use omit = True if you don't want a column to appear in the final output
    # but just want to use it as part of generation of another column
    .withColumn("line", StringType(), values=lines, baseColumn="manufacturer",
                baseColumnType="hash", omit=True)
    .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11,
                baseColumn="device_id", baseColumnType="hash", omit=True)

    # combine the two omitted helper columns into the visible model_line column
    .withColumn("model_line", StringType(), expr="concat(line, '#', model_ser)",
                baseColumn=["line", "model_ser"])
    .withColumn("event_type", StringType(),
                values=["activation", "deactivation", "plan change",
                        "telecoms activity", "internet activity", "device error"],
                random=True)
    # event timestamp is taken from the SQL expression now()
    .withColumn("event_ts", "timestamp", expr="now()")
    )

# build as a streaming data frame; `rowsPerSecond` controls the streaming rate
dfTestDataStreaming = testDataSpec.build(withStreaming=True, options={'rowsPerSecond': 500})

# ... do something with your streaming source here
display(dfTestDataStreaming)

In a separate notebook cell, you can execute the following code to terminate the streaming after a specified period of time.

# let the streams run for the configured number of seconds
time.sleep(time_to_run)

# shut down every active streaming query; stopping a stream may raise an
# exception, which can safely be ignored here
for active_query in spark.streams.active:
    try:
        active_query.stop()
    except RuntimeError:
        continue

Using streaming data with Delta tables

If you write the streaming data to a Delta table using a streaming writer, then the Delta table itself can be used as a streaming source for downstream consumption.

from datetime import timedelta, datetime
import dbldatagen as dg

# value range (begin / end) and step (interval) for the `last_sync_dt` column
interval = timedelta(days=1, hours=1)
start = datetime(2017, 10, 1, 0, 0, 0)
end = datetime(2018, 10, 1, 6, 0, 0)

# row count will be ignored
# No schema is attached to this generator, so every column is declared with
# withColumn and an explicit type; withColumnSpec can only refine a column that
# has already been defined (for example via withSchema).
ds = (dg.DataGenerator(spark, name="association_oss_cell_info", rows=100000, partitions=20)
      .withColumn("site_id", "integer", minValue=1, maxValue=20, step=1)
      # site_cd is generated from site_id via baseColumn
      .withColumn("site_cd", "string", prefix='site', baseColumn='site_id')
      .withColumn("sector_status_desc", "string", minValue=1, maxValue=200, step=1,
                  prefix='status', random=True)
      .withColumn("rand", "float", expr="floor(rand() * 350) * (86400 + 3600)")
      .withColumn("last_sync_dt", "timestamp", begin=start, end=end, interval=interval,
                  random=True)
      .withColumn("sector_technology_desc", "string",
                  values=["GSM", "UMTS", "LTE", "UNKNOWN"], random=True)
      )

# build as a streaming data frame; `rowsPerSecond` controls the streaming rate
df = ds.build(withStreaming=True, options={'rowsPerSecond': 500})

# Write the generated stream to a Delta table in append mode.
# The chain is wrapped in parentheses so the method calls can span several
# lines; without them the leading-dot continuation lines are a syntax error.
(
    df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/dbldatagen/streamingDemo/checkpoint")
    .start("/tmp/dbldatagen/streamingDemo/data")
)