Generating JSON and Structured Column Data
This section explores generating JSON and structured column data. By structured columns, we mean columns whose datatype is some combination of struct, array, and map types.
Note that some of the examples are code fragments for illustration purposes only.
Generating JSON data
There are several methods for generating JSON data:
Generate a dataframe and save it as JSON, which writes the full data set out as JSON.
Generate JSON valued fields using SQL functions such as named_struct and to_json.
Writing dataframe as JSON data
The following example illustrates the basic technique for generating JSON data from a dataframe.
from pyspark.sql.types import LongType, IntegerType, StringType
import dbldatagen as dg
device_population = 100000
country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
country_weights = [1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 126, 109, 58, 8,
17]
manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Embanks Devices']
lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']
testDataSpec = (
dg.DataGenerator(spark, name="device_data_set", rows=1000000,
partitions=8, randomSeedMethod='hash_fieldname')
.withIdOutput()
# we'll use a hash of the base field to generate the ids to
# avoid a simple incrementing sequence
.withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
uniqueValues=device_population, omit=True, baseColumnType="hash")
# note for format strings, we must use "%lx" not "%x" as the
# underlying value is a long
.withColumn("device_id", StringType(), format="0x%013x",
baseColumn="internal_device_id")
# the device / user attributes will be the same for the same device id
# so let's use the internal device id as the base column for these attributes
.withColumn("country", StringType(), values=country_codes,
weights=country_weights,
baseColumn="internal_device_id")
.withColumn("manufacturer", StringType(), values=manufacturers,
baseColumn="internal_device_id")
# use omit = True if you don't want a column to appear in the final output
# but just want to use it as part of generation of another column
.withColumn("line", StringType(), values=lines, baseColumn="manufacturer",
baseColumnType="hash")
.withColumn("model_ser", IntegerType(), minValue=1, maxValue=11,
baseColumn="device_id",
baseColumnType="hash", omit=True)
.withColumn("event_type", StringType(),
values=["activation", "deactivation", "plan change",
"telecoms activity", "internet activity", "device error"],
random=True)
.withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00",
end="2020-12-31 23:59:00",
interval="1 minute", random=True)
)
dfTestData = testDataSpec.build()
dfTestData.write.format("json").mode("overwrite").save("/tmp/jsonData1")
In the most basic form, you can simply save the dataframe to storage in JSON format.
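To verify the output, you can read the generated JSON back into a dataframe. The following is a minimal sketch, assuming the "/tmp/jsonData1" path used in the example above:
# read the generated JSON back to inspect the schema and a sample of the rows
dfJson = spark.read.format("json").load("/tmp/jsonData1")
dfJson.printSchema()
dfJson.show(5, truncate=False)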
Use of nested structures in data generation specifications
When we save a dataframe containing complex column types such as map, struct and array, these will be converted to equivalent constructs in JSON.
So how do we go about creating these?
We can use a struct valued column to hold the nested structure data and write the results out as JSON.
Struct, array, and map valued columns can be created by adding a column of the appropriate type and using the expr attribute to assemble the complex column.
Note that in the current release, the expr attribute will override other column data generation rules.
from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, \
DoubleType, BooleanType, ShortType, \
TimestampType, DateType, DecimalType, \
ByteType, BinaryType, ArrayType, MapType, \
StructType, StructField
import dbldatagen as dg
device_population = 100000
country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
country_weights = [1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 126, 109, 58, 8,
17]
manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Embanks Devices']
lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']
testDataSpec = (
dg.DataGenerator(spark, name="device_data_set", rows=1000000,
partitions=8, randomSeedMethod='hash_fieldname')
.withIdOutput()
# we'll use a hash of the base field to generate the ids to
# avoid a simple incrementing sequence
.withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
uniqueValues=device_population, omit=True, baseColumnType="hash")
# note for format strings, we must use "%lx" not "%x" as the
# underlying value is a long
.withColumn("device_id", StringType(), format="0x%013x",
baseColumn="internal_device_id")
# the device / user attributes will be the same for the same device id
# so let's use the internal device id as the base column for these attributes
.withColumn("country", StringType(), values=country_codes,
weights=country_weights,
baseColumn="internal_device_id")
.withColumn("manufacturer", StringType(), values=manufacturers,
baseColumn="internal_device_id", omit=True)
.withColumn("line", StringType(), values=lines, baseColumn="manufacturer",
baseColumnType="hash", omit=True)
.withColumn("manufacturer_info", StructType([StructField('line',StringType()),
StructField('manufacturer', StringType())]),
expr="named_struct('line', line, 'manufacturer', manufacturer)",
baseColumn=['manufacturer', 'line'])
.withColumn("model_ser", IntegerType(), minValue=1, maxValue=11,
baseColumn="device_id",
baseColumnType="hash", omit=True)
.withColumn("event_type", StringType(),
values=["activation", "deactivation", "plan change",
"telecoms activity", "internet activity", "device error"],
random=True, omit=True)
.withColumn("event_ts", "timestamp", begin="2020-01-01 01:00:00",
end="2020-12-31 23:59:00",
interval="1 minute", random=True, omit=True)
.withColumn("event_info",
StructType([StructField('event_type',StringType()),
StructField('event_ts', TimestampType())]),
expr="named_struct('event_type', event_type, 'event_ts', event_ts)",
baseColumn=['event_type', 'event_ts'])
)
dfTestData = testDataSpec.build()
dfTestData.write.format("json").mode("overwrite").save("/tmp/jsonData2")
As the datatype can also be specified using a string, the withColumn entry for 'event_info' could also be written as:
.withColumn("event_info",
"struct<event_type:string, event_ts: timestamp>",
expr="named_struct('event_type', event_type, 'event_ts', event_ts)",
baseColumn=['event_type', 'event_ts'])
To simplify the specification of struct valued columns, the constant INFER_DATATYPE can be used in place of the datatype when the expr attribute is specified. This will cause the datatype to be inferred from the expression.
In this case, the previous code would be written as follows:
.withColumn("event_info",
dg.INFER_DATATYPE,
expr="named_struct('event_type', event_type, 'event_ts', event_ts)")
The helper method withStructColumn can also be used to simplify the specification of struct valued columns. Using this method, the previous code can be written as one of the following options:
# Use either form to create the struct valued field
.withStructColumn("event_info1", fields=['event_type', 'event_ts'])
.withStructColumn("event_info2", fields={'event_type': 'event_type',
'event_ts': 'event_ts'})
In the case of the second variant, the expression following each struct field name can be an arbitrary SQL expression rather than just a column name. The same definition can also be used to generate JSON output via the asJson argument (described below).
See the documentation for withStructColumn for more details.
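For illustration, the following is a hedged sketch of the dict form with arbitrary SQL expressions. The field names event_summary, event_kind, and event_day are hypothetical; it assumes the event_type and event_ts columns from the earlier specification:
# arbitrary SQL expressions can be used as the struct field values
.withStructColumn("event_summary",
                  fields={'event_kind': 'upper(event_type)',
                          'event_day': 'to_date(event_ts)'})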
Generating JSON valued fields
JSON valued fields can be generated as fields of string type and assembled using a combination of Spark SQL functions such as named_struct and to_json.
from pyspark.sql.types import LongType, FloatType, IntegerType, \
StringType, DoubleType, BooleanType, ShortType, \
TimestampType, DateType, DecimalType, ByteType, \
BinaryType, ArrayType, MapType, StructType, StructField
import dbldatagen as dg
device_population = 100000
country_codes = ['CN', 'US', 'FR', 'CA', 'IN', 'JM', 'IE', 'PK', 'GB', 'IL', 'AU', 'SG',
'ES', 'GE', 'MX', 'ET', 'SA', 'LB', 'NL']
country_weights = [1300, 365, 67, 38, 1300, 3, 7, 212, 67, 9, 25, 6, 47, 83, 126, 109, 58, 8,
17]
manufacturers = ['Delta corp', 'Xyzzy Inc.', 'Lakehouse Ltd', 'Acme Corp', 'Embanks Devices']
lines = ['delta', 'xyzzy', 'lakehouse', 'gadget', 'droid']
testDataSpec = (
dg.DataGenerator(spark, name="device_data_set", rows=1000000,
partitions=8,
randomSeedMethod='hash_fieldname')
.withIdOutput()
# we'll use a hash of the base field to generate the ids to
# avoid a simple incrementing sequence
.withColumn("internal_device_id", LongType(), minValue=0x1000000000000,
uniqueValues=device_population, omit=True, baseColumnType="hash")
# note for format strings, we must use "%lx" not "%x" as the
# underlying value is a long
.withColumn("device_id", StringType(), format="0x%013x",
baseColumn="internal_device_id")
# the device / user attributes will be the same for the same device id
# so let's use the internal device id as the base column for these attributes
.withColumn("country", StringType(), values=country_codes,
weights=country_weights,
baseColumn="internal_device_id")
.withColumn("manufacturer", StringType(), values=manufacturers,
baseColumn="internal_device_id", omit=True)
.withColumn("line", StringType(), values=lines, baseColumn="manufacturer",
baseColumnType="hash", omit=True)
.withColumn("manufacturer_info", "string",
expr="to_json(named_struct('line', line, 'manufacturer', manufacturer))",
baseColumn=['manufacturer', 'line'])
.withColumn("model_ser", IntegerType(), minValue=1, maxValue=11,
baseColumn="device_id",
baseColumnType="hash", omit=True)
.withColumn("event_type", StringType(),
values=["activation", "deactivation", "plan change",
"telecoms activity", "internet activity", "device error"],
random=True, omit=True)
.withColumn("event_ts", "timestamp",
begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00",
interval="1 minute", random=True, omit=True)
.withColumn("event_info", "string",
expr="to_json(named_struct('event_type', event_type, 'event_ts', event_ts))",
baseColumn=['event_type', 'event_ts'])
)
dfTestData = testDataSpec.build()
#dfTestData.write.format("json").mode("overwrite").save("/tmp/jsonData2")
display(dfTestData)
The helper method withStructColumn in the DataGenerator class can also be used to simplify the specification of struct valued columns. When the argument asJson is set to True, the resulting structure will be transformed to JSON.
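A minimal sketch of this usage, assuming the event_type and event_ts columns from the specification above (the column name event_info_json is illustrative):
# produce the struct as a JSON formatted string rather than a struct value
.withStructColumn("event_info_json",
                  fields={'event_type': 'event_type', 'event_ts': 'event_ts'},
                  asJson=True)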
Generating complex column data
There are several methods for generating columns with arrays, structs, and maps.
You can define a column as having a value of type array, map, or struct. This can be specified in the datatype parameter to the withColumn method either as a string such as "array<string>" or as a composite of datatype object instances.
If the column type is a struct, map, or array, then the expr attribute must be specified to provide a value for the column. If the expr attribute is not specified, the default column value will be NULL.
The following example illustrates some of these techniques:
import dbldatagen as dg
ds = (
dg.DataGenerator(spark, name="test_data_set1", rows=1000, random=True)
.withColumn("r", "float", minValue=1.0, maxValue=10.0, step=0.1,
numColumns=5)
.withColumn("observations", "array<float>",
expr="slice(array(r_0, r_1, r_2, r_3, r_4), 1, abs(hash(id)) % 5 + 1 )",
baseColumn="r")
)
df = ds.build()
df.show()
The above example constructs a varying length array valued column, observations, using the slice function to take variable length subsets of the r_0 through r_4 columns generated from the r column specification.
Note
The baseColumn attribute is used here to ensure correct ordering and separation of phases.
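Map valued columns can be assembled in the same way, using the expr attribute with an appropriate SQL expression. The following is a minimal sketch; the column names k, v, and attributes are illustrative:
import dbldatagen as dg

ds2 = (
    dg.DataGenerator(spark, name="test_data_set2", rows=1000, random=True)
    .withColumn("k", "string", values=['alpha', 'beta', 'gamma'])
    .withColumn("v", "float", minValue=1.0, maxValue=10.0, step=0.1)
    # assemble a single entry map from the generated key and value columns
    .withColumn("attributes", "map<string,float>",
                expr="map(k, v)",
                baseColumn=["k", "v"])
)
df2 = ds2.build()
df2.show()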
Using inferred datatypes
When building columns with complex data types such as structs, especially nested structs, it can be repetitive and error prone to specify the datatypes, especially when the column is based on the results of a SQL expression (as specified by the expr attribute).
You may use the constant INFER_DATATYPE in place of the actual datatype when the expr attribute is used.
When the INFER_DATATYPE constant is used for the datatype, the actual datatype for the column will be inferred from the SQL expression passed using the expr parameter. This is only supported when the expr parameter is populated.
The following example illustrates this:
from pyspark.sql.types import FloatType
import dbldatagen as dg
column_count = 10
data_rows = 10 * 1000
df_spec = (dg.DataGenerator(spark, name="test_data_set1", rows=data_rows)
.withIdOutput()
.withColumn("r", FloatType(), expr="floor(rand() * 350) * (86400 + 3600)",
numColumns=column_count, structType="array")
.withColumn("code1", "integer", minValue=100, maxValue=200)
.withColumn("code2", "integer", minValue=0, maxValue=10)
.withColumn("code3", "string", values=['one', 'two', 'three'])
.withColumn("code4", "string", values=['one', 'two', 'three'])
.withColumn("code5", dg.INFER_DATATYPE, expr="current_date()")
.withColumn("code6", dg.INFER_DATATYPE, expr="code1 + code2")
.withColumn("code7", dg.INFER_DATATYPE, expr="concat(code3, code4)")
)
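To check which datatypes were inferred for code5, code6, and code7, you can build the dataframe and inspect its schema:
df = df_spec.build()
df.printSchema()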
Using multi feature columns to generate arrays
For array valued columns, where all of the elements of the array are to be generated with the same column specification, an alternative method is also supported.
You can specify that a column has a specific number of features (the numFeatures attribute) together with a structType attribute value of 'array' to control the generation of the column. In this case, the datatype should be the type of the individual array elements, not of the array itself.
For example, the following code will generate rows with varying numbers of synthetic emails for each customer row:
import dbldatagen as dg
ds = (
dg.DataGenerator(sparkSession=spark, name="customers", rows=1000, partitions=4,
random=True)
.withColumn("name", "string", percentNulls=0.01, template=r'\\w \\w|\\w A. \\w|test')
.withColumn("emails", "string", template=r'\\w.\\w@\\w.com', numFeatures=(1, 6),
structType="array")
)
df = ds.build()
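Because the number of generated emails varies per row, you can check the distribution of array sizes with a quick aggregation. A minimal sketch, assuming the emails column defined above:
import pyspark.sql.functions as F

# count how many rows were generated for each array length
df.select(F.size("emails").alias("num_emails")) \
  .groupBy("num_emails") \
  .count() \
  .show()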