Microsoft SSIS to Databricks
Conversion information
- Transpiler: BladeBridge
- Available target: Databricks notebooks (experimental)
Supported SSIS Versions
- SQL Server 2012, 2014, 2016, 2017, 2019, 2022
- Azure Data Factory SSIS Integration Runtime
Input Requirements
Export your SSIS packages as DTSX files:
- Solution Export: Export from Visual Studio / SQL Server Data Tools (SSDT)
- File System Packages: Direct DTSX file access
- SSISDB Export: Extract from SSIS catalog
Export from SSISDB:
-- Extract package from SSISDB catalog
DECLARE @packageData VARBINARY(MAX)
SELECT @packageData = [packagedata]
FROM [SSISDB].[catalog].[packages]
WHERE [name] = 'YourPackageName'
-- Save to file system for conversion
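If you script the export, a minimal Python sketch for writing the package blob returned by the query above to a .dtsx file might look like the following (it assumes pyodbc is available; the server, connection details, and package name are placeholders):

import pyodbc

# Connect to the SQL Server instance hosting SSISDB (connection details are placeholders)
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=your-sql-server;DATABASE=SSISDB;Trusted_Connection=yes;"
)
cursor = conn.cursor()
cursor.execute(
    "SELECT [packagedata] FROM [SSISDB].[catalog].[packages] WHERE [name] = ?",
    "YourPackageName",
)
package_bytes = cursor.fetchone()[0]

# Write the blob to disk so it can be passed to the converter as a DTSX file
with open("YourPackageName.dtsx", "wb") as f:
    f.write(package_bytes)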
Components
Control Flow (Orchestration) - Supported
| SSIS Component | Microsoft Name | Spark Equivalent | Notes |
|---|---|---|---|
| Data Flow Task | Microsoft.DataFlowTask | Spark SQL temp views | Core data transformations |
| Execute SQL Task | Microsoft.ExecuteSQLTask | Spark SQL statements | SQL execution with parameter mapping |
| Execute Package Task | Microsoft.ExecutePackageTask | dbutils.notebook.run() | Nested notebook execution |
| File System Task | Microsoft.FileSystemTask | dbutils.fs commands | File operations (copy, move, delete, rename) |
| Script Task | Microsoft.ScriptTask | Python code with SQL | C#/VB.NET scripts converted to Python/SQL |
| For Loop Container | STOCK:FORLOOP | SQL iteration pattern | Iteration with counter |
| Foreach Loop Container | STOCK:FOREACHLOOP | SQL wildcard file reading | File/folder iteration |
| Sequence Container | STOCK:SEQUENCE | Function or notebook section | Grouping tasks |
| Execute Process Task | Microsoft.ExecuteProcess | subprocess.run() | External process execution |
| Extensible File Task | ExtensibleFileTask | dbutils.fs commands | Extended file operations |
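To illustrate the mappings above, an Execute Package Task, a File System Task, and an Execute Process Task typically translate to calls along these lines (the notebook path, file paths, and external command are placeholders, not converter output):

import subprocess

# Execute Package Task -> run a nested notebook (path, timeout, arguments are illustrative)
dbutils.notebook.run("/Workspace/etl/child_package", 3600, {"ProcessDate": "2024-01-01"})

# File System Task -> copy, move, delete files with dbutils.fs
dbutils.fs.cp("dbfs:/mnt/landing/customers.csv", "dbfs:/mnt/archive/customers.csv")
dbutils.fs.rm("dbfs:/mnt/landing/customers.csv")

# Execute Process Task -> run an external process on the driver node
completed = subprocess.run(["ls", "-l", "/tmp"], capture_output=True, text=True)
print(completed.stdout)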
Control Flow (Orchestration) - Unsupported
The following Control Flow components are not supported and would require manual conversion or alternative approaches:
| SSIS Component | Microsoft Name | Reason |
|---|---|---|
| Analysis Services Execute DDL | Microsoft.AnalysisServicesExecuteDDLTask | SSAS-specific, requires manual migration to Delta/SQL |
| Analysis Services Processing | Microsoft.AnalysisServicesProcessingTask | SSAS-specific, requires manual migration to Delta/SQL |
| Bulk Insert Task | Microsoft.BulkInsertTask | Use Data Flow Task with JDBC/Delta instead |
| Data Profiling Task | Microsoft.DataProfilingTask | Use Databricks Data Profile UI or custom profiling code |
| FTP Task | Microsoft.FTPTask | Implement using Python ftplib or dbutils.fs (see the sketch after this table) |
| Message Queue Task | Microsoft.MessageQueueTask | Requires custom Kafka/Event Hub integration |
| Send Mail Task | Microsoft.SendMailTask | Implement using Databricks job notifications or Python smtplib |
| Web Service Task | Microsoft.WebServiceTask | Implement using Python requests library |
| WMI Data Reader Task | Microsoft.WMIDataReaderTask | Windows-specific, requires alternative monitoring solution |
| XML Task | Microsoft.XMLTask | Implement using Python xml.etree or PySpark XML functions |
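For example, an FTP Task can typically be replaced with a short Python routine built on ftplib; a minimal sketch (host, credentials, and paths are placeholders, and in practice the credentials should come from Databricks secrets):

from ftplib import FTP

# Download a file from the FTP server to the driver, then copy it into DBFS
ftp = FTP("ftp.example.com")
ftp.login(user="ftp_user", passwd="ftp_password")  # placeholders; use dbutils.secrets in practice
with open("/tmp/customers.csv", "wb") as local_file:
    ftp.retrbinary("RETR /outbound/customers.csv", local_file.write)
ftp.quit()

dbutils.fs.cp("file:/tmp/customers.csv", "dbfs:/mnt/landing/customers.csv")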
Components such as the Script Task contain C# or VB.NET code bodies that cannot be converted automatically and require manual translation to Python. The converter preserves the logic structure, but the script body itself must be rewritten.
Data Flow (Transformation) - Supported
Sources
| SSIS Component | Microsoft Name | Spark Equivalent | Notes |
|---|---|---|---|
| OLE DB Source | Microsoft.OLEDBSource | spark.read.format("jdbc") | Database reads via JDBC with PySpark |
| Flat File Source | Microsoft.FlatFileSource | spark.read.csv() or SQL csv.\`path\` | CSV, delimited, fixed-width files |
| Excel Source | Microsoft.ExcelSource | spark.read.format("excel") | Excel file reads |
| Raw File Source | Microsoft.RawSource | spark.read.format("parquet") | SSIS raw files converted to Parquet |
Transformations
| SSIS Component | Microsoft Name | Spark Equivalent | Notes |
|---|---|---|---|
| Aggregate | Microsoft.Aggregate | GROUP BY with aggregation | Sum, count, avg, min, max operations |
| Audit | Microsoft.Audit | Add columns in SELECT | Add audit columns (timestamp, user, etc.) |
| Cache Transform | Microsoft.Cache | Temp views or CTEs | Cache data for lookup operations |
| Character Map | Microsoft.CharacterMap | String functions | String transformations (upper, lower, etc.) |
| Conditional Split | Microsoft.ConditionalSplit | Multiple WHERE clauses | Route rows by conditions |
| Copy Column | Microsoft.CopyColumn | Column in SELECT | Duplicate columns |
| Data Conversion | Microsoft.DataConvert | CAST() | Type conversions |
| Derived Column | Microsoft.DerivedColumn | Calculated columns in SELECT | Column transformations and calculations |
| Lookup | Microsoft.Lookup | LEFT JOIN | Reference data lookup with caching |
| Merge | Microsoft.Merge | UNION | Merge sorted datasets |
| Merge Join | Microsoft.MergeJoin | JOIN | Sorted input joins |
| Multicast | Microsoft.Multicast | Temp view reuse | Send data to multiple outputs |
| OLE DB Command | Microsoft.OLEDBCommand | Row-by-row SQL execution | Row-by-row SQL execution |
| Percentage Sampling | Microsoft.PercentageSampling | TABLESAMPLE | Statistical sampling |
| Pivot | Microsoft.Pivot | PIVOT clause | Pivot operations |
| Row Count | Microsoft.RowCount | COUNT(*) | Count rows and store in variable |
| Script Component | Microsoft.SqlServer.Dts.Pipeline.ScriptComponentHost | SQL UDFs or CASE statements | Custom transformations |
| Slowly Changing Dimension | Microsoft.SCD | MERGE with Delta Lake | SCD Type 1, 2, 3 patterns (see the MERGE sketch after this table) |
| Sort | Microsoft.Sort | ORDER BY | Sorting operations |
| Union All | Microsoft.UnionAll | UNION ALL | Combine multiple datasets |
| Unpivot | Microsoft.UnPivot | UNPIVOT or STACK() | Unpivot operations |
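For instance, the Slowly Changing Dimension transformation typically maps to a Delta Lake MERGE; a minimal SCD Type 1 sketch in the same SQL-first style as the converted notebooks (table and column names are illustrative):

SCD_Type1_Merge = f"""
MERGE INTO dim.customer AS tgt
USING staging.customer_updates AS src
  ON tgt.CustomerID = src.CustomerID
WHEN MATCHED THEN UPDATE SET
  tgt.CustomerName = src.CustomerName,
  tgt.Email = src.Email
WHEN NOT MATCHED THEN INSERT (CustomerID, CustomerName, Email)
  VALUES (src.CustomerID, src.CustomerName, src.Email)
"""
spark.sql(SCD_Type1_Merge)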
Destinations
| SSIS Component | Microsoft Name | Spark Equivalent | Notes |
|---|---|---|---|
| OLE DB Destination | Microsoft.OLEDBDestination | INSERT INTO SQL or Delta Lake | Database writes via SQL statements |
| Flat File Destination | Microsoft.FlatFileDestination | df.write.csv() or SQL INSERT | CSV and delimited file writes |
| Excel Destination | Microsoft.ExcelDestination | df.write.format('excel') | Excel file writes |
| Raw File Destination | Microsoft.RawDestination | df.write.format('parquet') | SSIS raw files converted to Parquet |
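As an example of the destination mappings, an OLE DB Destination usually becomes an INSERT INTO statement against a Delta table, or an equivalent DataFrame write (the target table and source view names are illustrative):

# SQL-first pattern, consistent with the converted notebooks
OLE_DB_Destination_Write = f"""
INSERT INTO dw.customers
SELECT * FROM source_table
"""
spark.sql(OLE_DB_Destination_Write)

# Equivalent DataFrame write to a Delta table
spark.table("source_table").write.format("delta").mode("append").saveAsTable("dw.customers")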
Data Flow (Transformation) - Unsupported
The following Data Flow components are not supported and would require manual conversion:
| SSIS Component | Microsoft Name | Reason |
|---|---|---|
| Export Column | Microsoft.ExportColumn | Implement using custom UDF with file writing |
| Import Column | Microsoft.ImportColumn | Implement using custom UDF with file reading |
The Script Component often contains C# or VB.NET code bodies with complex row-by-row processing logic. While the converter identifies these components, the actual C#/VB code cannot be automatically converted.
Manual conversion required:
- Analyze the C#/VB logic
- Rewrite as SQL UDFs or CASE statements
- Test thoroughly as row-by-row logic may need redesign for set-based SQL processing
Data Flow Conversion
Derived Column Transformation
SSIS Derived Column:
Derived Column Transformation
Columns:
FullName = FirstName + " " + LastName
AgeGroup = Age < 30 ? "Young" : (Age < 60 ? "Middle" : "Senior")
ProcessDate = GETDATE()
Converted Spark SQL:
# Processing node Package\Data Flow Task\Derived Column, type DERIVED_COLUMN
# component nameDerived Column
Derived_Column_Add_Columns = f"""
SELECT
*,
CONCAT(FirstName, ' ', LastName) AS FullName,
CASE
WHEN Age < 30 THEN 'Young'
WHEN Age < 60 THEN 'Middle'
ELSE 'Senior'
END AS AgeGroup,
CURRENT_TIMESTAMP() AS ProcessDate
FROM source_table
"""
Derived_Column_Add_Columns = spark.sql(Derived_Column_Add_Columns)
Derived_Column_Add_Columns.createOrReplaceTempView('Derived_Column_Add_Columns')
Conditional Split
SSIS Conditional Split:
Conditional Split Transformation
Outputs:
HighValue: Amount > 1000
MediumValue: Amount > 100 AND Amount <= 1000
LowValue: Default Output
Converted Spark SQL:
# Processing node Package\Data Flow Task\Conditional Split, type CONDITIONAL_SPLIT
# component nameConditional Split
# High Value output
Conditional_Split_High_Value = f"""
SELECT * FROM source_table
WHERE Amount > 1000
"""
Conditional_Split_High_Value = spark.sql(Conditional_Split_High_Value)
Conditional_Split_High_Value.createOrReplaceTempView('Conditional_Split_High_Value')
# Medium Value output
Conditional_Split_Medium_Value = f"""
SELECT * FROM source_table
WHERE Amount > 100 AND Amount <= 1000
"""
Conditional_Split_Medium_Value = spark.sql(Conditional_Split_Medium_Value)
Conditional_Split_Medium_Value.createOrReplaceTempView('Conditional_Split_Medium_Value')
# Low Value output (default)
Conditional_Split_Low_Value = f"""
SELECT * FROM source_table
WHERE Amount <= 100
"""
Conditional_Split_Low_Value = spark.sql(Conditional_Split_Low_Value)
Conditional_Split_Low_Value.createOrReplaceTempView('Conditional_Split_Low_Value')
Lookup Transformation
SSIS Lookup:
Lookup Transformation
Reference Table: DimCustomerType
Join Columns: CustomerTypeID = TypeID
Lookup Columns: TypeName, TypeDescription
Cache Mode: Full Cache
Converted Spark SQL:
# Processing node Package\Data Flow Task\Lookup Customer Type, type LOOKUP
# component nameLookup Customer Type
Lookup_Customer_Type = f"""
SELECT
src.*,
ref.TypeName,
ref.TypeDescription
FROM source_table src
LEFT JOIN dim.customer_type ref
ON src.CustomerTypeID = ref.TypeID
"""
Lookup_Customer_Type = spark.sql(Lookup_Customer_Type)
Lookup_Customer_Type.createOrReplaceTempView('Lookup_Customer_Type')
Variables and Expressions
SSIS Variables
SSIS Package Variables:
Variables:
User::MaxProcessDate (DateTime)
User::RowCount (Int32)
User::SourceFolder (String)
Converted Spark SQL (with Python variables):
# SSIS Package Variables converted to Python variables
V_MaxProcessDate = f'2024-01-01'
V_SourceFolder = f'/mnt/source/'
# Use in SQL transformations via spark.sql()
Incremental_Data_Query = f"""
SELECT *
FROM source_table
WHERE ProcessDate > '{V_MaxProcessDate}'
"""
spark.sql(Incremental_Data_Query)
SSIS Expressions
SSIS Expression Language:
File Connection Manager Expression:
@[User::SourceFolder] + "customers_" +
(DT_WSTR, 8) DATEPART("yyyy", GETDATE()) +
RIGHT("0" + (DT_WSTR, 2) DATEPART("mm", GETDATE()), 2) + ".csv"
Converted Spark SQL:
from datetime import datetime
# SSIS Variables
V_SourceFolder = f'/mnt/source/'
# Build dynamic file path
current_date = datetime.now()
year = current_date.strftime("%Y")
month = current_date.strftime("%m")
V_FilePath = f"{V_SourceFolder}customers_{year}{month}.csv"
# Read from dynamic path using spark.sql()
Read_Customers_Data = f"""SELECT * FROM csv.`{V_FilePath}`"""
Read_Customers_Data = spark.sql(Read_Customers_Data)
Read_Customers_Data.createOrReplaceTempView('Read_Customers_Data')
Control Flow Patterns
ForEach Loop Container
SSIS ForEach Loop:
ForEach Loop Container (File Enumerator)
Folder: C:\Data\Input\
Files: *.csv
Tasks:
- Data Flow: Process each file
- Execute SQL: Log processing
Converted Spark SQL:
# SSIS Variables
V_InputFolder = f'/mnt/data/input/'
# Process multiple CSV files using wildcard pattern
ForEach_Read_All_Files = f"""
SELECT
*,
input_file_name() AS source_file,
CURRENT_TIMESTAMP() AS process_date
FROM csv.`{V_InputFolder}*.csv`
"""
ForEach_Read_All_Files = spark.sql(ForEach_Read_All_Files)
ForEach_Read_All_Files.createOrReplaceTempView('ForEach_Read_All_Files')
# Insert into target table
ForEach_Insert_Customer_Data = f"""
INSERT INTO staging.customer_data
SELECT * FROM ForEach_Read_All_Files
"""
spark.sql(ForEach_Insert_Customer_Data)
# Log processing
ForEach_Log_Processing = f"""
INSERT INTO logs.processing_log
SELECT
source_file AS file_path,
COUNT(*) AS row_count,
MAX(process_date) AS process_time
FROM ForEach_Read_All_Files
GROUP BY source_file
"""
spark.sql(ForEach_Log_Processing)
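If per-file logic is required (for example, per-file error handling or logging) rather than a single wildcard read, iterating with dbutils.fs.ls is a common alternative; a minimal sketch (folder and table names are illustrative):

V_InputFolder = '/mnt/data/input/'

for file_info in dbutils.fs.ls(V_InputFolder):
    if not file_info.path.endswith('.csv'):
        continue
    # Process one file at a time, mirroring ForEach Loop semantics
    Load_Single_File = f"""
    INSERT INTO staging.customer_data
    SELECT
      *,
      '{file_info.path}' AS source_file,
      CURRENT_TIMESTAMP() AS process_date
    FROM csv.`{file_info.path}`
    """
    spark.sql(Load_Single_File)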
Execute SQL Task
SSIS Execute SQL Task:
Execute SQL Task
Connection: DW_Connection
SQL Statement:
TRUNCATE TABLE staging.customer_temp;
INSERT INTO staging.customer_temp
SELECT * FROM staging.customer_stage
WHERE process_date >= ?;
Parameter Mapping:
Variable: User::MaxProcessDate → Parameter 0
Converted Spark SQL:
# SSIS Variable
V_MaxProcessDate = f'2024-01-01'
# Processing node Package\Execute SQL Task, type EXECUTE_SQL
# component nameExecute SQL Task
Execute_SQL_Truncate = f"""TRUNCATE TABLE staging.customer_temp"""
spark.sql(Execute_SQL_Truncate)
Execute_SQL_Insert = f"""
INSERT INTO staging.customer_temp
SELECT * FROM staging.customer_stage
WHERE process_date >= '{V_MaxProcessDate}'
"""
spark.sql(Execute_SQL_Insert)
Script Component Conversion
Script Task (Control Flow)
SSIS Script Task (C#):
public void Main()
{
string sourceFolder = Dts.Variables["User::SourceFolder"].Value.ToString();
DateTime processDate = DateTime.Now;
int fileCount = Directory.GetFiles(sourceFolder, "*.csv").Length;
Dts.Variables["User::FileCount"].Value = fileCount;
Dts.TaskResult = (int)ScriptResults.Success;
}
Converted Python (with Spark SQL):
from datetime import datetime
# SSIS Variables
V_SourceFolder = f'/mnt/source/'
V_ProcessDate = datetime.now()
# Count files (requires Python/dbutils for file system operations)
file_list = dbutils.fs.ls(V_SourceFolder)
V_FileCount = str(len([f for f in file_list if f.path.endswith('.csv')]))
Script Component (Data Flow)
SSIS Script Component (transformation):
public override void Input0_ProcessInputRow(Input0Buffer Row)
{
// Custom business logic
if (Row.Amount > 1000)
{
Row.PriorityFlag = "HIGH";
Row.DiscountRate = 0.15;
}
else
{
Row.PriorityFlag = "NORMAL";
Row.DiscountRate = 0.05;
}
Row.ProcessedDate = DateTime.Now;
}
Converted Spark SQL:
# Processing node Package\Data Flow Task\Script Component, type SCRIPT_COMPONENT
# component nameScript Component
Script_Component_Custom_Logic = f"""
SELECT
*,
CASE
WHEN Amount > 1000 THEN 'HIGH'
ELSE 'NORMAL'
END AS PriorityFlag,
CASE
WHEN Amount > 1000 THEN 0.15
ELSE 0.05
END AS DiscountRate,
CURRENT_TIMESTAMP() AS ProcessedDate
FROM source_table
"""
Script_Component_Custom_Logic = spark.sql(Script_Component_Custom_Logic)
Script_Component_Custom_Logic.createOrReplaceTempView('Script_Component_Custom_Logic')
Package Configurations
Connection Managers
SSIS Connection Managers:
<ConnectionManagers>
<ConnectionManager>
<Name>SourceDB</Name>
<ConnectionString>
Data Source=source-server;Initial Catalog=SourceDB;
Integrated Security=True;
</ConnectionString>
</ConnectionManager>
</ConnectionManagers>
Converted Databricks:
# Processing node Package\Data Flow Task\OLE DB Source, type SOURCE
# component nameOLE DB Source - SourceDB Connection
Connection_Manager_SourceDB = spark.read \
.format("jdbc") \
.option("url", "source-db-url") \
.option("dbtable", "dbo.Customers") \
.option("user", "source-db-user") \
.option("password", "source-db-password") \
.load()
Connection_Manager_SourceDB.createOrReplaceTempView('Connection_Manager_SourceDB')
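The generated URL and credential placeholders should be replaced with Databricks secrets rather than literals; a minimal sketch (the secret scope and key names are assumptions):

jdbc_url = dbutils.secrets.get(scope="ssis-migration", key="source-db-url")
jdbc_user = dbutils.secrets.get(scope="ssis-migration", key="source-db-user")
jdbc_password = dbutils.secrets.get(scope="ssis-migration", key="source-db-password")

Connection_Manager_SourceDB = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "dbo.Customers") \
    .option("user", jdbc_user) \
    .option("password", jdbc_password) \
    .load()
Connection_Manager_SourceDB.createOrReplaceTempView('Connection_Manager_SourceDB')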
Conversion Example
Complete SSIS Package
SSIS Package: LoadCustomerData.dtsx (download sample)
Control Flow:
- Execute SQL Task: Get Destination Table Name or File Name (with ProcessName parameter)
- Execute SQL Task: Truncate Staging Table
- Execute SQL Task: Load Customer Data
- Execute SQL Task: Update Control Table (with ProcessName parameter)
Parameters:
- ProcessName (String): "CustomerETL"
Converted Spark SQL Notebook:
# Databricks notebook source
ProcessName = f'CustomerETL'
# COMMAND ----------
# Processing node Package\Get Destination Table Name or File Name, type EXECUTE_SQL
# component nameGet Destination Table Name or File Name
# input parameters :
# ProcessName
# output parameters :
Package_Get_Destination_Table_Name_or_File_Name = f"""DECLARE VARIABLE V_TblNm STRING;
call aud.spGetPackageDesTbl( '{ProcessName}', V_TblNm );
SELECT V_TblNm AS DestinationTable;"""
spark.sql(Package_Get_Destination_Table_Name_or_File_Name)
# COMMAND ----------
# Processing node Package\Truncate Staging Table, type EXECUTE_SQL
# component nameTruncate Staging Table
# input parameters :
# output parameters :
Package_Truncate_Staging_Table = f"""TRUNCATE TABLE staging.customers;"""
spark.sql(Package_Truncate_Staging_Table)
# COMMAND ----------
# Processing node Package\Load Customer Data, type EXECUTE_SQL
# component nameLoad Customer Data
# input parameters :
# output parameters :
Package_Load_Customer_Data = f"""INSERT INTO staging.customers
(CustomerID, CustomerName, Email, Status, ProcessDate)
SELECT
CustomerID,
CustomerName,
Email,
Status,
current_timestamp() AS ProcessDate
FROM source.customers
WHERE Status = 'Active';"""
spark.sql(Package_Load_Customer_Data)
# COMMAND ----------
# Processing node Package\Update Control Table, type EXECUTE_SQL
# component nameUpdate Control Table
# input parameters :
# ProcessName
# output parameters :
Package_Update_Control_Table = f"""INSERT INTO control.load_log
(process_name, rows_processed, process_date)
SELECT
'{ProcessName}' AS process_name,
COUNT(*) AS rows_processed,
current_timestamp() AS process_date
FROM dw.customers
WHERE ProcessDate >= CAST(current_timestamp() AS DATE);"""
spark.sql(Package_Update_Control_Table)
Next Steps
- Export SSIS packages to DTSX files
- Run conversion using Lakebridge CLI:
databricks labs lakebridge transpile \
--source-dialect ssis \
--input-source /path/to/ssis/packages \
--output-folder /output/sparksql \
--target-technology sparksql
- Review generated notebooks for conversion warnings
- Configure Databricks secrets for connection strings
- Test with sample data in Databricks
- Deploy workflows to production
For more information, see: