Generating Change Data Capture Data
This section explores some of the features for generating CDC-style data - that is, the ability to generate a base data set and then apply changes, such as updates to existing rows and new rows that will be inserted into the existing data.
See the section on repeatable data generation for the concepts that underpin the data generation.
Overview
We’ll generate a customer table and write out the data.
Then we’ll generate changes for the table and show how to merge them in.
To start, we’ll specify some locations for our data:
BASE_PATH = '/tmp/dbldatagen/cdc/'
dbutils.fs.mkdirs(BASE_PATH)
customers1_location = BASE_PATH + "customers1"
Let’s generate 10 million customer-style records.
We’ll add a timestamp for when the row was generated and a memo field to mark what operation added it.
import dbldatagen as dg
import pyspark.sql.functions as F
spark.catalog.clearCache()
shuffle_partitions_requested = 8
partitions_requested = 32
data_rows = 10 * 1000 * 1000
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions_requested)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
uniqueCustomers = 10 * 1000000
dataspec = (
    dg.DataGenerator(spark, rows=data_rows, partitions=partitions_requested)
    .withColumn("customer_id", "long", uniqueValues=uniqueCustomers)
    .withColumn("name", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
    .withColumn("alias", percentNulls=0.01, template=r'\\w \\w|\\w a. \\w')
    .withColumn("payment_instrument_type",
                values=['paypal', 'Visa', 'Mastercard', 'American Express',
                        'discover', 'branded visa', 'branded mastercard'],
                random=True, distribution="normal")
    .withColumn("int_payment_instrument", "int", minValue=0000, maxValue=9999,
                baseColumn="customer_id", baseColumnType="hash", omit=True)
    .withColumn("payment_instrument",
                expr="format_number(int_payment_instrument, '**** ****** *####')",
                baseColumn="int_payment_instrument")
    .withColumn("email", template=r'\\w.\\w@\\w.com|\\w-\\w@\\w')
    .withColumn("email2", template=r'\\w.\\w@\\w.com')
    .withColumn("ip_address", template=r'\\n.\\n.\\n.\\n')
    .withColumn("md5_payment_instrument",
                expr="md5(concat(payment_instrument_type, ':', payment_instrument))",
                baseColumn=['payment_instrument_type', 'payment_instrument'])
    .withColumn("customer_notes", text=dg.ILText(words=(1, 8)))
    .withColumn("created_ts", "timestamp", expr="now()")
    .withColumn("modified_ts", "timestamp", expr="now()")
    .withColumn("memo", expr="'original data'")
)
df1 = dataspec.build()
# write table
df1.write.format("delta").save(customers1_location)
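Optionally, we can read the written data back to confirm the expected row count; df_check is just an illustrative name used for this check.

# optional sanity check - read the Delta data back and confirm the row count
df_check = spark.read.format("delta").load(customers1_location)
print(df_check.count())   # expect 10,000,000 rows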
Creating a table definition
We can use the features of the data generator to script SQL definitions for table creation and merge statements.
Let’s create a table definition around our data. Because we generate the SQL statement with an explicit location,
the table is implicitly external and creating it will not overwrite our data.
customers1_location = BASE_PATH + "customers1"
tableDefn = dataspec.scriptTable(name="customers1", location=customers1_location)
spark.sql(tableDefn)
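The scripted table definition is just a SQL string, so we can print it at any point to review the generated DDL:

# print the generated CREATE TABLE statement for review
print(tableDefn)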
Now let’s explore the table contents:
%sql
-- let's check our table
select * from customers1
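Outside of a notebook environment that supports the %sql magic, an equivalent check can be made directly from Python using standard Spark SQL calls:

# equivalent check from Python - sample some rows and report the total row count
spark.sql("select * from customers1 limit 10").show()
spark.sql("select count(*) as row_count from customers1").show()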
Creating Changes
Let’s generate some changes.
Here we want to generate a set of new rows, which we guarantee to be new by using customer ids greater than the maximum existing customer id.
We will also generate a set of updates by sampling from the existing data and adding some modifications.
import dbldatagen as dg
import pyspark.sql.functions as F
start_of_new_ids = df1.select(F.max('customer_id')+1).collect()[0][0]
print(start_of_new_ids)
df1_inserts = (dataspec.clone()
               .option("startingId", start_of_new_ids)
               .withRowCount(10 * 1000)
               .build()
               .withColumn("memo", F.lit("insert"))
               .withColumn("customer_id", F.expr(f"customer_id + {start_of_new_ids}"))
               )
# read the written data - if we simply recompute, timestamps of original will be lost
df_original = spark.read.format("delta").load(customers1_location)
df1_updates = (df_original.sample(False, 0.1)
               .limit(50 * 1000)
               .withColumn("alias", F.lit('modified alias'))
               .withColumn("modified_ts", F.expr('now()'))
               .withColumn("memo", F.lit("update"))
               )
df_changes = df1_inserts.union(df1_updates)
# randomize ordering
df_changes = (df_changes.withColumn("order_rand", F.expr("rand()"))
.orderBy("order_rand")
.drop("order_rand")
)
display(df_changes)
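To confirm that the change set contains the expected mix of inserts and updates, we can group on the memo column (a simple verification step):

# verify the mix of inserts and updates in the change set
df_changes.groupBy("memo").count().show()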
Merging in the changes
We can script the merge statement with the data generator.
The updateColumns argument specifies which columns should be updated.
The corresponding updateColumnExprs argument provides SQL expressions as overrides for the
columns being updated. These do not have to be provided; if omitted, the
values of the corresponding columns from the source table are used.
df_changes.dropDuplicates(["customer_id"]).createOrReplaceTempView("customers1_changes")
sqlStmt = dataspec.scriptMerge(tgtName="customers1", srcName="customers1_changes",
                               joinExpr="src.customer_id=tgt.customer_id",
                               updateColumns=["alias", "memo", "modified_ts"],
                               updateColumnExprs=[("memo", "'updated on merge'"),
                                                  ("modified_ts", "now()")
                                                  ])
print(sqlStmt)
spark.sql(sqlStmt)
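Once the merge has run, one way to verify the outcome is to count rows in the target table by the memo column, which shows how many rows now carry the markers used in this example:

# check how many rows were inserted or updated by the merge
spark.sql("""
    select memo, count(*) as row_count
    from customers1
    group by memo
    order by memo
""").show()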
That’s all that’s required to perform merges with the data generation framework. Note that these scripted merge statements can also be used as part of a streaming merge implementation.
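For a streaming merge, a common pattern is to apply the scripted merge statement from a foreachBatch handler. The sketch below is illustrative only: changes_stream_location is a hypothetical Delta location receiving change rows with the same schema, and it reuses the sqlStmt and temporary view name generated above.

# illustrative sketch of applying the scripted merge to a stream of changes
changes_stream_location = BASE_PATH + "customers1_changes_stream"   # hypothetical source of change rows

def merge_changes_batch(batch_df, batch_id):
    # register each micro-batch under the source name used when scripting the merge
    batch_df.dropDuplicates(["customer_id"]).createOrReplaceTempView("customers1_changes")
    batch_df.sparkSession.sql(sqlStmt)

(spark.readStream.format("delta").load(changes_stream_location)
      .writeStream
      .foreachBatch(merge_changes_batch)
      .option("checkpointLocation", BASE_PATH + "checkpoints/customers1_merge")
      .start()
)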