databricks.labs.dqx.utils
get_column_name_or_alias
def get_column_name_or_alias(
column: str | Column | ConnectColumn,
normalize: bool = False,
allow_simple_expressions_only: bool = False) -> str
Extracts the column alias or name from a PySpark Column or ConnectColumn expression.
PySpark does not provide direct access to the alias of an unbound column, so this function parses the alias from the column's string representation.
- Supports columns with one or multiple aliases.
- Truncates the extracted expression to 255 characters.
- Provides an optional normalization step for consistent naming.
Arguments:
column
- Column, ConnectColumn or string representing a column.
normalize
- If True, normalizes the column name (removes special characters, converts to lowercase).
allow_simple_expressions_only
- If True, raises an error if the column expression is not a simple expression. Complex PySpark expressions (e.g. conditionals, arithmetic, or nested transformations) cannot be fully reconstructed when converted to a string (e.g. F.col("a") + F.lit(1)). In some situations this is acceptable, for example when the output is used only for reporting.
Returns:
The extracted column alias or name.
Raises:
ValueError
- If the column expression is invalid.
TypeError
- If the column type is unsupported.
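Example (illustrative sketch; assumes a standard PySpark setup and the import path shown):
from pyspark.sql import functions as F
from databricks.labs.dqx.utils import get_column_name_or_alias

get_column_name_or_alias(F.col("first_name").alias("name"))   # "name"
get_column_name_or_alias("first_name", normalize=True)        # "first_name"
# With allow_simple_expressions_only=True, a compound expression is expected to raise ValueError:
# get_column_name_or_alias(F.col("a") + F.lit(1), allow_simple_expressions_only=True)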
get_columns_as_strings
def get_columns_as_strings(
columns: list[str | Column],
allow_simple_expressions_only: bool = True) -> list[str]
Extracts column names from a list of PySpark Column or ConnectColumn expressions.
This function processes each column, ensuring that only valid column names are returned.
Arguments:
columns
- List of columns, ConnectColumns or strings representing columns.
allow_simple_expressions_only
- If True, raises an error if the column expression is not a simple expression.
Returns:
List of column names as strings.
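Example (illustrative sketch; assumes a standard PySpark setup):
from pyspark.sql import functions as F
from databricks.labs.dqx.utils import get_columns_as_strings

get_columns_as_strings(["id", F.col("first_name")])   # ["id", "first_name"]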
is_simple_column_expression
def is_simple_column_expression(col_name: str) -> bool
Returns True if the column name does not contain any disallowed characters: space, comma, semicolon, curly braces, parentheses, newline, tab, or equals sign.
Arguments:
col_name
- Column name to validate.
Returns:
True if the column name is valid, False otherwise.
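Example (illustrative sketch, based on the disallowed characters listed above):
from databricks.labs.dqx.utils import is_simple_column_expression

is_simple_column_expression("first_name")   # True
is_simple_column_expression("a + b")        # False (contains spaces)
is_simple_column_expression("f(col)")       # False (contains parentheses)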
normalize_bound_args
def normalize_bound_args(val: Any) -> Any
Normalize a value or collection of values for consistent processing.
Handles primitives, dates, and column-like objects. Lists, tuples, and sets are recursively normalized with type preserved.
Arguments:
val
- Value or collection of values to normalize.
Returns:
Normalized value or collection.
Raises:
ValueError
- If a column resolves to an invalid name.
TypeError
- If a column type is unsupported.
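Example (illustrative sketch; exact normalized output may differ):
from datetime import date
from pyspark.sql import functions as F
from databricks.labs.dqx.utils import normalize_bound_args

normalize_bound_args([1, "a", date(2024, 1, 1)])   # list returned with each element normalized, type preserved
normalize_bound_args(F.col("amount"))              # column-like object resolved to its name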
normalize_col_str
def normalize_col_str(col_str: str) -> str
Normalizes a string to be compatible with metastore column names by applying the following transformations:
- remove special characters
- convert to lowercase
- limit the length to 255 characters
Arguments:
col_str
- String representing a column name.
Returns:
Normalized column name.
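Example (illustrative sketch; exact output may differ):
from databricks.labs.dqx.utils import normalize_col_str

normalize_col_str("Total Amount (USD)")   # lowercased, with special characters removed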
read_input_data
def read_input_data(spark: SparkSession,
input_config: InputConfig) -> DataFrame
Reads input data from the specified location and format.
Arguments:
spark
- SparkSession
input_config
- InputConfig with source location/table name, format, and options
Returns:
DataFrame with values read from the input data
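Example (illustrative sketch; the InputConfig import path and an active SparkSession named spark are assumptions):
from databricks.labs.dqx.config import InputConfig
from databricks.labs.dqx.utils import read_input_data

input_config = InputConfig(location="catalog.schema.orders", format="delta")
df = read_input_data(spark, input_config)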
get_reference_dataframes
def get_reference_dataframes(
spark: SparkSession,
reference_tables: dict[str, InputConfig] | None = None
) -> dict[str, DataFrame] | None
Get reference DataFrames from the provided reference tables configuration.
Arguments:
spark
- SparkSession
reference_tables
- A dictionary mapping reference table names to their input configurations.
Examples:
reference_tables = {
    "reference_table_1": InputConfig(location="db.schema.table1", format="delta"),
    "reference_table_2": InputConfig(location="db.schema.table2", format="delta")
}
Returns:
A dictionary mapping reference table names to their DataFrames.
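Using the reference_tables configuration from the example above (illustrative sketch; an active SparkSession named spark is assumed):
ref_dfs = get_reference_dataframes(spark, reference_tables)
# ref_dfs["reference_table_1"] and ref_dfs["reference_table_2"] are DataFrames read from the configured tables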
save_dataframe_as_table
def save_dataframe_as_table(df: DataFrame, output_config: OutputConfig)
Helper function to save a DataFrame to a Delta table.
Arguments:
df
- The DataFrame to save
output_config
- Output table name, write mode, and options
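Example (illustrative sketch; the OutputConfig import path and field names are assumptions):
from databricks.labs.dqx.config import OutputConfig
from databricks.labs.dqx.utils import save_dataframe_as_table

output_config = OutputConfig(location="catalog.schema.orders_clean", mode="append")
save_dataframe_as_table(df, output_config)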
safe_json_load
def safe_json_load(value: str)
Safely loads a JSON string, returning the original value if parsing fails. This allows specifying a string value without needing to escape the quotes.
Arguments:
value
- The value to parse as JSON.
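Example (illustrative sketch; parsing follows standard json.loads semantics):
from databricks.labs.dqx.utils import safe_json_load

safe_json_load('{"threshold": 5}')   # {'threshold': 5}
safe_json_load('not json')           # 'not json' (returned unchanged)
safe_json_load('123')                # 123 (parsed as a JSON number)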