data_flow_id | Unique identifier for the pipeline. |
data_flow_group | Group identifier for launching multiple pipelines under a single DLT pipeline. |
source_format | Source format, e.g. cloudFiles, eventhub, kafka, delta, snapshot. |
source_details | This map type captures all source details. For cloudFiles: source_schema_path, source_path_{env}, source_database, source_metadata. For eventhub: source_schema_path, eventhub.accessKeyName, eventhub.accessKeySecretName, eventhub.name, eventhub.secretsScopeName, kafka.sasl.mechanism, kafka.security.protocol, eventhub.namespace, eventhub.port. For snapshot: snapshot_format, source_path_{env}. Source schema files in Spark DDL format are parsed out of the box; for a custom schema format, write a schema parsing function with the signature bronze_schema_mapper(schema_file_path, spark): Schema and pass it to the OnboardDataflowspec initialization, e.g. onboardDataFlowSpecs = OnboardDataflowspec(spark, dict_obj, bronze_schema_mapper).onboardDataFlowSpecs(). To add the Auto Loader _metadata column for cloudFiles sources, use the source_metadata tag with these attributes: include_autoloader_metadata_column (True or False) adds the _metadata column to the target bronze dataframe; autoloader_metadata_col_name, if provided, renames _metadata to this value (default: source_metadata); select_metadata_cols: {key: value} extracts columns from _metadata, where key is the target dataframe column name and value is the expression used to derive the column from _metadata. |
bronze_database_{env} | Delta lake bronze database name. |
bronze_table | Delta lake bronze table name |
bronze_reader_options | Reader options passed to the Spark reader, in JSON format, e.g. multiline=true, header=true. |
bronze_partition_columns | Bronze table partition columns list. |
bronze_cluster_by | Bronze tables cluster by cols list |
bronze_cdc_apply_changes | Bronze CDC apply changes JSON. |
bronze_apply_changes_from_snapshot | Bronze apply changes from snapshot JSON. Mandatory fields: keys=["userId"], scd_type=1 or 2. Optional fields: track_history_column_list=[col1], track_history_except_column_list=[col2]. |
bronze_table_path_{env} | Bronze table storage path. |
bronze_table_properties | DLT table properties map. e.g. {"pipelines.autoOptimize.managed": "false" , "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false" } |
bronze_data_quality_expectations_json | Bronze table data quality expectations JSON file path. |
bronze_database_quarantine_{env} | Bronze database for quarantine data which fails expectations. |
bronze_quarantine_table | Bronze table for quarantine data which fails expectations. |
bronze_quarantine_table_path_{env} | Bronze quarantine table storage path. |
bronze_quarantine_table_partitions | Bronze quarantine tables partition cols |
bronze_quarantine_table_cluster_by | Bronze quarantine tables cluster by cols list |
bronze_quarantine_table_properties | DLT table properties map. e.g. {"pipelines.autoOptimize.managed": "false" , "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false" } |
bronze_append_flows | Bronze table append flows JSON, e.g. "bronze_append_flows": [{"name": "customer_bronze_flow", "create_streaming_table": false, "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "CUSTOMERS", "source_path_dev": "tests/resources/data/customers", "source_schema_path": "tests/resources/schema/customer_schema.ddl"}, "reader_options": {"cloudFiles.format": "json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "once": true}] |
silver_database_{env} | Silver database name. |
silver_table | Silver table name |
silver_partition_columns | Silver table partition columns list |
silver_cluster_by | Silver tables cluster by cols list |
silver_cdc_apply_changes | Silver CDC apply changes JSON. |
silver_table_path_{env} | Silver table storage path. |
silver_table_properties | DLT table properties map. e.g. {"pipelines.autoOptimize.managed": "false" , "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false"} |
silver_transformation_json | Silver table SQL transformation JSON file path. |
silver_data_quality_expectations_json_{env} | Silver table data quality expectations json file path |
silver_append_flows | Silver table append flows JSON, e.g. "silver_append_flows": [{"name": "customer_bronze_flow", "create_streaming_table": false, "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "CUSTOMERS", "source_path_dev": "tests/resources/data/customers", "source_schema_path": "tests/resources/schema/customer_schema.ddl"}, "reader_options": {"cloudFiles.format": "json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "once": true}] |
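
Pieced together, the fields in the table above form one onboarding entry per dataflow, and the onboarding file is a JSON array of such entries. The sketch below assembles a minimal example as a Python dict. All database, table, and path values are illustrative assumptions (not taken from a real project), and the exact set of accepted keys should be verified against your dlt-meta version:

```python
import json

# Minimal, illustrative onboarding entry assembled from the fields described
# above. Every name, path, and value here is an assumption for illustration;
# validate the exact keys against your dlt-meta version.
onboarding_row = {
    "data_flow_id": "100",
    "data_flow_group": "A1",
    "source_format": "cloudFiles",
    "source_details": {
        "source_database": "APP",
        "source_table": "CUSTOMERS",
        "source_path_dev": "tests/resources/data/customers",
        "source_schema_path": "tests/resources/schema/customer_schema.ddl",
    },
    "bronze_database_dev": "bronze",
    "bronze_table": "customers_cdc",
    "bronze_reader_options": {
        "cloudFiles.format": "json",
        "cloudFiles.inferColumnTypes": "true",
        "cloudFiles.rescuedDataColumn": "_rescued_data",
    },
    # CDC merge spec; field names mirror the DLT apply_changes API
    # (keys, sequence_by, scd_type, ...).
    "bronze_cdc_apply_changes": {
        "keys": ["id"],
        "sequence_by": "operation_date",
        "scd_type": "1",
        "except_column_list": ["operation", "operation_date", "_rescued_data"],
    },
    "bronze_table_path_dev": "tests/resources/delta/customers",
    "bronze_data_quality_expectations_json": "tests/resources/dqe/customers.json",
    "silver_database_dev": "silver",
    "silver_table": "customers",
    "silver_transformation_json": "tests/resources/silver_transformations.json",
    "silver_table_path_dev": "tests/resources/data/silver/customers",
}

# The onboarding file itself is a JSON array of such entries:
onboarding_file_contents = json.dumps([onboarding_row], indent=2)
```

Note that the `{env}` placeholders in the field names (here shown as `_dev`) are resolved per target environment when the onboarding file is processed.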