
SSIS Conversion Examples

Before/after examples showing how SSIS constructs are converted to Databricks. For the component support matrix, see SSIS Supported Components.


Data Flow Transformations

Derived Column

SSIS:

Derived Column Transformation
Columns:
FullName = FirstName + " " + LastName
AgeGroup = Age < 30 ? "Young" : (Age < 60 ? "Middle" : "Senior")
ProcessDate = GETDATE()

Databricks:

Derived_Column_Add_Columns = f"""
SELECT
*,
CONCAT(FirstName, ' ', LastName) AS FullName,
CASE
WHEN Age < 30 THEN 'Young'
WHEN Age < 60 THEN 'Middle'
ELSE 'Senior'
END AS AgeGroup,
CURRENT_TIMESTAMP() AS ProcessDate
FROM source_table
"""
Derived_Column_Add_Columns = spark.sql(Derived_Column_Add_Columns)
Derived_Column_Add_Columns.createOrReplaceTempView('Derived_Column_Add_Columns')
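To check the converted expressions outside Spark, the per-row logic can be mirrored in plain Python. This is a minimal sketch. `derive_columns` is a hypothetical helper, `None` stands in for SQL NULL, and `ProcessDate` is left out because it is not deterministic.

The sketch shows two details the SQL has to preserve:

- The nested ternary becomes a series of `CASE` branches that are tested in order.
- A NULL name part makes `FullName` NULL. This is true both of SSIS string concatenation and of Spark's `CONCAT`.

The sketch also assumes the SSIS rule, documented for the `? :` operator, that a NULL condition yields a NULL result.

```python
from typing import Optional

def derive_columns(first: Optional[str], last: Optional[str], age: Optional[int]) -> dict:
    """Hypothetical per-row mirror of the Derived Column expressions (None = NULL)."""
    # SSIS string "+" and Spark CONCAT both return NULL if any operand is NULL.
    full_name = None if first is None or last is None else f"{first} {last}"
    # Nested ternary: SSIS returns NULL when the condition itself is NULL.
    if age is None:
        age_group = None
    elif age < 30:
        age_group = "Young"
    elif age < 60:
        age_group = "Middle"
    else:
        age_group = "Senior"
    return {"FullName": full_name, "AgeGroup": age_group}

print(derive_columns("Ada", "Lovelace", 36))  # {'FullName': 'Ada Lovelace', 'AgeGroup': 'Middle'}
print(derive_columns(None, "Lovelace", None))  # {'FullName': None, 'AgeGroup': None}
```

The SQL `CASE` falls through to `ELSE` when `Age` is NULL, so the converted query labels those rows `'Senior'`. Adding `WHEN Age IS NULL THEN NULL` as the first branch would make it match the sketch.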

Conditional Split

SSIS:

Conditional Split Transformation
Outputs:
HighValue: Amount > 1000
MediumValue: Amount > 100 AND Amount <= 1000
LowValue: Default Output

Databricks:

Conditional_Split_High_Value = f"""
SELECT * FROM source_table WHERE Amount > 1000
"""
Conditional_Split_High_Value = spark.sql(Conditional_Split_High_Value)
Conditional_Split_High_Value.createOrReplaceTempView('Conditional_Split_High_Value')

Conditional_Split_Medium_Value = f"""
SELECT * FROM source_table WHERE Amount > 100 AND Amount <= 1000
"""
Conditional_Split_Medium_Value = spark.sql(Conditional_Split_Medium_Value)
Conditional_Split_Medium_Value.createOrReplaceTempView('Conditional_Split_Medium_Value')

Conditional_Split_Low_Value = f"""
SELECT * FROM source_table WHERE Amount <= 100
"""
Conditional_Split_Low_Value = spark.sql(Conditional_Split_Low_Value)
Conditional_Split_Low_Value.createOrReplaceTempView('Conditional_Split_Low_Value')
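SSIS evaluates Conditional Split conditions in order and sends each row to the first output whose condition is true. Rows that match no condition go to the default output. The converted queries instead apply three independent filters. This is equivalent only because the filters are mutually exclusive and, together, cover every non-NULL Amount.

The plain-Python sketch below checks that equivalence. It uses hypothetical `route_ssis` and `route_sql` helpers, with `None` standing in for NULL. `route_ssis` assumes a NULL condition result is treated as false. In SSIS, what happens to such a row actually depends on the error disposition configured on the split.

```python
from typing import List, Optional

def route_ssis(amount: Optional[float]) -> str:
    """First-match routing with a default output; a NULL comparison counts as no match."""
    if amount is not None and amount > 1000:
        return "HighValue"
    if amount is not None and amount > 100 and amount <= 1000:
        return "MediumValue"
    return "LowValue"  # default output

def route_sql(amount: Optional[float]) -> List[str]:
    """Views whose WHERE clause a row satisfies; SQL comparisons with NULL are never true."""
    if amount is None:
        return []
    views = []
    if amount > 1000:
        views.append("HighValue")
    if 100 < amount <= 1000:
        views.append("MediumValue")
    if amount <= 100:
        views.append("LowValue")
    return views

for a in [50, 100, 100.5, 1000, 1001]:
    assert route_sql(a) == [route_ssis(a)]  # exactly one view, same as SSIS routing
print(route_sql(None))  # []
```

A row with a NULL `Amount` satisfies none of the three `WHERE` clauses, so it appears in none of the converted views.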

Lookup Transformation

SSIS:

Lookup Transformation
Reference Table: DimCustomerType
Join Columns: CustomerTypeID = TypeID
Lookup Columns: TypeName, TypeDescription
Cache Mode: Full Cache

Databricks:

Lookup_Customer_Type = f"""
SELECT
src.*,
ref.TypeName,
ref.TypeDescription
FROM source_table src
LEFT JOIN dim.customer_type ref
ON src.CustomerTypeID = ref.TypeID
"""
Lookup_Customer_Type = spark.sql(Lookup_Customer_Type)
Lookup_Customer_Type.createOrReplaceTempView('Lookup_Customer_Type')
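The `LEFT JOIN` keeps source rows that have no match and fills `TypeName` and `TypeDescription` with NULL. That corresponds to configuring the SSIS Lookup to ignore rows with no matching entries; the SSIS default is to fail the component.

A second difference appears when the reference table contains duplicate keys. SSIS documents that the Lookup returns only the first match, while a join returns one row per match.

The plain-Python sketch below shows both effects. It uses hypothetical `lookup_full_cache` and `left_join` helpers over lists of dicts.

```python
from typing import Dict, List

Row = Dict[str, object]

def lookup_full_cache(source: List[Row], ref: List[Row]) -> List[Row]:
    """Mimic a full-cache Lookup that ignores no-match rows: one output row per input row."""
    cache: Dict[object, Row] = {}
    for r in ref:
        cache.setdefault(r["TypeID"], r)  # keep only the first match per key
    out = []
    for s in source:
        m = cache.get(s["CustomerTypeID"], {})
        out.append({**s, "TypeName": m.get("TypeName"), "TypeDescription": m.get("TypeDescription")})
    return out

def left_join(source: List[Row], ref: List[Row]) -> List[Row]:
    """SQL LEFT JOIN semantics: one row per matching reference row, NULLs when none match."""
    out = []
    for s in source:
        key = s["CustomerTypeID"]
        # In SQL a NULL key never equals anything, so it cannot match.
        matches = [] if key is None else [r for r in ref if r["TypeID"] == key]
        for m in matches or [{}]:
            out.append({**s, "TypeName": m.get("TypeName"), "TypeDescription": m.get("TypeDescription")})
    return out

source = [{"CustomerID": 1, "CustomerTypeID": 10}, {"CustomerID": 2, "CustomerTypeID": 99}]
ref = [
    {"TypeID": 10, "TypeName": "Retail", "TypeDescription": "Walk-in"},
    {"TypeID": 10, "TypeName": "Retail (dup)", "TypeDescription": "Duplicate key"},
]
print(len(lookup_full_cache(source, ref)), len(left_join(source, ref)))  # 2 3
```

If duplicate keys are possible, you can restore one row per input. Deduplicate the reference table before the join, for example with `ROW_NUMBER()` in a subquery.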

Variables and Expressions

SSIS Variables

SSIS:

Variables:
User::MaxProcessDate (DateTime)
User::RowCount (Int32)
User::SourceFolder (String)

Databricks:

V_MaxProcessDate = f'2024-01-01'
V_SourceFolder = f'/mnt/source/'

Incremental_Data_Query = f"""
SELECT *
FROM source_table
WHERE ProcessDate > '{V_MaxProcessDate}'
"""
Incremental_Data_Query = spark.sql(Incremental_Data_Query)
Incremental_Data_Query.createOrReplaceTempView('Incremental_Data_Query')
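The converted code passes SSIS variables to Spark SQL by interpolating them into the query text, so each value must be rendered as a valid SQL literal.

The sketch below uses a hypothetical `sql_literal` helper, which is not part of PySpark. It handles values as follows:

- Strings are quoted, with backslash escaping (Spark SQL string literals escape quotes with a backslash).
- `date` values become `DATE` literals.
- `datetime` values become `TIMESTAMP` literals.

On Spark 3.5 and later, `spark.sql` also accepts an `args` mapping for named parameter markers such as `:max_date`. That avoids building literals by hand.

```python
from datetime import date, datetime
from typing import Union

def sql_literal(value: Union[str, int, float, bool, date, datetime, None]) -> str:
    """Hypothetical helper: render a Python value as a Spark SQL literal."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # check bool before int (bool subclasses int)
        return "TRUE" if value else "FALSE"
    if isinstance(value, (int, float)):
        return repr(value)
    if isinstance(value, datetime):  # check datetime before date (datetime subclasses date)
        return f"TIMESTAMP '{value.isoformat(sep=' ')}'"
    if isinstance(value, date):
        return f"DATE '{value.isoformat()}'"
    # Escape backslashes first, then single quotes, using backslash escapes.
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"

V_MaxProcessDate = date(2024, 1, 1)
Incremental_Data_Query = f"""
SELECT *
FROM source_table
WHERE ProcessDate > {sql_literal(V_MaxProcessDate)}
"""
print(Incremental_Data_Query.strip().splitlines()[-1])  # WHERE ProcessDate > DATE '2024-01-01'
```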

SSIS Expressions

SSIS:

@[User::SourceFolder] + "customers_" +
(DT_WSTR, 8) DATEPART("yyyy", GETDATE()) +
RIGHT("0" + (DT_WSTR, 2) DATEPART("mm", GETDATE()), 2) + ".csv"

Databricks:

from datetime import datetime

V_SourceFolder = f'/mnt/source/'
current_date = datetime.now()
year = current_date.strftime("%Y")
month = current_date.strftime("%m")
V_FilePath = f"{V_SourceFolder}customers_{year}{month}.csv"

Read_Customers_Data = f"""SELECT * FROM csv.`{V_FilePath}`"""
Read_Customers_Data = spark.sql(Read_Customers_Data)
Read_Customers_Data.createOrReplaceTempView('Read_Customers_Data')
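Wrapping the path logic in a function with an explicit date makes it testable. It also shows that Python's `%m` format already produces the zero-padded month that the SSIS expression builds with `RIGHT("0" + ..., 2)`. `build_file_path` is a hypothetical name.

Note that `datetime.now()` runs on the Spark driver and uses that machine's local time zone, often UTC on Databricks clusters. That zone may differ from the one on the SSIS server that evaluated `GETDATE()`.

```python
from datetime import datetime
from typing import Optional

def build_file_path(source_folder: str, when: Optional[datetime] = None) -> str:
    """Mirror of the SSIS expression: folder + "customers_" + yyyy + zero-padded mm + ".csv"."""
    when = when or datetime.now()
    return f"{source_folder}customers_{when:%Y%m}.csv"

print(build_file_path("/mnt/source/", datetime(2024, 3, 15)))  # /mnt/source/customers_202403.csv
```

Querying ``csv.`path` `` directly uses the default CSV reader options, which do not treat the first line as a header. Columns therefore come back as `_c0`, `_c1`, and so on. If the files have a header row, `spark.read.option("header", "true").csv(V_FilePath)` reads it as column names.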

Control Flow Patterns

ForEach Loop Container

SSIS:

ForEach Loop Container (File Enumerator)
Folder: C:\Data\Input\
Files: *.csv
Tasks: Data Flow, Execute SQL (log processing)

Databricks:

V_InputFolder = f'/mnt/data/input/'

ForEach_Read_All_Files = f"""
SELECT
*,
input_file_name() AS source_file,
CURRENT_TIMESTAMP() AS process_date
FROM csv.`{V_InputFolder}*.csv`
"""
ForEach_Read_All_Files = spark.sql(ForEach_Read_All_Files)
ForEach_Read_All_Files.createOrReplaceTempView('ForEach_Read_All_Files')

ForEach_Insert_Customer_Data = f"""
INSERT INTO staging.customer_data
SELECT * FROM ForEach_Read_All_Files
"""
spark.sql(ForEach_Insert_Customer_Data)

ForEach_Log_Processing = f"""
INSERT INTO logs.processing_log
SELECT source_file AS file_path, COUNT(*) AS row_count, MAX(process_date) AS process_time
FROM ForEach_Read_All_Files
GROUP BY source_file
"""
spark.sql(ForEach_Log_Processing)
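The conversion replaces the per-file loop with a single set-based read over a glob, using `input_file_name()` to record which file each row came from.

The plain-Python sketch below checks that both approaches produce the same rows. It uses hypothetical `per_file_loop` and `set_based` helpers, with an in-memory dict standing in for the folder.

The per-file log counts can differ. A loop that logs after each file records an empty file with a count of 0. The `GROUP BY` version has no row for that file at all.

```python
from fnmatch import fnmatchcase
from typing import Dict, List, Tuple

Files = Dict[str, List[dict]]  # file name -> parsed data rows

def per_file_loop(files: Files, pattern: str = "*.csv") -> Tuple[List[dict], Dict[str, int]]:
    """SSIS-style: iterate matching files, load their rows, log a count per file."""
    loaded, log = [], {}
    for name in sorted(files):
        if fnmatchcase(name, pattern):
            rows = [{**r, "source_file": name} for r in files[name]]
            loaded.extend(rows)
            log[name] = len(rows)  # logged even when the file is empty
    return loaded, log

def set_based(files: Files, pattern: str = "*.csv") -> Tuple[List[dict], Dict[str, int]]:
    """Converted style: read all matching files at once, then GROUP BY source_file."""
    loaded = [{**r, "source_file": n} for n in sorted(files) if fnmatchcase(n, pattern) for r in files[n]]
    log: Dict[str, int] = {}
    for row in loaded:
        log[row["source_file"]] = log.get(row["source_file"], 0) + 1
    return loaded, log

files = {"a.csv": [{"id": 1}, {"id": 2}], "b.csv": [], "notes.txt": [{"id": 9}]}
loop_rows, loop_log = per_file_loop(files)
set_rows, set_log = set_based(files)
print(loop_rows == set_rows, loop_log, set_log)  # True {'a.csv': 2, 'b.csv': 0} {'a.csv': 2}
```

`ForEach_Read_All_Files` adds `source_file` and `process_date` columns, and `INSERT INTO ... SELECT *` matches columns by position. The insert therefore assumes `staging.customer_data` ends with those two columns in that order.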

Execute SQL Task

SSIS:

Execute SQL Task
SQL Statement:
TRUNCATE TABLE staging.customer_temp;
INSERT INTO staging.customer_temp
SELECT * FROM staging.customer_stage WHERE process_date >= ?;
Parameter Mapping:
User::MaxProcessDate → Parameter 0

Databricks:

V_MaxProcessDate = f'2024-01-01'

Execute_SQL_Truncate = f"""TRUNCATE TABLE staging.customer_temp"""
spark.sql(Execute_SQL_Truncate)

Execute_SQL_Insert = f"""
INSERT INTO staging.customer_temp
SELECT * FROM staging.customer_stage
WHERE process_date >= '{V_MaxProcessDate}'
"""
spark.sql(Execute_SQL_Insert)
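SSIS maps `?` markers to variables by position. The conversion inlines the value as a quoted literal instead. An alternative is to rewrite the markers as named parameters and bind the values at execution time.

The sketch below uses a hypothetical `to_named_markers` helper. It renames each `?` outside single-quoted literals to `:p0`, `:p1`, and so on. A backslash-escaped quote inside a literal is skipped over.

```python
def to_named_markers(sql: str) -> str:
    """Hypothetical helper: rewrite positional '?' markers as :p0, :p1, ... (skips quoted text)."""
    out, index, in_quote, i = [], 0, False, 0
    while i < len(sql):
        ch = sql[i]
        if in_quote and ch == "\\" and i + 1 < len(sql):
            out.append(sql[i:i + 2])  # keep an escaped character such as \' unchanged
            i += 2
            continue
        if ch == "'":
            in_quote = not in_quote
        if ch == "?" and not in_quote:
            out.append(f":p{index}")
            index += 1
        else:
            out.append(ch)
        i += 1
    return "".join(out)

sql = "SELECT * FROM staging.customer_stage WHERE process_date >= ? AND note <> 'why?'"
print(to_named_markers(sql))
# SELECT * FROM staging.customer_stage WHERE process_date >= :p0 AND note <> 'why?'
```

On Spark 3.5 and later, the rewritten text could be run with `spark.sql(named_sql, args={"p0": "2024-01-01"})`. This binds the value instead of splicing it into the query.

The `TRUNCATE` and `INSERT` also run as two separate statements, so a failure between them leaves the table empty. If that gap matters for a Delta table, `INSERT OVERWRITE staging.customer_temp SELECT ...` replaces the contents in a single write.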

Script Component Conversion

Script Task (Control Flow)

SSIS (C#):

public void Main()
{
string sourceFolder = Dts.Variables["User::SourceFolder"].Value.ToString();
int fileCount = Directory.GetFiles(sourceFolder, "*.csv").Length;
Dts.Variables["User::FileCount"].Value = fileCount;
Dts.TaskResult = (int)ScriptResults.Success;
}

Databricks (Python):

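V_SourceFolder = f'/mnt/source/'
# List the folder's immediate entries and count the .csv files (case-insensitive, as on Windows)
file_list = dbutils.fs.ls(V_SourceFolder)
V_FileCount = str(len([f for f in file_list if f.name.lower().endswith('.csv')]))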
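`Directory.GetFiles(folder, "*.csv")` counts matching files in the top-level folder only, and on Windows the match is case-insensitive. For a folder on the local file system, for example during testing, a plain-Python equivalent might look like the sketch below. `count_csv_files` is a hypothetical name. For DBFS or cloud storage paths, use `dbutils.fs.ls`.

```python
import tempfile
from pathlib import Path

def count_csv_files(folder: str) -> int:
    """Count *.csv files directly inside folder (no recursion), ignoring extension case."""
    return sum(1 for p in Path(folder).iterdir() if p.is_file() and p.suffix.lower() == ".csv")

with tempfile.TemporaryDirectory() as tmp:
    for name in ["a.csv", "B.CSV", "notes.txt"]:
        (Path(tmp) / name).write_text("id\n1\n")
    (Path(tmp) / "sub").mkdir()
    (Path(tmp) / "sub" / "c.csv").write_text("id\n")  # in a subfolder, so not counted
    demo_count = count_csv_files(tmp)
print(demo_count)  # 2
```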

Script Component (Data Flow)

SSIS (C#):

public override void Input0_ProcessInputRow(Input0Buffer Row)
{
if (Row.Amount > 1000) {
Row.PriorityFlag = "HIGH";
Row.DiscountRate = 0.15;
} else {
Row.PriorityFlag = "NORMAL";
Row.DiscountRate = 0.05;
}
Row.ProcessedDate = DateTime.Now;
}

Databricks:

Script_Component_Custom_Logic = f"""
SELECT
*,
CASE WHEN Amount > 1000 THEN 'HIGH' ELSE 'NORMAL' END AS PriorityFlag,
CASE WHEN Amount > 1000 THEN 0.15 ELSE 0.05 END AS DiscountRate,
CURRENT_TIMESTAMP() AS ProcessedDate
FROM source_table
"""
Script_Component_Custom_Logic = spark.sql(Script_Component_Custom_Logic)
Script_Component_Custom_Logic.createOrReplaceTempView('Script_Component_Custom_Logic')
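The C# method runs once per row and sets several output columns from one `if`. The SQL version writes one `CASE` per output column, repeating the condition each time.

The plain-Python sketch below mirrors the C# logic so the boundary can be checked: an `Amount` of exactly 1000 is `NORMAL` in both versions. `script_component_row` is a hypothetical name. The timestamp is passed in because Spark evaluates `CURRENT_TIMESTAMP()` once per query, giving every row the same value. `DateTime.Now` in the C# version is read per row.

```python
from datetime import datetime
from typing import Dict

def script_component_row(amount: float, now: datetime) -> Dict[str, object]:
    """Mirror of Input0_ProcessInputRow: one branch sets both the flag and the rate."""
    if amount > 1000:
        flag, rate = "HIGH", 0.15
    else:
        flag, rate = "NORMAL", 0.05
    return {"PriorityFlag": flag, "DiscountRate": rate, "ProcessedDate": now}

stamp = datetime(2024, 1, 1)
print(script_component_row(1000, stamp)["PriorityFlag"])  # NORMAL
print(script_component_row(1000.01, stamp)["PriorityFlag"])  # HIGH
```

Spark SQL types the literals `0.15` and `0.05` as `DECIMAL(2,2)`. If the target column is a floating-point type like the C# `double`, wrap the expression in `CAST(... AS DOUBLE)`. When the per-row logic cannot be written in SQL, a Python UDF (`pyspark.sql.functions.udf`) is the usual fallback, at some performance cost.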

Complete Package Example

SSIS Package: LoadCustomerData.dtsx

Control Flow:

  1. Execute SQL Task: Get Destination Table Name
  2. Execute SQL Task: Truncate Staging Table
  3. Execute SQL Task: Load Customer Data
  4. Execute SQL Task: Update Control Table

Converted Databricks Notebook:

# Databricks notebook source

ProcessName = f'CustomerETL'

# COMMAND ----------
# Get Destination Table Name
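# spark.sql runs one statement per call, so each statement is issued separately
spark.sql(f"""DECLARE OR REPLACE VARIABLE V_TblNm STRING""")
spark.sql(f"""CALL aud.spGetPackageDesTbl('{ProcessName}', V_TblNm)""")
Package_Get_Destination_Table_Name = spark.sql(f"""SELECT V_TblNm AS DestinationTable""")
Package_Get_Destination_Table_Name.createOrReplaceTempView('Package_Get_Destination_Table_Name')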

# COMMAND ----------
# Truncate Staging Table
spark.sql(f"""TRUNCATE TABLE staging.customers;""")

# COMMAND ----------
# Load Customer Data
Package_Load_Customer_Data = f"""
INSERT INTO staging.customers (CustomerID, CustomerName, Email, Status, ProcessDate)
SELECT CustomerID, CustomerName, Email, Status, current_timestamp() AS ProcessDate
FROM source.customers
WHERE Status = 'Active';
"""
spark.sql(Package_Load_Customer_Data)

# COMMAND ----------
# Update Control Table
Package_Update_Control_Table = f"""
INSERT INTO control.load_log (process_name, rows_processed, process_date)
SELECT '{ProcessName}' AS process_name, COUNT(*) AS rows_processed, current_timestamp() AS process_date
FROM dw.customers
WHERE ProcessDate >= CAST(current_timestamp() AS DATE);
"""
spark.sql(Package_Update_Control_Table)
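In the SSIS package, each task starts only after the previous one succeeds. Run as a Databricks job, the notebook behaves similarly: cells execute top to bottom, and an exception stops the run.

The plain-Python sketch below makes that precedence explicit with a hypothetical `run_steps` helper. Each step is a callable, steps run in order, and the first failure is recorded and stops the sequence. In a notebook, each callable would wrap the corresponding `spark.sql` call.

```python
from typing import Callable, List, Tuple

def run_steps(steps: List[Tuple[str, Callable[[], None]]]) -> List[Tuple[str, str]]:
    """Run steps in order and stop at the first exception, like SSIS Success constraints."""
    log: List[Tuple[str, str]] = []
    for name, step in steps:
        try:
            step()
        except Exception as exc:
            log.append((name, f"Failed: {exc}"))
            break
        log.append((name, "Success"))
    return log

def fail() -> None:
    raise RuntimeError("target table missing")

steps = [
    ("Get Destination Table Name", lambda: None),
    ("Truncate Staging Table", lambda: None),
    ("Load Customer Data", fail),
    ("Update Control Table", lambda: None),
]
result = run_steps(steps)
print(result)
# [('Get Destination Table Name', 'Success'), ('Truncate Staging Table', 'Success'), ('Load Customer Data', 'Failed: target table missing')]
```

Because this helper catches the exception, a job running it would still report success. Re-raise the exception after logging it if the job should be marked as failed.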