Running Reconcile on Notebook
This page is a comprehensive guide on how to configure and run the Reconcile utility of Lakebridge on your Databricks workspace using a Databricks notebook.
Installation
Before running the Reconcile utility, you need to install the required packages. You can install them either from GitHub or from PyPI:

GitHub:

%pip install git+https://github.com/databrickslabs/lakebridge
dbutils.library.restartPython()

PyPI:

%pip install databricks-labs-lakebridge
dbutils.library.restartPython()
Imports
Import all the necessary modules.
from databricks.sdk import WorkspaceClient
from databricks.labs.lakebridge.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig,
    TableRecon
)
from databricks.labs.lakebridge.reconcile.recon_config import (
    Table,
    ColumnMapping,
    ColumnThresholds,
    Transformation,
    JdbcReaderOptions,
    Aggregate,
    Filters
)
from databricks.labs.lakebridge.reconcile.trigger_recon_service import TriggerReconService
from databricks.labs.lakebridge.reconcile.trigger_recon_aggregate_service import TriggerReconAggregateService
from databricks.labs.lakebridge.reconcile.exception import ReconciliationException
Configure Reconcile Properties
We use the class ReconcileConfig to configure the properties required for reconciliation.
@dataclass
class ReconcileConfig:
    data_source: str
    report_type: str
    secret_scope: str
    database_config: DatabaseConfig
    metadata_config: ReconcileMetadataConfig
Parameters:
- data_source: The data source to be reconciled. Supported values: snowflake, teradata, oracle, mssql, synapse, databricks.
- report_type: The type of report to be generated. Available report types are schema, row, data or all. For details, check here.
- secret_scope: The secret scope name used to store the connection credentials for the source database system.
- database_config: The database configuration for connecting to the source database. Expects a DatabaseConfig object.
  - source_schema: The source schema name.
  - target_catalog: The target catalog name.
  - target_schema: The target schema name (Databricks).
  - source_catalog: The source catalog name. (Optional; applies to source systems that support catalogs.)
@dataclass
class DatabaseConfig:
    source_schema: str
    target_catalog: str
    target_schema: str
    source_catalog: str | None = None
- metadata_config: The metadata configuration. Reconcile uses this catalog and schema on Databricks to store all the backend metadata details for reconciliation. Expects a ReconcileMetadataConfig object.
  - catalog: The catalog name to store the metadata.
  - schema: The schema name to store the metadata.
@dataclass
class ReconcileMetadataConfig:
    catalog: str = "lakebridge"
    schema: str = "reconcile"
    volume: str = "reconcile_volume"
If not set, the default values are used to store the metadata. The default resources are created during the installation of Lakebridge.
An example of configuring the Reconcile properties:
from databricks.labs.lakebridge.config import (
    DatabaseConfig,
    ReconcileConfig,
    ReconcileMetadataConfig
)
reconcile_config = ReconcileConfig(
    data_source="snowflake",
    report_type="all",
    secret_scope="snowflake-credential",
    database_config=DatabaseConfig(
        source_catalog="source_sf_catalog",
        source_schema="source_sf_schema",
        target_catalog="target_databricks_catalog",
        target_schema="target_databricks_schema",
    ),
    metadata_config=ReconcileMetadataConfig(
        catalog="lakebridge_metadata",
        schema="reconcile",
    ),
)
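The secret scope referenced above must already exist and contain the connection credentials for the source system. Below is a minimal sketch of creating it with the Databricks SDK; the scope name matches the example above, while the key names and values are illustrative placeholders (use whatever credential keys your source connection requires):

from databricks.sdk import WorkspaceClient

ws = WorkspaceClient()

# Create the scope once; this raises an error if the scope already exists.
ws.secrets.create_scope(scope="snowflake-credential")

# Store each credential under its own key (key names here are placeholders).
ws.secrets.put_secret(scope="snowflake-credential", key="user", string_value="<source-user>")
ws.secrets.put_secret(scope="snowflake-credential", key="password", string_value="<source-password>")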
Configure Table Properties
We use the class TableRecon to configure the properties of the source and target tables to be reconciled.
@dataclass
class TableRecon:
    source_schema: str
    target_catalog: str
    target_schema: str
    tables: list[Table]
    source_catalog: str | None = None
An example of Table properties for reconciliation:
from databricks.labs.lakebridge.config import TableRecon
from databricks.labs.lakebridge.reconcile.recon_config import (
    Table,
    ColumnMapping,
    ColumnThresholds,
    TableThresholds,
    Transformation,
    JdbcReaderOptions,
    Aggregate,
    Filters
)
table_recon = TableRecon(
    source_schema="source_sf_schema",
    target_catalog="target_databricks_catalog",
    target_schema="target_databricks_schema",
    tables=[
        Table(
            source_name="source_table_name",
            target_name="target_table_name",
            join_columns=["store_id", "account_id"],  # columns used to join the source and target tables
            column_mapping=[
                ColumnMapping(source_name="dept_id", target_name="department_id"),
                ColumnMapping(source_name="cty_cd", target_name="country_code")
            ],
            column_thresholds=[
                ColumnThresholds(column_name="unit_price", lower_bound="-5", upper_bound="5", type="float")
            ],
            table_thresholds=[
                TableThresholds(lower_bound="0%", upper_bound="5%", model="mismatch")
            ],
            transformations=[
                Transformation(
                    column_name="inventory_units",
                    source="coalesce(cast(cast(inventory_units as decimal(38,10)) as string), '_null_recon_')",
                    target='coalesce(replace(cast(format_number(cast(inventory_units as decimal(38, 10)), 10) as string), ",", ""), "_null_recon_")',
                ),
                Transformation(
                    column_name="scanout_dollars",
                    source="coalesce(cast(cast(scanout_dollars as decimal(38,10)) as string), '_null_recon_')",
                    target='coalesce(replace(cast(format_number(cast(scanout_dollars as decimal(38, 10)), 10) as string), ",", ""), "_null_recon_")',
                ),
            ],
            jdbc_reader_options=JdbcReaderOptions(
                number_partitions=50,
                partition_column="lct_nbr",
                lower_bound="1",
                upper_bound="50000",
            ),
            aggregates=[
                Aggregate(agg_columns=["inventory_units"], type="count"),
                Aggregate(agg_columns=["unit_price"], type="min"),
                Aggregate(agg_columns=["unit_price"], type="max")
            ],
            filters=Filters(
                source="lower(dept_name)='finance'",
                target="lower(department_name)='finance'"
            )
        )
    ]
)
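Most fields on Table are optional. For a first run, a minimal sketch (table and column names are illustrative) that reconciles a single table on its key column and leaves every other option at its default:

minimal_recon = TableRecon(
    source_schema="source_sf_schema",
    target_catalog="target_databricks_catalog",
    target_schema="target_databricks_schema",
    tables=[
        # Reconcile one table on its key column; all other knobs use defaults.
        Table(source_name="orders", target_name="orders", join_columns=["order_id"])
    ],
)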
Run Reconcile
To run Reconcile on the configured properties, use the trigger_recon method of TriggerReconService. You also need to pass a WorkspaceClient to it.
from databricks.labs.lakebridge import __version__
from databricks.sdk import WorkspaceClient
from databricks.labs.lakebridge.reconcile.trigger_recon_service import TriggerReconService
from databricks.labs.lakebridge.reconcile.exception import ReconciliationException
ws = WorkspaceClient(product="lakebridge", product_version=__version__)
try:
    result = TriggerReconService.trigger_recon(
        ws=ws,
        spark=spark,  # notebook Spark session
        table_recon=table_recon,  # created above
        reconcile_config=reconcile_config,  # created above
    )
    print(result.recon_id)
    print(result)
except ReconciliationException as e:
    # A failed run still carries a recon_id for looking it up on the dashboard.
    recon_id = e.reconcile_output.recon_id
    print(f"Reconciliation failed: {recon_id}")
    print(e)
except Exception as e:
    print(f"Unexpected exception: {e}")
    raise
Visualization
When you install Lakebridge using the Databricks CLI, it deploys an AI/BI dashboard on your workspace.
This dashboard gives you a detailed report of all the reconciliation runs on your workspace. Every reconciliation run returns a
recon_id, which you can use to drill down into the details of that particular run on the dashboard.
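You can also inspect a run programmatically. A speculative sketch, assuming the reconciliation metadata lands in the catalog and schema configured in ReconcileMetadataConfig (here lakebridge_metadata.reconcile) with a main table keyed by recon_id; the exact table names and layout may differ between versions:

# Assumption: a 'main' metadata table exists under the configured metadata location.
spark.sql(
    f"SELECT * FROM lakebridge_metadata.reconcile.main WHERE recon_id = '{result.recon_id}'"
).show(truncate=False)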