Running Reconcile on Notebook

This page is a comprehensive guide to configuring and running the Reconcile utility of Remorph in your Databricks workspace using a Databricks notebook.

Installation

Before running the Reconcile utility, you need to install the required packages. You can install them with the following commands:

%pip install git+https://github.com/databrickslabs/remorph
dbutils.library.restartPython()
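
If you need a reproducible environment, you can pin the install to a specific release tag instead. The tag below is illustrative; substitute a real release from the repository:

%pip install git+https://github.com/databrickslabs/remorph@v0.4.0
dbutils.library.restartPython()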

Imports

Import all the necessary modules.

from databricks.sdk import WorkspaceClient

from databricks.labs.remorph.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig,
    TableRecon,
)
from databricks.labs.remorph.reconcile.recon_config import (
    Table,
    ColumnMapping,
    ColumnThresholds,
    TableThresholds,
    Transformation,
    JdbcReaderOptions,
    Aggregate,
    Filters,
)
from databricks.labs.remorph.reconcile.execute import (
    recon,
    reconcile_aggregates,
)
from databricks.labs.remorph.reconcile.exception import ReconciliationException

Configure Reconcile Properties

We use the class ReconcileConfig to configure the properties required for reconciliation.

@dataclass
class ReconcileConfig:
    data_source: str
    report_type: str
    secret_scope: str
    database_config: DatabaseConfig
    metadata_config: ReconcileMetadataConfig

Parameters:

  • data_source: The data source to be reconciled. Supported values include snowflake, teradata, oracle, tsql, and databricks.
  • report_type: The type of report to be generated. Available report types are schema, row, data, or all. For details, check here.
  • secret_scope: The secret scope name used to store the connection credentials for the source database system. See the sketch after this section for creating one.
  • database_config: The database configuration for connecting to the source database. Expects a DatabaseConfig object.
    • source_schema: The source schema name.
    • target_catalog: The target catalog name.
    • target_schema: The target schema name (Databricks).
    • source_catalog: The source catalog name. (Optional; applies only to source systems that support catalogs.)
@dataclass
class DatabaseConfig:
    source_schema: str
    target_catalog: str
    target_schema: str
    source_catalog: str | None = None
  • metadata_config: The metadata configuration. Reconcile uses this catalog and schema on Databricks to store all backend metadata for reconciliation. Expects a ReconcileMetadataConfig object.
    • catalog: The catalog name to store the metadata.
    • schema: The schema name to store the metadata.
@dataclass
class ReconcileMetadataConfig:
    catalog: str = "remorph"
    schema: str = "reconcile"
    volume: str = "reconcile_volume"

If not set, the default values are used to store the metadata. The default resources are created during the installation of Remorph.
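
The secret scope referenced in the configuration must exist and hold the source connection credentials before reconciliation runs. A minimal sketch using the Databricks SDK follows; the key names are illustrative assumptions, so use the keys your configured data source expects:

from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()

# Create the scope once; this raises an error if the scope already exists.
ws.secrets.create_scope(scope="snowflake-credential")

# Store the connection credentials. Key names here are assumptions;
# match them to what your configured data source requires.
ws.secrets.put_secret(scope="snowflake-credential", key="sfUser", string_value="<user>")
ws.secrets.put_secret(scope="snowflake-credential", key="sfPassword", string_value="<password>")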

An example of configuring the Reconcile properties:

from databricks.labs.remorph.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig,
)

reconcile_config = ReconcileConfig(
    data_source="snowflake",
    report_type="all",
    secret_scope="snowflake-credential",
    database_config=DatabaseConfig(
        source_catalog="source_sf_catalog",
        source_schema="source_sf_schema",
        target_catalog="target_databricks_catalog",
        target_schema="target_databricks_schema",
    ),
    metadata_config=ReconcileMetadataConfig(
        catalog="remorph_metadata",
        schema="reconcile",
    ),
)

Configure Table Properties

We use the class TableRecon to configure the properties of the source and target tables to be reconciled.

@dataclass
class TableRecon:
    source_schema: str
    target_catalog: str
    target_schema: str
    tables: list[Table]
    source_catalog: str | None = None

An example of table properties for reconciliation:

from databricks.labs.remorph.config import TableRecon
from databricks.labs.remorph.reconcile.recon_config import (
    Table,
    ColumnMapping,
    ColumnThresholds,
    TableThresholds,
    Transformation,
    JdbcReaderOptions,
    Aggregate,
    Filters,
)

table_recon = TableRecon(
    source_schema="source_sf_schema",
    target_catalog="target_databricks_catalog",
    target_schema="target_databricks_schema",
    tables=[
        Table(
            source_name="source_table_name",
            target_name="target_table_name",
            # Columns used to join the source and target tables.
            join_columns=["store_id", "account_id"],
            # Map source column names to their target counterparts.
            column_mapping=[
                ColumnMapping(source_name="dept_id", target_name="department_id"),
                ColumnMapping(source_name="cty_cd", target_name="country_code"),
            ],
            # Tolerate per-column differences within these bounds.
            column_thresholds=[
                ColumnThresholds(column_name="unit_price", lower_bound="-5", upper_bound="5", type="float"),
            ],
            # Tolerate up to 5% mismatched rows for the whole table.
            table_thresholds=[
                TableThresholds(lower_bound="0%", upper_bound="5%", model="mismatch"),
            ],
            # Normalize values on both sides before comparison.
            transformations=[
                Transformation(
                    column_name="inventory_units",
                    source="coalesce(cast(cast(inventory_units as decimal(38,10)) as string), '_null_recon_')",
                    target='coalesce(replace(cast(format_number(cast(inventory_units as decimal(38, 10)), 10) as string), ",", ""), "_null_recon_")',
                ),
                Transformation(
                    column_name="scanout_dollars",
                    source="coalesce(cast(cast(scanout_dollars as decimal(38,10)) as string), '_null_recon_')",
                    target='coalesce(replace(cast(format_number(cast(scanout_dollars as decimal(38, 10)), 10) as string), ",", ""), "_null_recon_")',
                ),
            ],
            # Parallelize the JDBC read from the source system.
            jdbc_reader_options=JdbcReaderOptions(
                number_partitions=50,
                partition_column="lct_nbr",
                lower_bound="1",
                upper_bound="50000",
            ),
            # Aggregate-level checks, also used by reconcile_aggregates.
            aggregates=[
                Aggregate(agg_columns=["inventory_units"], type="count"),
                Aggregate(agg_columns=["unit_price"], type="min"),
                Aggregate(agg_columns=["unit_price"], type="max"),
            ],
            # Restrict the comparison to matching subsets of rows on each side.
            filters=Filters(
                source="lower(dept_name)='finance'",
                target="lower(department_name)='finance'",
            ),
        )
    ],
)

Run Reconcile

To run Reconcile on the configured properties, use the recon function. You also need to pass a WorkspaceClient to recon.

from databricks.sdk import WorkspaceClient

from databricks.labs.remorph import __version__
from databricks.labs.remorph.reconcile.execute import recon
from databricks.labs.remorph.reconcile.exception import ReconciliationException

ws = WorkspaceClient(product="remorph", product_version=__version__)

try:
    result = recon(
        ws=ws,
        spark=spark,  # notebook Spark session
        table_recon=table_recon,  # previously created TableRecon
        reconcile_config=reconcile_config,  # previously created ReconcileConfig
    )
    print(result.recon_id)
    print(result)
    print("***************************")
except ReconciliationException as e:
    recon_id = e.reconcile_output.recon_id
    print(f"Failed : {recon_id}")
    print(e)
    print("***************************")
except Exception as e:
    # Log the unexpected failure before re-raising it.
    print(f"Exception : {e}")
    print("***************************")
    raise

Visualization

When you install Remorph using the Databricks CLI, it deploys an AI/BI dashboard in your workspace. This dashboard gives you a detailed report of all the reconciliation runs in your workspace. Every reconciliation run returns a recon_id, which you can use to drill down into the details of that particular run on the dashboard.
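
You can also inspect a run directly from the notebook by filtering the reconcile metadata on the recon_id. The sketch below assumes the metadata lives in a main table under the catalog and schema from the ReconcileMetadataConfig example above; the exact table layout may differ across versions, so adjust the name to your installation:

# recon_id captured from a successful run (see the Run Reconcile section).
recon_id = result.recon_id

# `remorph_metadata.reconcile.main` is an assumption based on the
# metadata_config used earlier; adjust to your installation's layout.
df = spark.sql(f"SELECT * FROM remorph_metadata.reconcile.main WHERE recon_id = '{recon_id}'")
display(df)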