SSIS Conversion Examples
Before/after examples showing how SSIS constructs are converted to Databricks. For the component support matrix, see SSIS Supported Components.
Data Flow Transformations
Derived Column
SSIS:
Derived Column Transformation
Columns:
FullName = FirstName + " " + LastName
AgeGroup = Age < 30 ? "Young" : (Age < 60 ? "Middle" : "Senior")
ProcessDate = GETDATE()
Databricks:
Derived_Column_Add_Columns = f"""
SELECT
*,
CONCAT(FirstName, ' ', LastName) AS FullName,
CASE
WHEN Age < 30 THEN 'Young'
WHEN Age < 60 THEN 'Middle'
ELSE 'Senior'
END AS AgeGroup,
CURRENT_TIMESTAMP() AS ProcessDate
FROM source_table
"""
Derived_Column_Add_Columns = spark.sql(Derived_Column_Add_Columns)
Derived_Column_Add_Columns.createOrReplaceTempView('Derived_Column_Add_Columns')
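According to the SSIS expression reference, the conditional operator `? :` returns NULL when its Boolean condition evaluates to NULL. A SQL `CASE`, by contrast, sends a NULL `Age` to the `ELSE` branch, so the converted `AgeGroup` is 'Senior' for rows with no age. If the SSIS behavior must be kept exactly, the following is a minimal sketch that adds an explicit NULL branch first. `age_group_sql` and `ssis_age_group` are hypothetical helper names. The pure-Python reference is only there to spot-check converted output on sample values.

```python
def age_group_sql(source_view: str) -> str:
    """Build the AgeGroup projection with SSIS-style NULL propagation."""
    return f"""
SELECT
  *,
  CASE
    WHEN Age IS NULL THEN NULL  -- SSIS: a NULL condition gives a NULL result
    WHEN Age < 30 THEN 'Young'
    WHEN Age < 60 THEN 'Middle'
    ELSE 'Senior'
  END AS AgeGroup
FROM {source_view}
"""


def ssis_age_group(age):
    """Pure-Python reference of the nested SSIS ternary, for spot checks."""
    if age is None:
        return None
    return "Young" if age < 30 else ("Middle" if age < 60 else "Senior")


# In a Databricks notebook, where `spark` is predefined:
# Derived_Column_Null_Safe = spark.sql(age_group_sql('source_table'))
# Derived_Column_Null_Safe.createOrReplaceTempView('Derived_Column_Null_Safe')
```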
Conditional Split
SSIS:
Conditional Split Transformation
Outputs:
HighValue: Amount > 1000
MediumValue: Amount > 100 AND Amount <= 1000
LowValue: Default Output
Databricks:
Conditional_Split_High_Value = f"""
SELECT * FROM source_table WHERE Amount > 1000
"""
Conditional_Split_High_Value = spark.sql(Conditional_Split_High_Value)
Conditional_Split_High_Value.createOrReplaceTempView('Conditional_Split_High_Value')
Conditional_Split_Medium_Value = f"""
SELECT * FROM source_table WHERE Amount > 100 AND Amount <= 1000
"""
Conditional_Split_Medium_Value = spark.sql(Conditional_Split_Medium_Value)
Conditional_Split_Medium_Value.createOrReplaceTempView('Conditional_Split_Medium_Value')
Conditional_Split_Low_Value = f"""
SELECT * FROM source_table WHERE Amount <= 100
"""
Conditional_Split_Low_Value = spark.sql(Conditional_Split_Low_Value)
Conditional_Split_Low_Value.createOrReplaceTempView('Conditional_Split_Low_Value')
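Each of the three queries above scans `source_table` separately and depends on conditions written to be mutually exclusive. SSIS instead evaluates the conditions in order and sends each row to the first output whose condition is true. The following is a minimal sketch of an order-preserving alternative. It tags every row once with a `CASE`, where the first match wins, and caches the tagged result so the per-output views can reuse it. The `_split_output` column and the helper names are hypothetical. A `CASE` sends a row with a NULL `Amount` to the default output, so check how the SSIS component handles NULL condition results before relying on that.

```python
# Ordered (condition, output name) pairs: the first true condition wins, as in SSIS.
SPLIT_CONDITIONS = [
    ("Amount > 1000", "High_Value"),
    ("Amount > 100", "Medium_Value"),
]
DEFAULT_OUTPUT = "Low_Value"


def routing_sql(source_view: str) -> str:
    """Tag every row with the name of the output it is routed to."""
    whens = "\n    ".join(f"WHEN {cond} THEN '{name}'" for cond, name in SPLIT_CONDITIONS)
    return f"""
SELECT *,
  CASE
    {whens}
    ELSE '{DEFAULT_OUTPUT}'
  END AS _split_output
FROM {source_view}
"""


def conditional_split(spark, source_view: str) -> None:
    """Register one temp view per output from a single cached, tagged query."""
    routed = spark.sql(routing_sql(source_view)).cache()
    for name in [n for _, n in SPLIT_CONDITIONS] + [DEFAULT_OUTPUT]:
        output = routed.filter(f"_split_output = '{name}'").drop("_split_output")
        output.createOrReplaceTempView(f"Conditional_Split_{name}")
```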
Lookup Transformation
SSIS:
Lookup Transformation
Reference Table: DimCustomerType
Join Columns: CustomerTypeID = TypeID
Lookup Columns: TypeName, TypeDescription
Cache Mode: Full Cache
Databricks:
Lookup_Customer_Type = f"""
SELECT
src.*,
ref.TypeName,
ref.TypeDescription
FROM source_table src
LEFT JOIN dim.customer_type ref
ON src.CustomerTypeID = ref.TypeID
"""
Lookup_Customer_Type = spark.sql(Lookup_Customer_Type)
Lookup_Customer_Type.createOrReplaceTempView('Lookup_Customer_Type')
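A plain `LEFT JOIN` does not capture every SSIS Lookup behavior. First, when the reference table holds several rows with the same key, the Lookup returns only the first match, whereas a join returns one output row per match. Second, Full Cache loads the reference table into memory once, which corresponds roughly to a Spark broadcast join. Also, the `LEFT JOIN` matches a Lookup set to ignore rows with no match; the SSIS default is to fail the component. The following is a minimal sketch that handles duplicates and hints a broadcast. It assumes duplicates can be resolved by a deterministic rule, here the hypothetical choice `ORDER BY TypeName`, because the "first" row SSIS sees depends on an unordered reference query. Spark already broadcasts tables smaller than `spark.sql.autoBroadcastJoinThreshold`, so the hint mainly documents intent.

```python
def lookup_sql(source_view: str, reference_table: str) -> str:
    """Left-join to the reference table, keeping at most one reference row per key."""
    return f"""
WITH ref AS (
  SELECT TypeID, TypeName, TypeDescription
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY TypeID ORDER BY TypeName) AS _rn
    FROM {reference_table}
  ) AS ranked
  WHERE _rn = 1  -- one reference row per key, like the Lookup's first match
)
SELECT /*+ BROADCAST(ref) */
  src.*,
  ref.TypeName,
  ref.TypeDescription
FROM {source_view} src
LEFT JOIN ref
  ON src.CustomerTypeID = ref.TypeID
"""


# In the notebook:
# Lookup_Customer_Type = spark.sql(lookup_sql('source_table', 'dim.customer_type'))
# Lookup_Customer_Type.createOrReplaceTempView('Lookup_Customer_Type')
```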
Variables and Expressions
SSIS Variables
SSIS:
Variables:
User::MaxProcessDate (DateTime)
User::RowCount (Int32)
User::SourceFolder (String)
Databricks:
V_MaxProcessDate = f'2024-01-01'
V_SourceFolder = f'/mnt/source/'
Incremental_Data_Query = f"""
SELECT *
FROM source_table
WHERE ProcessDate > '{V_MaxProcessDate}'
"""
Incremental_Data_Query = spark.sql(Incremental_Data_Query)
Incremental_Data_Query.createOrReplaceTempView('Incremental_Data_Query')
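Hard-coding `V_MaxProcessDate` fixes the watermark at conversion time, and `User::RowCount` is not carried over. In SSIS, such variables are often set at run time, for example by an Execute SQL Task that returns a single-row result set. The following is a minimal sketch that reads the watermark from a control table and returns the row count of the filtered data. The `control.load_log` table and its columns are borrowed from the Complete Package Example below and are an assumption here, as are the helper names and the `1900-01-01` fallback date.

```python
def get_max_process_date(spark, process_name: str, default: str = "1900-01-01") -> str:
    """Return the latest process_date logged for process_name, or a fallback watermark."""
    # Assumed control table with process_name / process_date columns.
    row = spark.sql(f"""
        SELECT CAST(MAX(process_date) AS STRING) AS max_date
        FROM control.load_log
        WHERE process_name = '{process_name}'
    """).collect()[0]
    return row["max_date"] or default  # MAX over zero rows is NULL -> fallback


def load_incremental(spark, max_process_date: str) -> int:
    """Register the incremental rows as a view and return their count (User::RowCount)."""
    df = spark.sql(f"""
        SELECT *
        FROM source_table
        WHERE ProcessDate > '{max_process_date}'
    """)
    df.createOrReplaceTempView("Incremental_Data_Query")
    return df.count()


# In the notebook:
# V_MaxProcessDate = get_max_process_date(spark, 'CustomerETL')
# V_RowCount = load_incremental(spark, V_MaxProcessDate)
```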
SSIS Expressions
SSIS:
@[User::SourceFolder] + "customers_" +
(DT_WSTR, 8) DATEPART("yyyy", GETDATE()) +
RIGHT("0" + (DT_WSTR, 2) DATEPART("mm", GETDATE()), 2) + ".csv"
Databricks:
from datetime import datetime
V_SourceFolder = f'/mnt/source/'
current_date = datetime.now()
year = current_date.strftime("%Y")
month = current_date.strftime("%m")
V_FilePath = f"{V_SourceFolder}customers_{year}{month}.csv"
Read_Customers_Data = f"""SELECT * FROM csv.`{V_FilePath}`"""
Read_Customers_Data = spark.sql(Read_Customers_Data)
Read_Customers_Data.createOrReplaceTempView('Read_Customers_Data')
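Querying a path directly with csv.`...` uses Spark's default CSV options. As a result, a header row is read as data and the columns are named `_c0`, `_c1`, and so on. If the SSIS Flat File connection manager has "Column names in the first data row" checked, the read needs the `header` option. The following is a minimal sketch that keeps the path expression in a small testable function and reads the file with a header. `build_monthly_path` and `read_customers` are hypothetical helper names.

```python
from datetime import datetime


def build_monthly_path(source_folder: str, when: datetime) -> str:
    """Folder + "customers_" + yyyy + zero-padded mm + ".csv", as in the SSIS expression."""
    return f"{source_folder}customers_{when:%Y%m}.csv"


def read_customers(spark, file_path: str):
    """Read the monthly file, treating the first row as column names."""
    df = spark.read.option("header", "true").csv(file_path)
    df.createOrReplaceTempView("Read_Customers_Data")
    return df


# In the notebook:
# V_FilePath = build_monthly_path(V_SourceFolder, datetime.now())
# Read_Customers_Data = read_customers(spark, V_FilePath)
```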
Control Flow Patterns
ForEach Loop Container
SSIS:
ForEach Loop Container (File Enumerator)
Folder: C:\Data\Input\
Files: *.csv
Tasks: Data Flow, Execute SQL (log processing)
Databricks:
V_InputFolder = f'/mnt/data/input/'
ForEach_Read_All_Files = f"""
SELECT
*,
input_file_name() AS source_file,  -- in Unity Catalog shared access mode, use _metadata.file_path instead
CURRENT_TIMESTAMP() AS process_date
FROM csv.`{V_InputFolder}*.csv`
"""
ForEach_Read_All_Files = spark.sql(ForEach_Read_All_Files)
ForEach_Read_All_Files.createOrReplaceTempView('ForEach_Read_All_Files')
ForEach_Insert_Customer_Data = f"""
INSERT INTO staging.customer_data
SELECT * FROM ForEach_Read_All_Files
"""
spark.sql(ForEach_Insert_Customer_Data)
ForEach_Log_Processing = f"""
INSERT INTO logs.processing_log
SELECT source_file AS file_path, COUNT(*) AS row_count, MAX(process_date) AS process_time
FROM ForEach_Read_All_Files
GROUP BY source_file
"""
spark.sql(ForEach_Log_Processing)
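The glob read replaces the File Enumerator loop with one set-based read. Because `ForEach_Read_All_Files` is a view over a lazy query, both `INSERT` statements re-read the files. When per-file behavior matters, such as processing files in name order or logging each file as soon as it is loaded, a literal loop over `dbutils.fs.ls` is closer to the SSIS container. The following is a minimal sketch. It assumes each file has a header row and columns that match `staging.customer_data` by position, and `foreach_csv_files` is a hypothetical name.

```python
def foreach_csv_files(spark, dbutils, input_folder: str) -> list:
    """Load and log each .csv file in name order; return the processed paths."""
    files = sorted(
        # lower() mirrors the case-insensitive "*.csv" match on Windows;
        # directory entries end with "/" and are skipped by the suffix test.
        (f for f in dbutils.fs.ls(input_folder) if f.name.lower().endswith(".csv")),
        key=lambda f: f.name,
    )
    processed = []
    for f in files:
        df = spark.read.option("header", "true").csv(f.path)
        row_count = df.count()
        df.write.insertInto("staging.customer_data")  # positional column mapping
        spark.sql(f"""
            INSERT INTO logs.processing_log
            SELECT '{f.path}' AS file_path, {row_count} AS row_count, CURRENT_TIMESTAMP() AS process_time
        """)
        processed.append(f.path)
    return processed
```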
Execute SQL Task
SSIS:
Execute SQL Task
SQL Statement:
TRUNCATE TABLE staging.customer_temp;
INSERT INTO staging.customer_temp
SELECT * FROM staging.customer_stage WHERE process_date >= ?;
Parameter Mapping:
User::MaxProcessDate → Parameter 0
Databricks:
V_MaxProcessDate = f'2024-01-01'
Execute_SQL_Truncate = f"""TRUNCATE TABLE staging.customer_temp"""
spark.sql(Execute_SQL_Truncate)
Execute_SQL_Insert = f"""
INSERT INTO staging.customer_temp
SELECT * FROM staging.customer_stage
WHERE process_date >= '{V_MaxProcessDate}'
"""
spark.sql(Execute_SQL_Insert)
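Interpolating `V_MaxProcessDate` into the SQL text works for trusted values, but unlike the SSIS parameter mapping it does not bind the value as a parameter. On Spark 3.5 and later, `spark.sql` accepts an `args` list for positional `?` markers, so the statement can keep its SSIS placeholder. The following is a minimal sketch that also replaces `TRUNCATE` followed by `INSERT` with one `INSERT OVERWRITE`. For an unpartitioned Delta table this swaps the contents in a single transaction, so readers never see the empty table in between. The function name is hypothetical.

```python
def refresh_customer_temp(spark, max_process_date: str) -> None:
    """TRUNCATE + INSERT as one INSERT OVERWRITE, with '?' bound as a parameter."""
    spark.sql(
        """
        INSERT OVERWRITE staging.customer_temp
        SELECT * FROM staging.customer_stage
        WHERE process_date >= ?
        """,
        args=[max_process_date],  # positional binding, like "Parameter 0" in SSIS
    )


# In the notebook:
# refresh_customer_temp(spark, V_MaxProcessDate)
```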
Script Component Conversion
Script Task (Control Flow)
SSIS (C#):
public void Main()
{
string sourceFolder = Dts.Variables["User::SourceFolder"].Value.ToString();
int fileCount = Directory.GetFiles(sourceFolder, "*.csv").Length;
Dts.Variables["User::FileCount"].Value = fileCount;
Dts.TaskResult = (int)ScriptResults.Success;
}
Databricks (Python):
V_SourceFolder = f'/mnt/source/'
file_list = dbutils.fs.ls(V_SourceFolder)
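# Count top-level .csv files; lower() mirrors the case-insensitive "*.csv" match on Windows.
# len() keeps the value numeric, matching the int assigned in the C# code.
V_FileCount = len([f for f in file_list if f.path.lower().endswith('.csv')])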
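In SSIS, a variable written by a Script Task is visible to later tasks in the same package. If the converted package is split into several tasks of a Databricks job, a notebook-level Python variable does not carry over between tasks. One option is Databricks task values (`dbutils.jobs.taskValues`). The following is a minimal sketch that assumes the counting notebook runs as a job task with the key `count_files`. That task key, the `FileCount` value key, and the helper names are hypothetical.

```python
def count_csv_files(entries) -> int:
    """Count top-level entries whose path ends in .csv, ignoring case."""
    return sum(1 for f in entries if f.path.lower().endswith(".csv"))


def publish_file_count(dbutils, source_folder: str) -> int:
    """Run in the counting task: compute the count and expose it as a task value."""
    file_count = count_csv_files(dbutils.fs.ls(source_folder))
    dbutils.jobs.taskValues.set(key="FileCount", value=file_count)
    return file_count


def read_file_count(dbutils) -> int:
    """Run in a downstream task; debugValue is returned when run outside a job."""
    return dbutils.jobs.taskValues.get(
        taskKey="count_files", key="FileCount", default=0, debugValue=0
    )
```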
Script Component (Data Flow)
SSIS (C#):
public override void Input0_ProcessInputRow(Input0Buffer Row)
{
if (Row.Amount > 1000) {
Row.PriorityFlag = "HIGH";
Row.DiscountRate = 0.15;
} else {
Row.PriorityFlag = "NORMAL";
Row.DiscountRate = 0.05;
}
Row.ProcessedDate = DateTime.Now;
}
Databricks:
Script_Component_Custom_Logic = f"""
SELECT
*,
CASE WHEN Amount > 1000 THEN 'HIGH' ELSE 'NORMAL' END AS PriorityFlag,
CASE WHEN Amount > 1000 THEN 0.15 ELSE 0.05 END AS DiscountRate,
CURRENT_TIMESTAMP() AS ProcessedDate
FROM source_table
"""
Script_Component_Custom_Logic = spark.sql(Script_Component_Custom_Logic)
Script_Component_Custom_Logic.createOrReplaceTempView('Script_Component_Custom_Logic')
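Rewriting row-level C# as `CASE` expressions, as above, is preferable because Spark can optimize built-in expressions. When a Script Component contains logic with no reasonable SQL form, one fallback is a Python UDF registered for use in SQL. Python UDFs are typically slower than built-in expressions, so treat this as a last resort. The following is a minimal sketch using the same rules, with hypothetical UDF names `priority_flag` and `discount_rate`. Returning NULL for a NULL `Amount` is an assumption; the C# code would fail when it reads a NULL column.

```python
def priority_flag(amount):
    """Row-level equivalent of the C# branch for PriorityFlag."""
    if amount is None:
        return None
    return "HIGH" if amount > 1000 else "NORMAL"


def discount_rate(amount):
    """Row-level equivalent of the C# branch for DiscountRate (a double, as in C#)."""
    if amount is None:
        return None
    return 0.15 if amount > 1000 else 0.05


def register_script_component_udfs(spark) -> None:
    """Make the functions callable from spark.sql; return types given as DDL strings."""
    spark.udf.register("priority_flag", priority_flag, "string")
    spark.udf.register("discount_rate", discount_rate, "double")


# In the notebook:
# register_script_component_udfs(spark)
# spark.sql("""SELECT *, priority_flag(Amount) AS PriorityFlag,
#              discount_rate(Amount) AS DiscountRate,
#              CURRENT_TIMESTAMP() AS ProcessedDate
#              FROM source_table""")
```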
Complete Package Example
SSIS Package: LoadCustomerData.dtsx
Control Flow:
- Execute SQL Task: Get Destination Table Name
- Execute SQL Task: Truncate Staging Table
- Execute SQL Task: Load Customer Data
- Execute SQL Task: Update Control Table
Converted Databricks Notebook:
# Databricks notebook source
ProcessName = f'CustomerETL'
# COMMAND ----------
# Get Destination Table Name
Package_Get_Destination_Table_Name = f"""
DECLARE VARIABLE V_TblNm STRING;
CALL aud.spGetPackageDesTbl({ProcessName}, V_TblNm);
SELECT V_TblNm AS DestinationTable;
"""
spark.sql(Package_Get_Destination_Table_Name)
# COMMAND ----------
# Truncate Staging Table
spark.sql(f"""TRUNCATE TABLE staging.customers;""")
# COMMAND ----------
# Load Customer Data
Package_Load_Customer_Data = f"""
INSERT INTO staging.customers (CustomerID, CustomerName, Email, Status, ProcessDate)
SELECT CustomerID, CustomerName, Email, Status, current_timestamp() AS ProcessDate
FROM source.customers
WHERE Status = 'Active';
"""
spark.sql(Package_Load_Customer_Data)
# COMMAND ----------
# Update Control Table
Package_Update_Control_Table = f"""
INSERT INTO control.load_log (process_name, rows_processed, process_date)
SELECT '{ProcessName}' AS process_name, COUNT(*) AS rows_processed, current_timestamp() AS process_date
FROM staging.customers
WHERE ProcessDate >= CAST(current_timestamp() AS DATE);
"""
spark.sql(Package_Update_Control_Table)
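Counting rows in the target table after the load can give the wrong number if another process writes to the same table on the same day. On recent Databricks Runtime versions, an `INSERT` into a Delta table issued through `spark.sql` returns a one-row DataFrame with `num_affected_rows` and `num_inserted_rows`, so the load step can report its own count. The following is a minimal sketch that assumes Delta tables and the same control table. `load_customers` and `log_load` are hypothetical names.

```python
def load_customers(spark) -> int:
    """Run the load and return the inserted row count reported by Delta."""
    result = spark.sql("""
        INSERT INTO staging.customers (CustomerID, CustomerName, Email, Status, ProcessDate)
        SELECT CustomerID, CustomerName, Email, Status, current_timestamp()
        FROM source.customers
        WHERE Status = 'Active'
    """)
    return int(result.collect()[0]["num_inserted_rows"])


def log_load(spark, process_name: str, rows_processed: int) -> None:
    """Write one control row; process_name is quoted as a SQL string literal."""
    spark.sql(f"""
        INSERT INTO control.load_log (process_name, rows_processed, process_date)
        VALUES ('{process_name}', {rows_processed}, current_timestamp())
    """)


# In the notebook:
# V_RowsLoaded = load_customers(spark)
# log_load(spark, ProcessName, V_RowsLoaded)
```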