# Running Reconcile on Notebook

This page is a comprehensive guide to configuring and running the Reconcile utility of Remorph on your Databricks workspace using a Databricks notebook.
## Installation

Before running the Reconcile utility, install the required package from either GitHub or PyPI:

**GitHub**

```python
%pip install git+https://github.com/databrickslabs/remorph
dbutils.library.restartPython()
```

**PyPI**

```python
%pip install databricks-labs-remorph
dbutils.library.restartPython()
```
## Imports

Import all the necessary modules:

```python
from databricks.sdk import WorkspaceClient
from databricks.labs.remorph.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig,
    TableRecon,
)
from databricks.labs.remorph.reconcile.recon_config import (
    Table,
    ColumnMapping,
    ColumnThresholds,
    Transformation,
    JdbcReaderOptions,
    Aggregate,
    Filters,
)
from databricks.labs.remorph.reconcile.execute import (
    recon,
    reconcile_aggregates,
)
from databricks.labs.remorph.reconcile.exception import ReconciliationException
```
## Configure Reconcile Properties

We use the class `ReconcileConfig` to configure the properties required for reconciliation.

```python
@dataclass
class ReconcileConfig:
    data_source: str
    report_type: str
    secret_scope: str
    database_config: DatabaseConfig
    metadata_config: ReconcileMetadataConfig
```
Parameters:

- `data_source`: The data source to be reconciled. Supported values: `snowflake`, `teradata`, `oracle`, `tsql`, `databricks`, etc.
- `report_type`: The type of report to be generated. Available report types are `schema`, `row`, `data`, or `all`. For details, check here.
- `secret_scope`: The secret scope name used to store the connection credentials for the source database system.
- `database_config`: The database configuration for connecting to the source database. Expects a `DatabaseConfig` object.
  - `source_schema`: The source schema name.
  - `target_catalog`: The target catalog name.
  - `target_schema`: The target schema name (Databricks).
  - `source_catalog`: The source catalog name (optional; applies to source systems that support catalogs).
```python
@dataclass
class DatabaseConfig:
    source_schema: str
    target_catalog: str
    target_schema: str
    source_catalog: str | None = None
```
- `metadata_config`: The metadata configuration. Reconcile uses this catalog and schema on Databricks to store all the backend metadata details for reconciliation. Expects a `ReconcileMetadataConfig` object.
  - `catalog`: The catalog name to store the metadata.
  - `schema`: The schema name to store the metadata.
```python
@dataclass
class ReconcileMetadataConfig:
    catalog: str = "remorph"
    schema: str = "reconcile"
    volume: str = "reconcile_volume"
```
If not set, the default values are used to store the metadata; for example, `ReconcileMetadataConfig()` resolves to catalog `remorph`, schema `reconcile`, and volume `reconcile_volume`. These default resources are created during the installation of Remorph.

An example of configuring the Reconcile properties:
```python
from databricks.labs.remorph.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig,
)

reconcile_config = ReconcileConfig(
    data_source="snowflake",
    report_type="all",
    secret_scope="snowflake-credential",
    database_config=DatabaseConfig(
        source_catalog="source_sf_catalog",
        source_schema="source_sf_schema",
        target_catalog="target_databricks_catalog",
        target_schema="target_databricks_schema",
    ),
    metadata_config=ReconcileMetadataConfig(
        catalog="remorph_metadata",
        schema="reconcile",
    ),
)
```
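The secret scope referenced above must already exist and contain the connection credentials for the source system. A minimal sketch of populating it with the Databricks SDK follows; the key names used here are illustrative assumptions, so store whatever keys your source connector expects:

```python
from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()

# Create the scope referenced by ReconcileConfig.secret_scope
# (raises an error if the scope already exists).
ws.secrets.create_scope(scope="snowflake-credential")

# Key names below are illustrative assumptions; use the keys
# required by your source connector.
ws.secrets.put_secret(scope="snowflake-credential", key="sfUser", string_value="<user>")
ws.secrets.put_secret(scope="snowflake-credential", key="sfPassword", string_value="<password>")
```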
## Configure Table Properties

We use the class `TableRecon` to configure the properties of the source and target tables to be reconciled.

```python
@dataclass
class TableRecon:
    source_schema: str
    target_catalog: str
    target_schema: str
    tables: list[Table]
    source_catalog: str | None = None
```

An example of table properties for reconciliation:
```python
from databricks.labs.remorph.config import TableRecon
from databricks.labs.remorph.reconcile.recon_config import (
    Table,
    ColumnMapping,
    ColumnThresholds,
    TableThresholds,
    Transformation,
    JdbcReaderOptions,
    Aggregate,
    Filters,
)

table_recon = TableRecon(
    source_schema="source_sf_schema",
    target_catalog="target_databricks_catalog",
    target_schema="target_databricks_schema",
    tables=[
        Table(
            source_name="source_table_name",
            target_name="target_table_name",
            # Columns used to join the source and target tables.
            join_columns=["store_id", "account_id"],
            # Map source column names to their target counterparts.
            column_mapping=[
                ColumnMapping(source_name="dept_id", target_name="department_id"),
                ColumnMapping(source_name="cty_cd", target_name="country_code"),
            ],
            # Tolerate per-row differences of +/- 5 on unit_price.
            column_thresholds=[
                ColumnThresholds(column_name="unit_price", lower_bound="-5", upper_bound="5", type="float")
            ],
            # Allow up to 5% mismatched rows before the table is flagged.
            table_thresholds=[
                TableThresholds(lower_bound="0%", upper_bound="5%", model="mismatch")
            ],
            # Normalize values on each side before comparison.
            transformations=[
                Transformation(
                    column_name="inventory_units",
                    source="coalesce(cast(cast(inventory_units as decimal(38,10)) as string), '_null_recon_')",
                    target='coalesce(replace(cast(format_number(cast(inventory_units as decimal(38, 10)), 10) as string), ",", ""), "_null_recon_")',
                ),
                Transformation(
                    column_name="scanout_dollars",
                    source="coalesce(cast(cast(scanout_dollars as decimal(38,10)) as string), '_null_recon_')",
                    target='coalesce(replace(cast(format_number(cast(scanout_dollars as decimal(38, 10)), 10) as string), ",", ""), "_null_recon_")',
                ),
            ],
            # Parallelize the JDBC read from the source system.
            jdbc_reader_options=JdbcReaderOptions(
                number_partitions=50,
                partition_column="lct_nbr",
                lower_bound="1",
                upper_bound="50000",
            ),
            # Aggregate-level checks in addition to row/column checks.
            aggregates=[
                Aggregate(agg_columns=["inventory_units"], type="count"),
                Aggregate(agg_columns=["unit_price"], type="min"),
                Aggregate(agg_columns=["unit_price"], type="max"),
            ],
            # Row filters applied on each side before comparing.
            filters=Filters(
                source="lower(dept_name)='finance'",
                target="lower(department_name)='finance'",
            ),
        )
    ],
)
```
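The example above exercises most of the available options, but typically only `source_name` and `target_name` (plus `join_columns` for row- and data-level reports) are required; the remaining fields are optional refinements. A minimal sketch, assuming identical column names on both sides:

```python
# Minimal configuration: one table, joined on its key column.
# All other Table fields default to None and can be omitted.
minimal_recon = TableRecon(
    source_schema="source_sf_schema",
    target_catalog="target_databricks_catalog",
    target_schema="target_databricks_schema",
    tables=[
        Table(source_name="orders", target_name="orders", join_columns=["order_id"]),
    ],
)
```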
## Run Reconcile

To run Reconcile with the configured properties, use the `recon` method. You also need to pass a `WorkspaceClient` to `recon`.
```python
from databricks.labs.remorph import __version__
from databricks.sdk import WorkspaceClient
from databricks.labs.remorph.reconcile.execute import recon
from databricks.labs.remorph.reconcile.exception import ReconciliationException

ws = WorkspaceClient(product="remorph", product_version=__version__)

try:
    result = recon(
        ws=ws,
        spark=spark,  # notebook spark session
        table_recon=table_recon,  # created above
        reconcile_config=reconcile_config,  # created above
    )
    print(result.recon_id)
    print(result)
    print("***************************")
except ReconciliationException as e:
    # A ReconciliationException still carries the recon_id of the failed run.
    recon_id = e.reconcile_output.recon_id
    print(f"Failed: {recon_id}")
    print(e)
    print("***************************")
except Exception as e:
    print(f"Exception: {e}")
    print("***************************")
    raise
```
## Visualization

When you install Remorph using the Databricks CLI, it deploys an AI/BI dashboard in your workspace. This dashboard gives you a detailed report of all the reconciliation runs in your workspace. Every reconciliation run produces a `recon_id`, which you can use to drill down into the details of that particular run on the dashboard.
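Outside the dashboard, you can also inspect the reconciliation metadata directly from the notebook. A minimal sketch, assuming the metadata catalog and schema configured earlier (`remorph_metadata.reconcile`); the `main` table name is an assumption, so list the schema first to confirm which tables Remorph created:

```python
# List the metadata tables Remorph created in the configured schema.
spark.sql("SHOW TABLES IN remorph_metadata.reconcile").show()

# Drill into a single run by its recon_id (from the run above).
# The `main` table name is an assumption; adjust to the tables listed.
recon_id = result.recon_id
spark.sql(
    f"SELECT * FROM remorph_metadata.reconcile.main WHERE recon_id = '{recon_id}'"
).show(truncate=False)
```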