Using Streaming Data
You can make any data spec into a streaming data frame by passing additional options to the build method. If the withStreaming option is used when building the data set, a streaming rate source is used to generate the data and the specified row count is ignored. You can control the generation rate with the rowsPerSecond option.
In most cases, no further changes are needed to run the data generation as a streaming data generator. As the generated data frame is a normal Spark structured streaming data frame, all of the usual streaming caveats and features apply.
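For instance, assuming an existing data spec named ds (as built in the examples below), a minimal streaming build might look like the following sketch; the rate shown is illustrative.

# build the data spec as a streaming data frame driven by a rate source;
# 'rowsPerSecond' controls the generation rate and the spec's row count is ignored
df_stream = ds.build(withStreaming=True, options={'rowsPerSecond': 100})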
Example 1: site code and technology
from datetime import timedelta, datetime
import math
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, \
    FloatType, TimestampType
import dbldatagen as dg
interval = timedelta(days=1, hours=1)
start = datetime(2017, 10, 1, 0, 0, 0)
end = datetime(2018, 10, 1, 6, 0, 0)
schema = StructType([
    StructField("site_id", IntegerType(), True),
    StructField("site_cd", StringType(), True),
    StructField("c", StringType(), True),
    StructField("c1", StringType(), True),
    StructField("sector_technology_desc", StringType(), True),
])
# will have implied column `id` for ordinal of row
ds = (
    dg.DataGenerator(spark, name="association_oss_cell_info", rows=100000, partitions=20)
    .withSchema(schema)
    # withColumnSpec adds a specification for an existing schema column
    .withColumnSpec("site_id", minValue=1, maxValue=20, step=1)
    .withIdOutput()
    # baseColumn makes a column's values depend on another column
    .withColumnSpec("site_cd", prefix='site', baseColumn='site_id')
    .withColumn("sector_status_desc", "string", minValue=1, maxValue=200, step=1,
                prefix='status', random=True)
    # withColumn adds a specification for a new column
    .withColumn("rand", "float", expr="floor(rand() * 350) * (86400 + 3600)")
    .withColumn("last_sync_dt", "timestamp", begin=start, end=end, interval=interval,
                random=True)
    .withColumnSpec("sector_technology_desc", values=["GSM", "UMTS", "LTE", "UNKNOWN"],
                    random=True)
    .withColumn("test_cell_flg", "integer", values=[0, 1], random=True)
)
df = ds.build(withStreaming=True, options={'rowsPerSecond': 500})
display(df)
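Note that display is a Databricks notebook function. Outside of a notebook, one way to inspect the generated stream is a console sink; a minimal sketch under that assumption:

# write the generated stream to the console sink for quick inspection
# (an alternative to display() outside Databricks notebooks)
query = (df.writeStream
         .format("console")
         .option("truncate", False)
         .start())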
Example 2: IoT-style data
The following example also shows how to control how long the streaming data generation runs.
import time
time_to_run = 180
from pyspark.sql.types import LongType, IntegerType, StringType
import dbldatagen as dg
device_population = 10000
data_rows = 20 * 100000
partitions_requested = 8
country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
                 'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
country_weights = [1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 126, 109, 58,
                   8, 17]
manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Embanks Devices']
lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']
testDataSpec = (
    dg.DataGenerator(spark, name="device_data_set", rows=data_rows,
                     partitions=partitions_requested, verbose=True)
    .withIdOutput()
    # use a hash of the base field to generate the ids,
    # to avoid a simple incrementing sequence
    .withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
                uniqueValues=device_population, omit=True, baseColumnType="hash")
    # format the internal device id as a zero-padded hex string
    .withColumn("device_id", StringType(), format="0x%013x",
                baseColumn="internal_device_id")
    # the device / user attributes will be the same for the same device id,
    # so use the internal device id as the base column for these attributes
    .withColumn("country", StringType(), values=country_codes,
                weights=country_weights, baseColumn="internal_device_id")
    .withColumn("manufacturer", StringType(), values=manufacturers,
                baseColumn="internal_device_id")
    # use omit=True if you don't want a column to appear in the final output
    # but only want to use it as part of the generation of another column
    .withColumn("line", StringType(), values=lines, baseColumn="manufacturer",
                baseColumnType="hash", omit=True)
    .withColumn("model_ser", IntegerType(), minValue=1, maxValue=11,
                baseColumn="device_id", baseColumnType="hash", omit=True)
    .withColumn("model_line", StringType(), expr="concat(line, '#', model_ser)",
                baseColumn=["line", "model_ser"])
    .withColumn("event_type", StringType(),
                values=["activation", "deactivation", "plan change",
                        "telecoms activity", "internet activity", "device error"],
                random=True)
    .withColumn("event_ts", "timestamp", expr="now()")
)
dfTestDataStreaming = testDataSpec.build(withStreaming=True, options={'rowsPerSecond': 500})
# ... do something with your streaming source here
display(dfTestDataStreaming)
In a separate notebook cell, you can execute the following code to terminate the streaming after a specified period of time.
time.sleep(time_to_run)

# note: stopping the stream may produce exceptions - these can be ignored
for x in spark.streams.active:
    try:
        x.stop()
    except RuntimeError:
        pass
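Alternatively, if you keep the query handle returned by writeStream.start(), you can bound the run time without scanning spark.streams.active; a sketch assuming a handle named query:

# wait up to time_to_run seconds for the query to stop on its own, then stop it explicitly
query.awaitTermination(time_to_run)
query.stop()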
Using streaming data with Delta tables
If you write the streaming data to a Delta table using a streaming writer, then the Delta table itself can be used as a streaming source for downstream consumption.
from datetime import timedelta, datetime
import dbldatagen as dg
interval = timedelta(days=1, hours=1)
start = datetime(2017, 10, 1, 0, 0, 0)
end = datetime(2018, 10, 1, 6, 0, 0)
# row count will be ignored
ds = (dg.DataGenerator(spark, name="association_oss_cell_info", rows=100000, partitions=20)
      .withColumn("site_id", "int", minValue=1, maxValue=20, step=1)
      .withColumn("site_cd", "string", prefix='site', baseColumn='site_id')
      .withColumn("sector_status_desc", "string", minValue=1, maxValue=200, step=1,
                  prefix='status', random=True)
      .withColumn("rand", "float", expr="floor(rand() * 350) * (86400 + 3600)")
      .withColumn("last_sync_dt", "timestamp", begin=start, end=end, interval=interval,
                  random=True)
      .withColumn("sector_technology_desc", "string",
                  values=["GSM", "UMTS", "LTE", "UNKNOWN"], random=True)
      )
df = ds.build(withStreaming=True, options={'rowsPerSecond': 500})
(df.writeStream
   .format("delta")
   .outputMode("append")
   .option("checkpointLocation", "/tmp/dbldatagen/streamingDemo/checkpoint")
   .start("/tmp/dbldatagen/streamingDemo/data"))