Configuration
Recon Config
- Types of Report Supported
- Report Type-Flow Chart
- Supported Source System
- TABLE Config JSON filename
- TABLE Config Elements
- Aggregates
- JDBC Reader Options
- Column Mapping
- Drop Columns
- Transformations
- Column Thresholds
- Table Thresholds
- Filters
- Aggregates Reconcile
Types of Report Supported
report type | sample visualisation | description | key outputs captured in the recon metrics tables |
---|---|---|---|
schema | schema | reconciles the schema of source and target, validating that data types are the same or compatible | - schema_comparison - schema_difference |
row | row | reconciles the data only at row level (the hash of each source row is matched against the hash of the target row). Preferred when no join columns can be identified between source and target. | - missing_in_src (sample rows that are available in target but missing in source, plus sample target rows that don't match the source) - missing_in_tgt (sample rows that are available in source but missing in target, plus sample source rows that don't match the target). NOTE: this report does not differentiate between mismatched and missing rows. |
data | data | reconciles the data at row and column level; join_columns are used to identify mismatches at each row and column | - mismatch_data (sample data with mismatches captured at column and row level) - missing_in_src (sample rows that are available in target but missing in source) - missing_in_tgt (sample rows that are available in source but missing in target) - threshold_mismatch (configured columns are reconciled against a percentage, absolute, or date threshold boundary) - mismatch_columns (consolidated list of columns that have mismatches) |
all | all | a combination of the data and schema reports | - data + schema outputs |
Report Type-Flow Chart
Supported Source System
Source | Schema | Row | Data | All |
---|---|---|---|---|
Oracle | Yes | Yes | Yes | Yes |
Snowflake | Yes | Yes | Yes | Yes |
Databricks | Yes | Yes | Yes | Yes |
TABLE Config JSON filename:
The config file must be named recon_config_[DATA_SOURCE]_[SOURCE_CATALOG_OR_SCHEMA]_[REPORT_TYPE].json
and must be placed in the remorph root directory (.remorph) within the Databricks Workspace.
The filename pattern remains the same for all data sources:
recon_config_${DATA_SOURCE}_${SOURCE_CATALOG_OR_SCHEMA}_${REPORT_TYPE}.json
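As an illustration only, the expected filename can be derived from the reconcile settings with a small helper; build_recon_config_filename below is a hypothetical name, not part of the toolkit:
def build_recon_config_filename(data_source: str, source_catalog_or_schema: str, report_type: str) -> str:
    # The catalog/schema part must keep the same case as in the reconcile config.
    return f"recon_config_{data_source}_{source_catalog_or_schema}_{report_type}.json"

# e.g. build_recon_config_filename("oracle", "ORC", "data") == "recon_config_oracle_ORC_data.json"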
Please find Table Recon filename examples below for the Snowflake, Oracle, TSQL, and Databricks source systems.
- Snowflake
- Oracle
- TSQL
- Databricks
database_config:
source_catalog: sample_data
source_schema: default
...
metadata_config:
...
data_source: snowflake
report_type: all
...
database_config:
source_schema: orc
...
metadata_config:
...
data_source: oracle
report_type: data
...
database_config:
source_schema: silver
...
metadata_config:
...
data_source: tsql
report_type: data
...
database_config:
source_schema: hms
...
metadata_config:
...
data_source: databricks
report_type: schema
...
Note: the filename must use the same case as [SOURCE_CATALOG_OR_SCHEMA] is defined with. For example, if the source schema is defined as ORC in the reconcile config, the filename should be recon_config_oracle_ORC_data.json.
TABLE Config Elements:
- Python
- JSON
@dataclass
class Table:
source_name: str
target_name: str
aggregates: list[Aggregate] | None = None
join_columns: list[str] | None = None
jdbc_reader_options: JdbcReaderOptions | None = None
select_columns: list[str] | None = None
drop_columns: list[str] | None = None
column_mapping: list[ColumnMapping] | None = None
transformations: list[Transformation] | None = None
column_thresholds: list[ColumnThresholds] | None = None
filters: Filters | None = None
table_thresholds: list[TableThresholds] | None = None
{
"source_name": "[SOURCE_NAME]",
"target_name": "[TARGET_NAME]",
"aggregates": null,
"join_columns": ["COLUMN_NAME_1","COLUMN_NAME_2"],
"jdbc_reader_options": null,
"select_columns": null,
"drop_columns": null,
"column_mapping": null,
"transformation": null,
"column_thresholds": null,
"filters": null,
"table_thresholds": null
}
Configuration Reference
Configuration Parameter | Data Type | Description | Requirement | Example Value |
---|---|---|---|---|
source_name | string | Source table name | Required | |
target_name | string | Target table name | Required | |
aggregates | list[Aggregate] | List of aggregation rules (see Aggregate) | Optional | |
join_columns | list[string] | Primary key columns | Optional | |
jdbc_reader_options | object | JDBC read parallelization configuration | Optional | |
select_columns | list[string] | Columns to include in reconciliation | Optional | |
drop_columns | list[string] | Columns to exclude from reconciliation | Optional | |
column_mapping | list[ColumnMapping] | Source-target column mapping (see column_mapping) | Optional | |
transformations | list[Transformation] | Column transformation rules (see transformations) | Optional | |
column_thresholds | list[ColumnThresholds] | Column-level variance thresholds (see column_thresholds) | Optional | |
table_thresholds | list[TableThresholds] | Table-level mismatch thresholds (see table_thresholds) | Optional | |
filters | Filters | Source/target filter expressions | Optional | |
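For orientation, a minimal sketch combining a few of these optional fields; the table and column names are placeholders, not taken from a real system:
# Minimal sketch of a Table config; all names here are placeholders.
table = Table(
    source_name="orders",
    target_name="orders",
    join_columns=["order_id"],
    select_columns=["order_id", "order_date", "amount"],
    column_mapping=[ColumnMapping(source_name="amount", target_name="order_amount")],
    filters=Filters(source="order_date >= '2024-01-01'", target="order_date >= '2024-01-01'"),
)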
JDBC Reader Options
- Python
- JSON
@dataclass
class JdbcReaderOptions:
number_partitions: int
partition_column: str
lower_bound: str
upper_bound: str
fetch_size: int = 100
"jdbc_reader_options": {
"number_partitions": "<NUMBER_PARTITIONS>",
"partition_column": "<PARTITION_COLUMN>",
"lower_bound": "<LOWER_BOUND>",
"upper_bound": "<UPPER_BOUND>",
"fetch_size": "<FETCH_SIZE>"
}
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
number_partitions | string | the number of partitions for reading input data in parallel | required | "200" |
partition_column | string | int/date/timestamp column used for partitioning, typically the primary key of the source table. This parameter accepts only one column, which is especially important for composite primary keys; in that case, provide the column with the higher cardinality. | required | "employee_id" |
lower_bound | string | an integer, date, or timestamp (without time zone) value, passed as a string, that should be set appropriately (usually the minimum value for non-skewed data) so that the data read from the source is roughly evenly distributed | required | "1" |
upper_bound | string | an integer, date, or timestamp (without time zone) value, passed as a string, that should be set appropriately (usually the maximum value for non-skewed data) so that the data read from the source is roughly evenly distributed | required | "100000" |
fetch_size | string | controls the number of rows fetched per round trip between Spark and the JDBC database, which can significantly affect data-extraction performance. More details on configuring fetch size can be found in the Spark JDBC documentation. | optional (default="100") | "10000" |
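For example, a sketch using the example values from the table above; the partition column and bounds must reflect your own table:
# Reads the source over 200 parallel partitions on employee_id,
# assuming its values range roughly from 1 to 100000.
jdbc_reader_options = JdbcReaderOptions(
    number_partitions=200,
    partition_column="employee_id",
    lower_bound="1",
    upper_bound="100000",
    fetch_size=10000,
)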
Key Considerations for Oracle JDBC Reader Options:
For an Oracle source, the following Spark configurations are set automatically:
"oracle.jdbc.mapDateToTimestamp": "False",
"sessionInitStatement": "BEGIN dbms_session.set_nls('nls_date_format', '''YYYY-MM-DD''');dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD HH24:MI:SS''');END;"
Keep these options in mind when configuring Recon for an Oracle source.
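For context only, here is a sketch of how these options map onto a plain Spark JDBC read; the reconciler sets them for you, and the URL, table, and credentials below are placeholders:
# Illustrative only: the reconciler applies these options automatically.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//<HOST>:1521/<SERVICE>")  # placeholder connection
    .option("dbtable", "<SCHEMA>.<TABLE>")                       # placeholder table
    .option("user", "<USER>")
    .option("password", "<PASSWORD>")
    .option("oracle.jdbc.mapDateToTimestamp", "False")
    .option(
        "sessionInitStatement",
        "BEGIN dbms_session.set_nls('nls_date_format', '''YYYY-MM-DD''');"
        "dbms_session.set_nls('nls_timestamp_format', '''YYYY-MM-DD HH24:MI:SS''');END;",
    )
    .load()
)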
Column Mapping
- Python
- JSON
@dataclass
class ColumnMapping:
source_name: str
target_name: str
"column_mapping": [
{
"source_name": "<SOURCE_COLUMN_NAME>",
"target_name": "<TARGET_COLUMN_NAME>"
}
]
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
source_name | string | source column name | required | "dept_id" |
target_name | string | target column name | required | "department_id" |
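For example, mapping the column names from the table above:
# Maps a source column to a differently named target column.
column_mapping = [
    ColumnMapping(source_name="dept_id", target_name="department_id"),
]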
Drop Columns
For Recon Type all or data, the drop_columns parameter is used to specify the columns you want to exclude from the reconciliation process.
- Python
- JSON
Table(
drop_columns = ["column_name1", "column_name2"]
)
{
"drop_columns": [ "column_name1", "column_name2"]
}
Transformations
You can apply custom transformations to the columns using the transformations
parameter. If applied, Reconcile
uses these transformations to fetch the data from source and/or target before comparing them. You can write SQL
expressions in source
& target
fields to apply transformations.
The class detail:
@dataclass
class Transformation:
column_name: str
source: str
target: str | None = None
Syntax for applying transformations:
- Python
- JSON
Table(
transformations = [
Transformation(
column_name = "unit_price",
source = "coalesce(cast(cast(unit_price as decimal(38,10)) as string), '_null_recon_')",
target = "coalesce(cast(format_number(cast(unit_price as decimal(38, 10)), 10) as string), '_null_recon_')"
)
]
)
"transformations": [
{
"column_name": "[COLUMN_NAME]",
"source": "[TRANSFORMATION_EXPRESSION]",
"target": "[TRANSFORMATION_EXPRESSION]"
}
]
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
column_name | string | the column name on which the transformation is to be applied | required | "s_address" |
source | string | the transformation SQL expression to be applied to the source column | required | "trim(s_address)" or "s_address" |
target | string | the transformation SQL expression to be applied to the target column | required | "trim(s_address)" or "s_address" |
Reconciliation also accepts a UDF in the transformation expression. For example, if we have a UDF named sort_array_input() that takes an unsorted array as input and returns a sorted array, we can use it in a transformation as below:
transformations=[Transformation(column_name="array_col", source="sort_array_input(array_col)", target="sort_array_input(array_col)")]
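As a sketch only, such a function could be registered on the Databricks side as a SQL UDF before running the reconciliation (an equivalent function must also exist on the source system); the reconciler does not create it for you:
# Illustrative only: registers a SQL UDF named sort_array_input on Databricks.
spark.sql("""
    CREATE OR REPLACE FUNCTION sort_array_input(arr ARRAY<STRING>)
    RETURNS ARRAY<STRING>
    RETURN array_sort(arr)
""")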
NULL values are defaulted to _null_recon_ by the transformation expressions defined in these files:
- expression_generator.py
- sampling_query.py
If you need a different behaviour for a specific column, you can override these rules with your own transformations.
Handling Nulls
While applying transformations, make sure you handle nulls explicitly in the transformed column. Reconcile takes care of null handling for all other columns (including join keys and any other columns not in the drop_columns list), but when you introduce a transformation for a column, Reconcile uses your expression as-is, so null handling must be part of the expression itself.
For example, instead of the expression below:
cast(cast(scanout_units as decimal(38,10)) as string)
use this instead:
coalesce(cast(cast(scanout_units as decimal(38,10)) as string), '_null_recon_')
Handling Timestamp Columns
Different systems handle timestamps differently. During Reconciliation it is important that we apply necessary transformation to the timestamp columns
on both source and target to make sure that they are reconciled correctly. A recommended approach to dealing with timestamps is to convert them to
corresponding unix epoch
string values. SQL Functions like epoch_millisecond
(Snowflake), unix_millis (Databricks) become very handy to transform
those timestamp values to unix epoch values. Then they can be converted to string which is safer when it comes to comparing timestamps across different systems.
- Python
- JSON
Transformation(
column_name= 'UPDATE_TIMESTAMP',
source= "coalesce(cast(EXTRACT(epoch_millisecond FROM UPDATE_TIMESTAMP) as string), '_null_recon_')",
target= "coalesce(cast(unix_millis(UPDATE_TIMESTAMP) as string), '_null_recon_')"
),
"transformations": [
{
"column_name": "UPDATE_TIMESTAMP",
"source": "coalesce(cast(EXTRACT(epoch_millisecond FROM UPDATE_TIMESTAMP) as string), '_null_recon_')",
"target": "coalesce(cast(unix_millis(UPDATE_TIMESTAMP) as string), '_null_recon_')"
}
]
Transformation Expressions

filename | function / variable | transformation_rule | description |
---|---|---|---|
sampling_query.py | _get_join_clause | transform(coalesce, default="_null_recon_", is_string=True) | Applies the coalesce transformation function for string columns and defaults to _null_recon_ if the column is NULL |
expression_generator.py | DataType_transform_mapping | (coalesce, default='_null_recon_', is_string=True) | Default string column transformation rule for all dialects. Applies the coalesce transformation function and defaults to _null_recon_ if the column is NULL |
expression_generator.py | DataType_transform_mapping | "oracle": DataType...NCHAR: ..."NVL(TRIM(TO_CHAR..,'_null_recon_')" | Transformation rule for the Oracle dialect NCHAR datatype. Applies the TO_CHAR and TRIM transformation functions; if the column is NULL, defaults to _null_recon_ |
expression_generator.py | DataType_transform_mapping | "oracle": DataType...NVARCHAR: ..."NVL(TRIM(TO_CHAR..,'_null_recon_')" | Transformation rule for the Oracle dialect NVARCHAR datatype. Applies the TO_CHAR and TRIM transformation functions; if the column is NULL, defaults to _null_recon_ |
If you apply any transformations when generating the reconciliation report, it is recommended that you run the queries with the transformation expressions on both source and target to ensure they produce syntactically and semantically correct results.
If you need the exact reconciliation queries generated after the transformations are applied, search the reconciliation log for the following string:
“Hash query for [source or target]”
Column Thresholds
- Python
- JSON
@dataclass
class ColumnThresholds:
column_name: str
lower_bound: str
upper_bound: str
type: str
"column_thresholds": [
{
"column_name": "COLUMN_NAME",
"lower_bound": "LOWER_BOUND",
"upper_bound": "UPPER_BOUND",
"type": "DATA_TYPE"
}
]
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
column_name | string | the column that should be considered for column threshold reconciliation | required | "product_discount" |
lower_bound | string | the lower bound of the difference between the source value and the target value | required | "-5%" |
upper_bound | string | the upper bound of the difference between the source value and the target value | required | "5%" |
type | string | The user must specify the column type. Supports SQLGLOT DataType.NUMERIC_TYPES and DataType.TEMPORAL_TYPES. | required | "int" |
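For instance, a sketch allowing a ±5% variance on the example column from the table above:
# Tolerates a difference of up to 5% (in either direction) between the source
# and target values of product_discount before flagging a mismatch.
column_thresholds = [
    ColumnThresholds(
        column_name="product_discount",
        lower_bound="-5%",
        upper_bound="5%",
        type="int",
    )
]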
Table Thresholds
- Python
- JSON
@dataclass
class TableThresholds:
lower_bound: str
upper_bound: str
model: str
"table_thresholds": [
{
"lower_bound": "LOWER_BOUND",
"upper_bound": "UPPER_BOUND",
"model": "MODEL"
}
]
- The threshold bounds for the table must be non-negative, with the lower bound not exceeding the upper bound.
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
lower_bound | string | the lower bound of the difference between the source mismatch count and the target mismatch count | required | "0%" |
upper_bound | string | the upper bound of the difference between the source mismatch count and the target mismatch count | required | "5%" |
model | string | the table model the threshold should be applied to; for now, only "mismatch" is supported | required | "mismatch" |
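For example, a sketch that tolerates up to a 5% mismatch for the table ("mismatch" is currently the only supported model):
# Accepts the table-level result if the mismatch stays within 0% to 5%.
table_thresholds = [
    TableThresholds(lower_bound="0%", upper_bound="5%", model="mismatch")
]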
Filters
- Python
- JSON
@dataclass
class Filters:
source: str
target: str
"filters": {
"source": "FILTER_EXPRESSION",
"target": "FILTER_EXPRESSION"
}
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
source | string | the sql expression to filter the data from source | optional(default=None) |
|
target | string | the sql expression to filter the data from target | optional(default=None) |
|
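A sketch with placeholder column and literal values; each expression must be valid SQL in its own dialect:
# Restricts both sides to the same logical subset of rows.
filters = Filters(
    source="sales_region = 'EMEA'",
    target="sales_region = 'EMEA'",
)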
- The column names are always converted to lowercase and considered for reconciliation.
- Currently, it doesn't support case insensitivity and doesn't have collation support.
- Table transformations internally use the column value as the default; no default transformation is applied when one is not provided.
eg: Transformation(column_name="address", source=None, target="trim(s_address)")
For the given example, the source transformation is None, so the raw source value is considered for reconciliation.
- If no user transformation is provided for a given column, the reconciler applies a default transformation (based on the source data type) on both source and target to compute matching hash values. Please find the detailed default transformations here.
- Column references should always be source column names in all configs, except in Transformations and Filters, as these are dialect-specific SQL expressions applied directly in the SQL.
- Transformations and Filters should always be written as SQL expressions in their respective dialects; the reconciler does not apply any additional logic on top of them.
Aggregates Reconciliation
Aggregates Reconcile is a utility that streamlines the reconciliation process by comparing specific aggregate metrics between the source data and the target data residing on Databricks.
Summary
operation_name | sample visualisation | description | key outputs captured in the recon metrics tables |
---|---|---|---|
aggregates-reconcile | data | reconciles the data for each aggregate metric; join_columns are used to identify mismatches at the aggregated metric level | - mismatch_data (sample data with mismatches captured at the aggregated metric level) - missing_in_src (sample rows that are available in target but missing in source) - missing_in_tgt (sample rows that are available in source but missing in target) |
Supported Aggregate Functions
Aggregate Functions |
---|
min |
max |
count |
sum |
avg |
mean |
mode |
stddev |
variance |
median |
Flow Chart
Aggregate
- Python
- JSON
@dataclass
class Aggregate:
agg_columns: list[str]
type: str
group_by_columns: list[str] | None = None
{
"type": "MIN",
"agg_columns": ["COLUMN_NAME_3"],
"group_by_columns": ["GROUP_COLUMN_NAME"]
}
field_name | data_type | description | required/optional | example_value |
---|---|---|---|---|
type | string | Supported Aggregate Functions | required | MIN |
agg_columns | list[string] | list of column names on which the aggregate function needs to be applied | required | ["product_discount"] |
group_by_columns | list[string] | list of column names on which grouping needs to be applied | optional(default=None) | ["product_id"] or None |
TABLE Config Examples:
Please refer to TABLE Config Elements for the Class and JSON configs.
- Python
- JSON
Table(
source_name="SOURCE_NAME",
target_name="TARGET_NAME",
join_columns=["COLUMN_NAME_1", "COLUMN_NAME_2"],
aggregates=[
Aggregate(
agg_columns=["COLUMN_NAME_3"],
type="MIN",
group_by_columns=["GROUP_COLUMN_NAME"]
),
Aggregate(
agg_columns=["COLUMN_NAME_4"],
type="MAX"
)
]
)
{
"source_name": "SOURCE_NAME",
"target_name": "TARGET_NAME",
"join_columns": ["COLUMN_NAME_1", "COLUMN_NAME_2"],
"aggregates": [
{
"type": "MIN",
"agg_columns": ["COLUMN_NAME_3"],
"group_by_columns": ["GROUP_COLUMN_NAME"]
},
{
"type": "MAX",
"agg_columns": ["COLUMN_NAME_4"]
}
]
}
Key Considerations:
- The aggregate column names, group by columns and type are always converted to lowercase and considered for reconciliation.
- Currently, it doesn't support aggregates on window functions using the OVER clause.
- It doesn't support case insensitivity and does not have collation support.
- Queries with "group by" column(s) are compared on the same group by columns.
- Queries without "group by" column(s) are compared row-to-row.
- Existing features like column_mapping, transformations, JDBCReaderOptions and filters are leveraged for the aggregate metric reconciliation.
- Existing select_columns and drop_columns are not considered for the aggregate metric reconciliation; even if the user provides them, they are ignored.
- If Transformations are defined, they are applied to both the "aggregate columns" and "group by columns", as shown in the sketch below.
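For instance, a sketch (with placeholder table and column names) where a transformation is defined for the aggregated column; the same expression is applied on both sides before the SUM aggregate and the group-by are evaluated:
# Placeholder names throughout: the transformation on unit_price is applied on
# both source and target before SUM(unit_price) grouped by product_id is compared.
table = Table(
    source_name="sales",
    target_name="sales",
    join_columns=["product_id"],
    transformations=[
        Transformation(
            column_name="unit_price",
            source="cast(unit_price as decimal(38,10))",
            target="cast(unit_price as decimal(38,10))",
        )
    ],
    aggregates=[
        Aggregate(agg_columns=["unit_price"], type="SUM", group_by_columns=["product_id"])
    ],
)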