Metadata Preparation

Directory structure

conf/
    onboarding.json
    silver_transformations.json
    dqe/
        bronze_data_quality_expectations.json

1. Create onboarding.json
2. Create silver_transformations.json
3. Create data quality rules JSON files for each entity (see Data Quality Rules File Structure below)

The onboarding.json file contains references to silver_transformations.json and the data quality expectation (dqe) files.

onboarding.json File Structure: Examples (Autoloader, Eventhub, Kafka)

env is your environment placeholder, e.g. dev, stage, prod.
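For orientation, here is a minimal sketch of a single Autoloader (cloudFiles) entry before the full field reference; it assumes onboarding.json holds a list of such entries, and the database names, paths, and reader options shown are placeholders rather than values prescribed by this guide.

```json
[
  {
    "data_flow_id": "100",
    "data_flow_group": "A1",
    "source_format": "cloudFiles",
    "source_details": {
      "source_database": "APP",
      "source_table": "CUSTOMERS",
      "source_path_dev": "tests/resources/data/customers",
      "source_schema_path": "tests/resources/schema/customer_schema.ddl"
    },
    "bronze_database_dev": "bronze",
    "bronze_table": "customers",
    "bronze_reader_options": {
      "cloudFiles.format": "json",
      "cloudFiles.inferColumnTypes": "true",
      "cloudFiles.rescuedDataColumn": "_rescued_data"
    },
    "bronze_table_path_dev": "tests/resources/delta/customers",
    "bronze_data_quality_expectations_json": "conf/dqe/bronze_data_quality_expectations.json",
    "bronze_database_quarantine_dev": "bronze_quarantine",
    "bronze_quarantine_table": "customers_quarantine",
    "bronze_quarantine_table_path_dev": "tests/resources/delta/customers_quarantine",
    "silver_database_dev": "silver",
    "silver_table": "customers",
    "silver_transformation_json": "conf/silver_transformations.json",
    "silver_table_path_dev": "tests/resources/delta/silver/customers"
  }
]
```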

| Field | Description |
|---|---|
| data_flow_id | Unique identifier for the pipeline. |
| data_flow_group | Group identifier for launching multiple pipelines under a single DLT pipeline. |
| source_format | Source format, e.g. cloudFiles, eventhub, kafka, delta, snapshot. |
| source_details | This map type captures all source details. For cloudFiles: source_schema_path, source_path_{env}, source_database, source_metadata. For eventhub: source_schema_path, eventhub.accessKeyName, eventhub.accessKeySecretName, eventhub.name, eventhub.secretsScopeName, kafka.sasl.mechanism, kafka.security.protocol, eventhub.namespace, eventhub.port. For snapshot: snapshot_format, source_path_{env}. For the source schema file, Spark DDL schema format parsing is supported; for a custom schema format, write a schema parsing function bronze_schema_mapper(schema_file_path, spark):Schema and provide it to the OnboardDataflowspec initialization, e.g. onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs(). For cloudFiles, the source_metadata tag controls the addition of the _metadata column with these attributes: include_autoloader_metadata_column (True or False) adds the _metadata column to the target bronze dataframe; autoloader_metadata_col_name, if provided, renames _metadata to this value, otherwise the default is source_metadata; select_metadata_cols: {key: value} extracts columns from _metadata, where the key is the target dataframe column name and the value is the expression used to derive the column from _metadata. |
| bronze_database_{env} | Delta lake bronze database name. |
| bronze_table | Delta lake bronze table name. |
| bronze_reader_options | Reader options that can be provided to the Spark reader, e.g. multiline=true, header=true, in JSON format. |
| bronze_partition_columns | Bronze table partition columns list. |
| bronze_cluster_by | Bronze table cluster-by columns list. |
| bronze_cdc_apply_changes | Bronze CDC apply changes JSON (see the sketch after this table). |
| bronze_apply_changes_from_snapshot | Bronze apply changes from snapshot JSON. Mandatory fields: keys=["userId"], scd_type=1 or 2. Optional fields: track_history_column_list=[col1], track_history_except_column_list=[col2]. |
| bronze_table_path_{env} | Bronze table storage path. |
| bronze_table_properties | DLT table properties map, e.g. {"pipelines.autoOptimize.managed": "false", "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false"}. |
| bronze_data_quality_expectations_json | Bronze table data quality expectations JSON file path. |
| bronze_database_quarantine_{env} | Bronze database for quarantined data which fails expectations. |
| bronze_quarantine_table | Bronze table for quarantined data which fails expectations. |
| bronze_quarantine_table_path_{env} | Bronze quarantine table storage path. |
| bronze_quarantine_table_partitions | Bronze quarantine table partition columns list. |
| bronze_quarantine_table_cluster_by | Bronze quarantine table cluster-by columns list. |
| bronze_quarantine_table_properties | DLT table properties map, e.g. {"pipelines.autoOptimize.managed": "false", "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false"}. |
| bronze_append_flows | Bronze table append flows JSON, e.g. "bronze_append_flows": [{"name": "customer_bronze_flow", "create_streaming_table": false, "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "CUSTOMERS", "source_path_dev": "tests/resources/data/customers", "source_schema_path": "tests/resources/schema/customer_schema.ddl"}, "reader_options": {"cloudFiles.format": "json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "once": true}]. |
| silver_database_{env} | Silver database name. |
| silver_table | Silver table name. |
| silver_partition_columns | Silver table partition columns list. |
| silver_cluster_by | Silver table cluster-by columns list. |
| silver_cdc_apply_changes | Silver CDC apply changes JSON. |
| silver_table_path_{env} | Silver table storage path. |
| silver_table_properties | DLT table properties map, e.g. {"pipelines.autoOptimize.managed": "false", "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false"}. |
| silver_transformation_json | Silver table SQL transformation JSON file path. |
| silver_data_quality_expectations_json_{env} | Silver table data quality expectations JSON file path. |
| silver_append_flows | Silver table append flows JSON, e.g. "silver_append_flows": [{"name": "customer_bronze_flow", "create_streaming_table": false, "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "CUSTOMERS", "source_path_dev": "tests/resources/data/customers", "source_schema_path": "tests/resources/schema/customer_schema.ddl"}, "reader_options": {"cloudFiles.format": "json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "once": true}]. |
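As noted in the bronze_cdc_apply_changes row above, a sketch of the apply-changes JSON is shown below; it assumes the keys mirror the DLT apply_changes arguments (keys, sequence_by, scd_type, plus optional fields such as apply_as_deletes and except_column_list), and the column names and values are placeholders.

```json
{
  "keys": ["userId"],
  "sequence_by": "dmsTimestamp",
  "scd_type": "2",
  "apply_as_deletes": "Op = 'D'",
  "except_column_list": ["Op", "dmsTimestamp", "_rescued_data"]
}
```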

Data Quality Rules File Structure (Examples)

| Field | Description |
|---|---|
| expect | Specify multiple data quality SQL expressions; records that fail validation are still included in the target dataset. |
| expect_or_fail | Specify multiple data quality SQL expressions; records that fail validation halt pipeline execution. |
| expect_or_drop | Specify multiple data quality SQL expressions; records that fail validation are dropped from the target dataset. |
| expect_or_quarantine | Specify multiple data quality SQL expressions; records that fail validation are dropped from the main table and inserted into the quarantine table specified in the dataflowspec (only applicable to the Bronze layer). |
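For example, a bronze data quality expectations file combining drop and quarantine rules might look like the sketch below; each top-level key maps rule names to the SQL condition that must hold, and the rule names and expressions are placeholders.

```json
{
  "expect_or_drop": {
    "no_rescued_data": "_rescued_data IS NULL",
    "valid_id": "id IS NOT NULL"
  },
  "expect_or_quarantine": {
    "valid_email": "email IS NOT NULL"
  }
}
```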

Silver Transformation File Structure (Example)

| Field | Description |
|---|---|
| target_table | Specify the target table name. Type: String |
| target_partition_cols | Specify the partition columns. Type: Array |
| select_exp | Specify the SQL select expressions. Type: Array |
| where_clause | Specify filter conditions to exclude certain records from the main input. Type: Array |
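Putting these fields together, a silver_transformations.json sketch might look like the following; it assumes the file holds a list of per-table entries, and the table name, columns, and expressions are placeholders.

```json
[
  {
    "target_table": "customers",
    "target_partition_cols": ["city"],
    "select_exp": [
      "id",
      "firstname",
      "lastname",
      "concat(firstname, ' ', lastname) as full_name",
      "email",
      "city"
    ],
    "where_clause": ["id IS NOT NULL", "email IS NOT NULL"]
  }
]
```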