
Configuration

Recon Config

Types of Report Supported

| report type | sample visualisation | description | key outputs captured in the recon metrics tables |
|---|---|---|---|
| schema | schema | Reconciles the schema of source and target: validates that each datatype is the same or compatible. | schema_comparison, schema_difference |
| row | row | Reconciles the data at row level only (the hash value of each source row is matched with the hash value of the target row). Preferred when no join columns can be identified between source and target. | missing_in_src (sample rows that are available in target but missing in source, plus sample target rows that don't match the source), missing_in_tgt (sample rows that are available in source but missing in target, plus sample source rows that don't match the target). NOTE: this report does not differentiate between mismatched and missing rows. |
| data | data | Reconciles the data at both row and column level. join_columns are used to identify mismatches at each row and column. | mismatch_data (sample data with mismatches captured at each column and row level), missing_in_src (sample rows that are available in target but missing in source), missing_in_tgt (sample rows that are available in source but missing in target), threshold_mismatch (configured columns reconciled against a percentile, threshold, or date boundary), mismatch_columns (consolidated list of columns that contain mismatches) |
| all | all | Combination of data + schema. | data + schema outputs |

[back to top]

Report Type-Flow Chart







[back to top]

Supported Source System

| Source | Schema | Row | Data | All |
|---|---|---|---|---|
| Oracle | Yes | Yes | Yes | Yes |
| Snowflake | Yes | Yes | Yes | Yes |
| Databricks | Yes | Yes | Yes | Yes |

[back to top]

TABLE Config Json filename:

The config file must be named recon_config_[DATA_SOURCE]_[SOURCE_CATALOG_OR_SCHEMA]_[REPORT_TYPE].json and placed in the remorph root directory .remorph within the Databricks Workspace.

The filename pattern remains the same for all data_sources:

recon_config_${DATA_SOURCE}_${SOURCE_CATALOG_OR_SCHEMA}_${REPORT_TYPE}.json

Please find the Table Recon filename examples below for the Snowflake, Oracle, TSQL and Databricks source systems.

recon_config_snowflake_sample_data_all.json

database_config:
  source_catalog: sample_data
  source_schema: default
  ...
metadata_config:
  ...
data_source: snowflake
report_type: all
...

Note: the filename must use the same case in which [SOURCE_CATALOG_OR_SCHEMA] is defined. For example, if the source schema is defined as ORC in the reconcile config, the filename should be recon_config_oracle_ORC_data.json.

[back to top]

TABLE Config Elements:

@dataclass
class Table:
    source_name: str
    target_name: str
    aggregates: list[Aggregate] | None = None
    join_columns: list[str] | None = None
    jdbc_reader_options: JdbcReaderOptions | None = None
    select_columns: list[str] | None = None
    drop_columns: list[str] | None = None
    column_mapping: list[ColumnMapping] | None = None
    transformations: list[Transformation] | None = None
    column_thresholds: list[ColumnThresholds] | None = None
    filters: Filters | None = None
    table_thresholds: list[TableThresholds] | None = None

Configuration Reference

| Configuration Parameter | Data Type | Description | Requirement | Example Value |
|---|---|---|---|---|
| source_name | string | Source table name | Required | {"source_name": "product"} |
| target_name | string | Target table name | Required | {"target_name": "product"} |
| aggregates | list[Aggregate] | List of aggregation rules (see Aggregate) | Optional | {"aggregates": [{"type": "MAX", "agg_columns": ["price"]}]} |
| join_columns | list[string] | Primary key columns | Optional | {"join_columns": ["product_id", "order_id"]} |
| jdbc_reader_options | object | JDBC read parallelization configuration | Optional | {"jdbc_reader_options": {"number_partitions": 10, "partition_column": "id", "fetch_size": 1000}} |
| select_columns | list[string] | Columns to include in reconciliation | Optional | {"select_columns": ["id", "name", "price"]} |
| drop_columns | list[string] | Columns to exclude from reconciliation | Optional | {"drop_columns": ["temp_sku"]} |
| column_mapping | list[ColumnMapping] | Source-target column mapping (see Column Mapping) | Optional | {"column_mapping": [{"source_name": "id", "target_name": "product_id"}]} |
| transformations | list[Transformation] | Column transformation rules (see Transformations) | Optional | {"transformations": [{"column_name": "address", "source": "TRIM(address)", "target": "LOWER(TRIM(address))"}]} |
| column_thresholds | list[ColumnThresholds] | Column-level variance thresholds (see Column Thresholds) | Optional | {"column_thresholds": [{"column_name": "price", "lower_bound": "-5%", "upper_bound": "+10%"}]} |
| table_thresholds | list[TableThresholds] | Table-level mismatch thresholds (see Table Thresholds) | Optional | {"table_thresholds": [{"model": "mismatch", "lower_bound": "0%", "upper_bound": "5%"}]} |
| filters | Filters | Source/target filter expressions | Optional | {"filters": {"source": "quantity > 100", "target": "stock_quantity >= 100"}} |

[back to top]

JDBC Reader Options

@dataclass
class JdbcReaderOptions:
    number_partitions: int
    partition_column: str
    lower_bound: str
    upper_bound: str
    fetch_size: int = 100
| field_name | data_type | description | required/optional | example_value |
|---|---|---|---|---|
| number_partitions | string | The number of partitions used for reading the input data in parallel. | required | "200" |
| partition_column | string | Int/date/timestamp column used for partitioning, typically the primary key of the source table. This parameter accepts only one column, which is especially important when dealing with a composite primary key; in that case, provide the column with the higher cardinality. | required | "employee_id" |
| lower_bound | string | Integer, date, or timestamp (without time zone) value, as a string, that should be set appropriately (usually the minimum value of the partition column for non-skewed data) so that the data read from the source is approximately equally distributed. | required | "1" |
| upper_bound | string | Integer, date, or timestamp (without time zone) value, as a string, that should be set appropriately (usually the maximum value of the partition column for non-skewed data) so that the data read from the source is approximately equally distributed. | required | "100000" |
| fetch_size | string | Controls the number of rows fetched per round-trip between Spark and the JDBC database, optimising data retrieval performance. Adjusting this option significantly impacts the efficiency of data extraction, as it controls the volume of data retrieved in each fetch operation. More details on configuring fetch size can be found in the Spark JDBC documentation. | optional (default="100") | "10000" |
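For illustration, a sketch of how these options might be attached to a table entry; the employee table, employee_id column, and bound values are hypothetical:

Table(
    source_name="employee",
    target_name="employee",
    join_columns=["employee_id"],
    jdbc_reader_options=JdbcReaderOptions(
        number_partitions=10,            # read the source with 10 parallel partitions
        partition_column="employee_id",  # high-cardinality column used for partitioning
        lower_bound="1",                 # approximate minimum value of employee_id
        upper_bound="100000",            # approximate maximum value of employee_id
        fetch_size=1000,                 # rows fetched per JDBC round-trip
    ),
)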
tip

Key Considerations for Oracle JDBC Reader Options:

For Oracle source, the following Spark Configurations are automatically set:

"oracle.jdbc.mapDateToTimestamp": "False",
"sessionInitStatement": "BEGIN dbms_session.set_nls('nls_date_format', '''YYYY-MM-DD''');dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD HH24:MI:SS''');END;"

When configuring Recon for an Oracle source, take the above options into consideration.

[back to top]

Column Mapping

@dataclass
class ColumnMapping:
    source_name: str
    target_name: str

| field_name | data_type | description | required/optional | example_value |
|---|---|---|---|---|
| source_name | string | source column name | required | "dept_id" |
| target_name | string | target column name | required | "department_id" |
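A short sketch of mapping differently named columns between source and target; the dept table and column names are hypothetical:

Table(
    source_name="dept",
    target_name="department",
    join_columns=["dept_id"],
    column_mapping=[
        ColumnMapping(source_name="dept_id", target_name="department_id"),
        ColumnMapping(source_name="dept_name", target_name="department_name"),
    ],
)

Note that join_columns still reference the source column names; the mapping tells Reconcile which target columns they correspond to.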

[back to top]

Drop Columns

For Recon Type all or data, the drop_columns parameter is used to exclude columns from the reconciliation process. Using this config, you specify the columns that you want to leave out of the comparison.

Table(
    drop_columns=["column_name1", "column_name2"]
)

[back to top]

Transformations

You can apply custom transformations to the columns using the transformations parameter. If applied, Reconcile uses these transformations to fetch the data from source and/or target before comparing them. You can write SQL expressions in source & target fields to apply transformations.

The class detail:

@dataclass
class Transformation:
    column_name: str
    source: str
    target: str | None = None

Syntax for applying transformations:

Table(
    transformations=[
        Transformation(
            column_name="unit_price",
            source="coalesce(cast(cast(unit_price as decimal(38,10)) as string), '_null_recon_')",
            target="coalesce(cast(format_number(cast(unit_price as decimal(38, 10)), 10) as string), '_null_recon_')"
        )
    ]
)
| field_name | data_type | description | required/optional | example_value |
|---|---|---|---|---|
| column_name | string | the column name on which the transformation is to be applied | required | "s_address" |
| source | string | the transformation SQL expression to be applied to the source column | required | "trim(s_address)" or "s_address" |
| target | string | the transformation SQL expression to be applied to the target column | required | "trim(s_address)" or "s_address" |
note

Reconciliation also accepts a UDF in the transformation expression. For example, say we have a UDF named sort_array_input() that takes an unsorted array as input and returns a sorted array. We can use it in a transformation as below:

transformations=[
    Transformation(
        column_name="array_col",
        source="sort_array_input(array_col)",
        target="sort_array_input(array_col)"
    )
]

NULL values are defaulted to _null_recon_ by the transformation expressions defined in these files:

  1. expression_generator.py
  2. sampling_query.py

If you are looking for any specific behaviour, you can override these rules with your own transformations accordingly.
danger

Handling Nulls

While applying transformations, make sure you handle NULLs explicitly in the transformation columns. Reconcile takes care of null handling for all other columns (including join keys and other keys not included in the drop_columns list), but when you introduce transformation columns, Reconcile uses those expressions as is, so you have to include null handling in the transformation expression itself.

So if you are planning on using the below expression:

cast(cast(scanout_units as decimal(38,10)) as string)

Use the below expression instead:

coalesce(cast(cast(scanout_units as decimal(38,10)) as string), '_null_recon_')

Handling Timestamp Columns

Different systems handle timestamps differently. During Reconciliation it is important that we apply necessary transformation to the timestamp columns on both source and target to make sure that they are reconciled correctly. A recommended approach to dealing with timestamps is to convert them to corresponding unix epoch string values. SQL Functions like epoch_millisecond (Snowflake), unix_millis (Databricks) become very handy to transform those timestamp values to unix epoch values. Then they can be converted to string which is safer when it comes to comparing timestamps across different systems.

Transformation(
    column_name="UPDATE_TIMESTAMP",
    source="coalesce(cast(EXTRACT(epoch_millisecond FROM UPDATE_TIMESTAMP) as string), '_null_recon_')",
    target="coalesce(cast(unix_millis(UPDATE_TIMESTAMP) as string), '_null_recon_')"
),
Transformation Expressions
| filename | function / variable | transformation_rule | description |
|---|---|---|---|
| sampling_query.py | _get_join_clause | transform(coalesce, default="null_recon", is_string=True) | Applies the coalesce transformation function for String columns and defaults to _null_recon_ if the column is NULL |
| expression_generator.py | DataType_transform_mapping | (coalesce, default='null_recon', is_string=True) | Default String column transformation rule for all dialects. Applies the coalesce transformation function and defaults to _null_recon_ if the column is NULL |
| expression_generator.py | DataType_transform_mapping | "oracle": DataType...NCHAR: ..."NVL(TRIM(TO_CHAR..,'null_recon')" | Transformation rule for the Oracle dialect NCHAR datatype. Applies TO_CHAR and TRIM transformation functions. If the column is NULL, defaults to _null_recon_ |
| expression_generator.py | DataType_transform_mapping | "oracle": DataType...NVARCHAR: ..."NVL(TRIM(TO_CHAR..,'null_recon')" | Transformation rule for the Oracle dialect NVARCHAR datatype. Applies TO_CHAR and TRIM transformation functions. If the column is NULL, defaults to _null_recon_ |
tip

If you are applying any transformation for generating the Reconciliation report, it is recommended that you run the queries with the transformation expressions on both source and target to ensure they are syntactically and semantically correct.

If you need help with the exact reconciliation queries that are generated after you apply the transformations, you can search the reconciliation log for the following string:

"Hash query for [source or target]"

[back to Transformations]

[back to top]

Column Thresholds

@dataclass
class ColumnThresholds:
    column_name: str
    lower_bound: str
    upper_bound: str
    type: str

| field_name | data_type | description | required/optional | example_value |
|---|---|---|---|---|
| column_name | string | the column to be considered for column threshold reconciliation | required | "product_discount" |
| lower_bound | string | the lower bound of the allowed difference between the source value and the target value | required | "-5%" |
| upper_bound | string | the upper bound of the allowed difference between the source value and the target value | required | "5%" |
| type | string | the column type; supports SQLGLOT DataType.NUMERIC_TYPES and DataType.TEMPORAL_TYPES | required | "int" |
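A sketch of attaching a column threshold to a table entry, reusing the example values from the table above; the sales table and sale_id join column are hypothetical:

Table(
    source_name="sales",
    target_name="sales",
    join_columns=["sale_id"],
    column_thresholds=[
        ColumnThresholds(
            column_name="product_discount",  # column compared with a tolerance
            lower_bound="-5%",               # target may be up to 5% below source
            upper_bound="5%",                # target may be up to 5% above source
            type="int",                      # SQLGLOT numeric type of the column
        )
    ],
)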

[back to top]

Table Thresholds

@dataclass
class TableThresholds:
    lower_bound: str
    upper_bound: str
    model: str

  • The threshold bounds for the table must be non-negative, with the lower bound not exceeding the upper bound.

| field_name | data_type | description | required/optional | example_value |
|---|---|---|---|---|
| lower_bound | string | the lower bound of the allowed difference between the source mismatch count and the target mismatch count | required | "0%" |
| upper_bound | string | the upper bound of the allowed difference between the source mismatch count and the target mismatch count | required | "5%" |
| model | string | the table model to which the threshold applies; for now, only "mismatch" is supported | required | "mismatch" |
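For illustration, a sketch that tolerates up to 5% mismatched rows for a table; the inventory table and item_id join column are hypothetical:

Table(
    source_name="inventory",
    target_name="inventory",
    join_columns=["item_id"],
    table_thresholds=[
        TableThresholds(
            lower_bound="0%",     # lower bound of the tolerated mismatch count
            upper_bound="5%",     # accept up to 5% mismatched rows
            model="mismatch",     # only "mismatch" is supported for now
        )
    ],
)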

[back to top]

Filters

@dataclass
class Filters:
    source: str
    target: str

| field_name | data_type | description | required/optional | example_value |
|---|---|---|---|---|
| source | string | the SQL expression used to filter the data from source | optional (default=None) | "lower(dept_name)='finance'" |
| target | string | the SQL expression used to filter the data from target | optional (default=None) | "lower(dept_name)='finance'" |
Key Considerations:
  1. Column names are always converted to lowercase and considered for reconciliation.
  2. Currently, it doesn't support case insensitivity and doesn't have collation support.
  3. Table transformation internally considers the column value as the default; it doesn't apply any default transformation if none is provided. e.g. Transformation(column_name="address", source=None, target="trim(s_address)"). In this example, the source transformation is None, so the raw source value is considered for reconciliation.
  4. If no user transformation is provided for a given column in the configuration, then, depending on the source data type, the reconciler applies a default transformation on both source and target to get matching hash values. Please find the detailed default transformations in the Transformation Expressions table above.
  5. Column references should always be source column names in all the configs, except in Transformations and Filters, as these are dialect-specific SQL expressions that are applied directly in the SQL.
  6. Transformations and Filters should always be written in their respective dialect's SQL expressions; the reconciler will not apply any logic on top of them.

[back to top]

Aggregates Reconciliation

Aggregates Reconcile is a utility that streamlines the reconciliation process by comparing specific aggregate metrics between source and target data residing on Databricks.

Summary

| operation_name | sample visualisation | description | key outputs captured in the recon metrics tables |
|---|---|---|---|
| aggregates-reconcile | data | reconciles the data for each aggregate metric; join_columns are used to identify mismatches at the aggregated metric level | mismatch_data (sample data with mismatches captured at the aggregated metric level), missing_in_src (sample rows that are available in target but missing in source), missing_in_tgt (sample rows that are available in source but missing in target) |

Supported Aggregate Functions

Aggregate Functions
min
max
count
sum
avg
mean
mode
stddev
variance
median

[back to aggregates-reconciliation]

[back to top]

Flow Chart

[back to aggregates-reconciliation]

[back to top]

Aggregate

@dataclass
class Aggregate:
    agg_columns: list[str]
    type: str
    group_by_columns: list[str] | None = None

| field_name | data_type | description | required/optional | example_value |
|---|---|---|---|---|
| type | string | one of the Supported Aggregate Functions | required | MIN |
| agg_columns | list[string] | list of column names on which the aggregate function needs to be applied | required | ["product_discount"] |
| group_by_columns | list[string] | list of column names on which grouping needs to be applied | optional (default=None) | ["product_id"] or None |

[back to aggregates-reconciliation]

[back to top]

TABLE Config Examples:

Please refer to TABLE Config Elements for the class and JSON configs.

Table(
    source_name="SOURCE_NAME",
    target_name="TARGET_NAME",
    join_columns=["COLUMN_NAME_1", "COLUMN_NAME_2"],
    aggregates=[
        Aggregate(
            agg_columns=["COLUMN_NAME_3"],
            type="MIN",
            group_by_columns=["GROUP_COLUMN_NAME"]
        ),
        Aggregate(
            agg_columns=["COLUMN_NAME_4"],
            type="MAX"
        )
    ]
)
note

Key Considerations:

  1. The aggregate column names, group by column names, and type are always converted to lowercase and considered for reconciliation.
  2. Currently, it doesn't support aggregates over window functions using the OVER clause.
  3. It doesn't support case insensitivity and does not have collation support.
  4. Queries with "group by" column(s) are compared based on the same group by columns.
  5. Queries without "group by" column(s) are compared row-to-row.
  6. Existing features like column_mapping, transformations, JDBCReaderOptions and filters are leveraged for the aggregate metric reconciliation.
  7. Existing select_columns and drop_columns are not considered for the aggregate metric reconciliation, even if the user provides them.
  8. If Transformations are defined, they are applied to both the "aggregate columns" and the "group by columns".

[back to aggregates-reconciliation]

[back to top]