databricks.labs.dqx.telemetry
log_telemetry
def log_telemetry(ws: WorkspaceClient, key: str, value: str) -> None
Trace specific telemetry information in the Databricks workspace by setting extra user agent info on the workspace client.
Arguments:
- ws - WorkspaceClient
- key - telemetry key to log
- value - telemetry value to log
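A minimal usage sketch; the key and value shown are illustrative, and a configured Databricks workspace (credentials resolved from the environment or a profile) is assumed:

```python
from databricks.sdk import WorkspaceClient

from databricks.labs.dqx.telemetry import log_telemetry

ws = WorkspaceClient()  # assumes credentials are already configured

# Attach an extra key/value pair to the client's user agent info.
log_telemetry(ws, "dqx_feature", "profiler")  # key/value are illustrative
```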
telemetry_logger
def telemetry_logger(key: str, value: str, workspace_client_attr: str = "ws") -> Callable
Decorator to log telemetry for method calls. By default, it expects the object whose method is decorated to expose a "ws" attribute holding the workspace client.
Usage:
@telemetry_logger("telemetry_key", "telemetry_value")  # uses the "ws" attribute for the workspace client by default
@telemetry_logger("telemetry_key", "telemetry_value", "my_ws_client")  # custom attribute
Arguments:
- key - Telemetry key to log
- value - Telemetry value to log
- workspace_client_attr - Name of the workspace client attribute on the class (defaults to "ws")
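An illustrative sketch of decorating an instance method; the class and method names below are hypothetical:

```python
from databricks.sdk import WorkspaceClient

from databricks.labs.dqx.telemetry import telemetry_logger


class QualityRunner:  # hypothetical class
    def __init__(self, ws: WorkspaceClient):
        self.ws = ws  # attribute name the decorator looks up by default

    @telemetry_logger("telemetry_key", "telemetry_value")
    def run_checks(self) -> None:
        ...  # telemetry is recorded via self.ws when this method is called
```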
log_dataframe_telemetry
def log_dataframe_telemetry(ws: WorkspaceClient, spark: SparkSession, df: DataFrame)
Log telemetry information about a Spark DataFrame to the Databricks workspace including:
- List of tables used as inputs (hashed)
- List of file paths used as inputs (hashed, excluding paths from tables)
- Whether the DataFrame is streaming
- Whether running in a Delta Live Tables (DLT) pipeline
This function is designed to never throw exceptions - it will log errors but continue execution to ensure telemetry failures don't break the main application flow.
Arguments:
- ws - WorkspaceClient
- spark - SparkSession
- df - DataFrame to analyze
Returns:
None
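A sketch assuming an active SparkSession on a Databricks runtime; the input table name is hypothetical:

```python
from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession

from databricks.labs.dqx.telemetry import log_dataframe_telemetry

ws = WorkspaceClient()
spark = SparkSession.builder.getOrCreate()

df = spark.read.table("main.sales.transactions")  # hypothetical input table

# Best-effort telemetry: failures are logged, never raised.
log_dataframe_telemetry(ws, spark, df)
```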
get_tables_from_spark_plan
def get_tables_from_spark_plan(plan_str: str) -> set[str]
Extract table names from the Analyzed Logical Plan section of a Spark execution plan.
This function parses the Analyzed Logical Plan section and identifies table references by finding SubqueryAlias nodes, which Spark uses to represent table references in the logical plan. File-based sources (e.g., Delta files from volumes) and in-memory DataFrames do not create SubqueryAlias nodes and therefore won't be counted as tables.
Arguments:
- plan_str - The complete Spark execution plan string (from df.explain(True))
Returns:
A set of distinct table names found in the plan. Returns an empty set if no Analyzed Logical Plan section is found or no tables are referenced.
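A sketch combining this helper with get_spark_plan_as_string; the table name is hypothetical:

```python
from pyspark.sql import SparkSession

from databricks.labs.dqx.telemetry import (
    get_spark_plan_as_string,
    get_tables_from_spark_plan,
)

spark = SparkSession.builder.getOrCreate()
df = spark.read.table("main.sales.transactions")  # hypothetical table

plan_str = get_spark_plan_as_string(df)
tables = get_tables_from_spark_plan(plan_str)  # distinct table names, if any
```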
get_paths_from_spark_plan
def get_paths_from_spark_plan(plan_str: str, table_names: set[str] | None = None) -> set[str]
Extract file paths from the Physical Plan section of a Spark execution plan.
This function parses the Physical Plan section and identifies file path references by finding any *FileIndex patterns in the Location field (e.g., PreparedDeltaFileIndex, ParquetFileIndex, etc.). These paths represent direct file-based data sources (e.g., files from volumes, DBFS, S3, etc.) that are not registered as tables.
Arguments:
- plan_str - The complete Spark execution plan string (from df.explain(True))
- table_names - Optional set of table names to exclude (paths associated with tables are skipped)
Returns:
A set of distinct file paths found in the plan. Returns an empty set if no Physical Plan section is found or no paths are referenced.
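A sketch that excludes table-backed paths by passing the tables found in the same plan; the source path is hypothetical:

```python
from pyspark.sql import SparkSession

from databricks.labs.dqx.telemetry import (
    get_paths_from_spark_plan,
    get_spark_plan_as_string,
    get_tables_from_spark_plan,
)

spark = SparkSession.builder.getOrCreate()
df = spark.read.load("/Volumes/main/raw/files/")  # hypothetical file-based source

plan_str = get_spark_plan_as_string(df)
tables = get_tables_from_spark_plan(plan_str)
paths = get_paths_from_spark_plan(plan_str, table_names=tables)  # table paths skipped
```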
is_dlt_pipeline
def is_dlt_pipeline(spark: SparkSession) -> bool
Determine if the current Spark session is running within a Databricks Delta Live Tables (DLT) pipeline.
Arguments:
- spark - The SparkSession to check
Returns:
True if running in a DLT pipeline, False otherwise
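A minimal sketch for branching on the execution context:

```python
from pyspark.sql import SparkSession

from databricks.labs.dqx.telemetry import is_dlt_pipeline

spark = SparkSession.builder.getOrCreate()

if is_dlt_pipeline(spark):
    print("Running inside a Delta Live Tables pipeline")
else:
    print("Running as a regular job or interactive session")
```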
get_spark_plan_as_string
def get_spark_plan_as_string(df: DataFrame) -> str
Retrieve the Spark execution plan as a string by capturing df.explain() output.
This function temporarily redirects stdout to capture the output of df.explain(True), which prints the detailed execution plan including the Analyzed Logical Plan.
Arguments:
- df - The Spark DataFrame to get the execution plan from
Returns:
The complete execution plan as a string, or empty string if explain() fails
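A self-contained sketch that captures the plan for a trivial DataFrame:

```python
from pyspark.sql import SparkSession

from databricks.labs.dqx.telemetry import get_spark_plan_as_string

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

plan_str = get_spark_plan_as_string(df)
print("== Analyzed Logical Plan ==" in plan_str)  # True unless explain() failed
```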