IBM DataStage to Databricks
Conversion Information
- Transpiler: BladeBridge
- Available targets: Databricks notebooks (SparkSQL, PySpark)
Supported DataStage Versions
- Version 8 onwards - The DataStage XML export format has remained consistent since version 8
Input Requirements
Export your DataStage jobs as XML files (DataStage XML export format):
- Designer Client Export: Use "Export DataStage Components" from the repository
- Command Line Export: Use the `dsexport` command-line utility
- Repository Export: Export from the DataStage Administration Console
The converter does not support DSX files; you must export DataStage jobs in XML format only. DSX is a legacy export format that the converter cannot parse.
If you attempt to use DSX files, the conversion will fail. Always use the XML export option when exporting from DataStage.
What Gets Converted
| DataStage Object | Databricks Equivalent | Notes |
|---|---|---|
| Parallel Job | Databricks Notebook | Data transformation logic |
| Sequence Job | Databricks Workflow JSON | Orchestration and task dependencies |
| Shared Container | Reusable Python function | Encapsulated transformation logic |
| Parameters | Notebook widgets (dbutils.widgets) | Job parameterization |
Stages - Supported
The converter supports the following DataStage stages:
Data Processing Stages
| Transformational Stage | Stage Type | Spark Equivalent | Notes |
|---|---|---|---|
| Aggregator | PxAggregator | GROUP BY with aggregation | Sum, count, avg, min, max operations |
| Change Capture | PxChangeCapture | CDC logic | Detect insert/update/delete operations |
| Checksum | PxChecksum | md5(concat_ws(...)) | Generate row checksums |
| Column Export | PxColumnExport | String concatenation | Combine columns into delimited string |
| Column Import | PxColumnImport | XML/JSON parsing | Extract data from complex columns |
| Copy | PxCopy | DataFrame replication | Replicate data to multiple outputs |
| Difference | PxDifference | Set difference | Compare two datasets |
| Filter | PxFilter | WHERE clause | Row filtering |
| Funnel | PxFunnel | UNION ALL | Combine multiple inputs |
| Join | PxJoin | JOIN | Inner, left, right, full outer joins |
| Lookup | PxLookup | LEFT JOIN | Reference data lookup |
| Merge | PxMerge | JOIN with update logic | Merge datasets with matching |
| Modify | PxModify | Column transformations | Column renaming, dropping, reordering |
| Peek | PxPeek | Row logging | Log sample rows for debugging |
| Pivot | PxPivot | PIVOT / UNPIVOT | Rows to columns or columns to rows |
| Remove Duplicates | PxRemDup | dropDuplicates() | Deduplicate rows based on keys |
| Row Generator | PxRowGenerator | Generated rows | Create test data rows |
| Sort | PxSort | ORDER BY | Sorting operations |
| Surrogate Key Generator | PxSurrogateKeyGenerator | NextSurrogateKey() | Generate unique keys |
| Switch | PxSwitch | CASE WHEN routing | Route rows based on conditions |
| Tail | PxTail | LIMIT from end | Return last N rows |
| Transformer | TransformerStage | SELECT with transformations | Core data transformation logic |
Source/Target Connectors
| DataStage Connector | System Type | Notes |
|---|---|---|
| Amazon S3 | AMAZON_S3 | AWS S3 storage |
| DB2 Connector | DB2 | IBM DB2 database |
| Dataset | FLATFILE | DataStage dataset files |
| File Connector | FLATFILE | Generic file access |
| HDFS (BDFS) | FLATFILE | Hadoop file system |
| Hive Connector | HIVE | Hive metastore tables |
| Netezza Connector | NETEZZA | Netezza appliance |
| Oracle Connector | ORACLE | Oracle database |
| Sequential File | FLATFILE | CSV/delimited files |
| Snowflake Connector | SNOWFLAKE | Snowflake cloud DW |
| SQL Server (ODBC) | MSSQL | Microsoft SQL Server |
| Teradata Connector | TERADATA | Teradata database |
The actual Spark code generated for sources and targets is configurable. See Source and Target Code Generation in Advanced Configuration.
Orchestration Stages (Sequence Jobs)
| DataStage Stage | Stage Type | Databricks Equivalent | Notes |
|---|---|---|---|
| Job Activity | JSJobActivity | notebook_task | Execute parallel job |
| Exec Command | JSExecCmdActivity | spark_python_task | Execute shell command |
| Sequencer | JSSequencer | depends_on | Synchronization point |
| Terminator | JSTerminatorActivity | Job termination | End workflow execution |
| Routine Activity | JSRoutineActivity | Python function call | Execute DataStage routine |
| Mail Activity | JSMailActivity | Email notification | Send notification emails |
| Condition | JSCondition | run_if | Conditional execution |
| Start Loop | JSStartLoopActivity | Loop iteration | Begin loop construct |
| End Loop | JSEndLoopActivity | Loop termination | End loop construct |
| Exception Handler | JSExceptionHandler | Error handling | Exception handling logic |
| User Variables | JSUserVarsActivity | Widget assignment | Set runtime variables |
DataStage Routines
DataStage routines (custom functions written in BASIC or other languages) are not converted to Databricks. This is by design—routines typically perform DataStage-specific operations such as monitoring job statuses, checking job execution states, managing DataStage server resources, or other platform-specific administrative tasks that have no direct equivalent in Databricks.
Since these routines are tightly coupled to the DataStage runtime environment, they are generally not candidates for migration. However, the routine body (source code) is captured in the analyzer output, allowing you to review the logic and determine if any business functionality needs to be reimplemented in Python or SparkSQL.
Sources and Targets
The converter translates DataStage connector configurations to appropriate Spark read/write operations:
Database Sources
DataStage Oracle Source:
Oracle Connector
Connection: DW_ORACLE
Table: DWH.EGIDIMFIL
SQL: SELECT * FROM DWH.EGIDIMFIL WHERE status = 'ACTIVE'
Converted SparkSQL:
# Component DSLink2, Type SOURCE Original node name S_FIL, link DSLink2
DSLink2 = spark.sql(f"""SELECT
SRKFIL, CODFIL, DESFIL, DATCADFIL
FROM DWH.EGIDIMFIL
WHERE status = 'ACTIVE'
""")
DSLink2.createOrReplaceTempView("DSLink2")
File Sources
DataStage Sequential File:
Sequential File Stage
File: /data/input/customers.csv
Format: Delimited (comma)
First Line Column Names: Yes
Converted PySpark:
# Component SRC_FILE, Type SOURCE Original node name SEQ_CUSTOMERS, link SRC_FILE
SRC_FILE = spark.read.format('csv').option('header', 'true').option('delimiter', ',').load('/data/input/customers.csv')
# Conforming field names to the component layout
SRC_FILE_conformed_cols = ["CUSTOMER_ID", "CUSTOMER_NAME", "EMAIL", "CREATED_DATE"]
SRC_FILE = DatabricksConversionSupplements.conform_df_columns(SRC_FILE, SRC_FILE_conformed_cols)
The converter generates column conforming logic to ensure DataFrame column names match the expected component layout. This handles cases where source file columns may have different names, casing, or ordering than what downstream transformations expect.
Helper Library (databricks_conversion_supplements.py):
This file is generated as part of each conversion run and contains utility functions used by the converted notebooks. The content is static—it does not change from run to run, so you only need to deploy it once. Take the file from your first conversion output and deploy it to your Databricks workspace where the converted notebooks can import it.
from pyspark.sql import DataFrame

class DatabricksConversionSupplements:
    @staticmethod
    def conform_df_columns(df: DataFrame, new_col_names: list) -> DataFrame:
        # Preserves sys_row_id and source_record_id if present
        if 'sys_row_id' in df.columns and 'sys_row_id' not in new_col_names:
            new_col_names.append('sys_row_id')
        if 'source_record_id' in df.columns and 'source_record_id' not in new_col_names:
            new_col_names.append('source_record_id')
        return df.withColumnsRenamed(dict(zip(df.columns, new_col_names)))
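The renaming behavior can be illustrated without a Spark session. The following is a minimal sketch of the same dict-based mapping the helper builds; the column names here are hypothetical, chosen only to show the preservation of the bookkeeping columns:

```python
# Sketch of the column-mapping logic in conform_df_columns, without Spark.
# Existing columns are zipped positionally to the conformed layout; the
# sys_row_id / source_record_id bookkeeping columns keep their names.
def build_rename_map(existing_cols, new_col_names):
    new_col_names = list(new_col_names)  # avoid mutating the caller's list
    for reserved in ('sys_row_id', 'source_record_id'):
        if reserved in existing_cols and reserved not in new_col_names:
            new_col_names.append(reserved)
    return dict(zip(existing_cols, new_col_names))

# Source file columns differ in naming and casing from the component layout:
mapping = build_rename_map(
    ['cust_id', 'cust_nm', 'email_addr', 'created', 'sys_row_id'],
    ['CUSTOMER_ID', 'CUSTOMER_NAME', 'EMAIL', 'CREATED_DATE'],
)
# mapping sends cust_id to CUSTOMER_ID (and so on, positionally),
# while sys_row_id is carried through unchanged.
```

Because the mapping is positional, the conformed list must name the columns in the same order the source produces them.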
Sequence Job Conversion
DataStage sequence jobs are converted to Databricks Workflow JSON definitions that orchestrate the execution of notebooks and tasks.
Sequence Job Components
| DataStage Component | Databricks Workflow Equivalent |
|---|---|
| Job Activity (run parallel job) | notebook_task |
| Exec Command Activity | spark_python_task |
| Sequencer (sync point) | depends_on array |
| Condition | run_if property |
| Mail Activity | Email notifications |
| Terminator | Workflow termination |
Example Workflow JSON
{
"name": "SEQX_ETL_DAILY_LOAD",
"tasks": [
{
"task_key": "Extract_Data",
"notebook_task": {
"notebook_path": "/Workspace/Jobs/PXJ_EXTRACT_DATA",
"source": "WORKSPACE"
}
},
{
"task_key": "Transform_Data",
"depends_on": [
{ "task_key": "Extract_Data" }
],
"run_if": "ALL_SUCCESS",
"notebook_task": {
"notebook_path": "/Workspace/Jobs/PXJ_TRANSFORM_DATA",
"source": "WORKSPACE"
}
},
{
"task_key": "Send_Notification",
"depends_on": [
{ "task_key": "Transform_Data" }
],
"spark_python_task": {
"python_file": "/Workspace/Jobs/SHELL_SEND_EMAIL.py"
}
}
]
}
Conversion Examples
SparkSQL Output
The SparkSQL output uses spark.sql() calls with SQL statements and temporary views for data flow.
DataStage Transformer with Join:
# Databricks notebook source
# Code converted on 2025-09-02 18:22:19
import os
from pyspark.sql import *
from pyspark.sql.functions import *
# COMMAND ----------
# Component DSLink11, Type TRANSFORMATION Original node name Transformer_8, link DSLink11
DSLink11 = spark.sql(f"""
SELECT
trim(T1.IDNO) as IDNO,
IF(trim(T1.STATUS) = 'Y', 'Active', 'Inactive') as STATUS_DESC,
IF(trim(T1.FLAG) = '1', 'Y', 'N') as PROCESSED_FLAG
FROM T1
""")
DSLink11.createOrReplaceTempView("DSLink11")
# COMMAND ----------
# Component DSLink49_CONV2, Type JOINER Original node name Join_1, link DSLink49_CONV2
DSLink49_CONV2 = spark.sql(f"""
SELECT
DSLink11.IDNO as IDNO,
DSLink17.AMOUNT as AMOUNT,
DSLink11.STATUS_DESC as STATUS_DESC,
DSLink11.PROCESSED_FLAG as PROCESSED_FLAG
FROM DSLink11
INNER JOIN DSLink17 ON DSLink11.IDNO = DSLink17.IDNO
""")
DSLink49_CONV2.createOrReplaceTempView("DSLink49_CONV2")
# COMMAND ----------
# Component TARGET_TABLE, Type INSERT
spark.sql(f"""INSERT INTO TARGET_SCHEMA.TARGET_TABLE
(IDNO, AMOUNT, STATUS_DESC, PROCESSED_FLAG)
SELECT IDNO, AMOUNT, STATUS_DESC, PROCESSED_FLAG
FROM DSLink49_CONV2
""")
PySpark Output
The PySpark output uses DataFrame API with method chaining.
DataStage Checksum and Deduplication:
# Databricks notebook source
# Code converted on 2025-09-30 09:01:47
import os
from pyspark.sql import *
from pyspark.sql.functions import *
# COMMAND ----------
# Processing node DSLink4, type SOURCE
DSLink4 = spark.sql(f"""SELECT SRKFIL, CODFIL, DESFIL, DATCADFIL FROM DIMFIL""")
# COMMAND ----------
# Processing node DSLink5, type CHECKSUM
DSLink5 = DSLink4.select(
DSLink4.SRKFIL.alias('SRKFIL'),
DSLink4.CODFIL.alias('CODFIL'),
md5(concat_ws('|', DSLink4.DESFIL, DSLink4.DATCADFIL)).alias('CurrentChecksum')
)
# COMMAND ----------
# Processing node DSLink3, type UNIQUE_ROW (Remove Duplicates)
DSLink3 = DSLink2.dropDuplicates(['CODFIL'])
# COMMAND ----------
# Processing node Matched, type MERGE
Matched = DSLink3.join(
DSLink5,
[DSLink5.CODFIL == DSLink3.CODFIL],
'LEFT_OUTER'
).select(
DSLink3.CODFIL.alias('CODFIL'),
DSLink3.DESFIL.alias('DESFIL'),
DSLink5.SRKFIL.alias('SRKFIL'),
DSLink5.CurrentChecksum.alias('CurrentChecksum')
)
# COMMAND ----------
# Processing node Updates, type TRANSFORMATION with filter
Updates = DSLink7.filter("CurrentChecksum <> UpdatesChecksum").select(
DSLink7.SRKFIL.alias('SRKFIL'),
DSLink7.CODFIL.alias('CODFIL'),
DSLink7.DESFIL.alias('DESFIL')
)
Pivot Stage Example
DataStage Pivot (Rows to Columns):
# COMMAND ----------
# Component DSLink83, Type PIVOT_ROWS_TO_COLUMNS Original node name Pivot_Enterprise_6
DSLink83 = spark.sql(f"""
WITH DSLink83_numbered_rows AS (
SELECT
IDNO, SOURCE, VALUE,
ROW_NUMBER() OVER (PARTITION BY IDNO ORDER BY SOURCE) AS rn
FROM DSLink49_CONV1
)
SELECT
IDNO,
MAX(CASE WHEN rn = 1 THEN SOURCE END) AS SOURCE_1,
MAX(CASE WHEN rn = 1 THEN VALUE END) AS VALUE_1,
MAX(CASE WHEN rn = 2 THEN SOURCE END) AS SOURCE_2,
MAX(CASE WHEN rn = 2 THEN VALUE END) AS VALUE_2
FROM DSLink83_numbered_rows
GROUP BY IDNO
""")
DSLink83.createOrReplaceTempView("DSLink83")
Advanced Configuration
Configuration Inheritance
The converter uses a hierarchy of JSON configuration files:
SparkSQL Output:
base_datastage2databricks_sparksql.json
└── inherits from: base_datastage2databricks.json
└── inherits from: base_etl2databricks_defs.json
PySpark Output:
base_datastage2databricks_pyspark.json
└── inherits from: base_datastage2databricks.json
└── inherits from: base_etl2databricks_defs.json
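Conceptually, each file in the chain overrides the settings of the file it inherits from. The following is an illustrative sketch only (the file names are from the hierarchy above, but BladeBridge's actual merge semantics are not shown here):

```python
# Illustrative only: child config values shadow parent values, as in a
# chained dict merge applied from the base of the hierarchy upward.
def resolve_config(*configs_base_to_child):
    resolved = {}
    for cfg in configs_base_to_child:  # base first, most specific last
        resolved.update(cfg)
    return resolved

# Hypothetical settings standing in for the three real files:
base_etl2databricks_defs = {"target": "databricks", "header": "true"}
base_datastage2databricks = {"header": "false"}
base_datastage2databricks_sparksql = {"output": "sparksql"}

cfg = resolve_config(
    base_etl2databricks_defs,
    base_datastage2databricks,
    base_datastage2databricks_sparksql,
)
```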
Configuration Files
| Setting | File | Purpose |
|---|---|---|
| etl_converter_config_file | datastage2deltalake.json | ETL-level expressions and function mappings |
| Stage translation | ds2dws.cfg | DataStage stage type to generic type mapping |
Custom Configuration Override
Use the `--transpiler-config-path` switch to provide custom configuration overrides:
databricks labs lakebridge transpile \
  --source-dialect datastage \
  --input-source /path/to/datastage/exports \
  --output-folder /output/sparksql \
  --target-technology sparksql \
  --transpiler-config-path /path/to/custom/config.json
Function Translation
Common DataStage functions are automatically translated:
| DataStage Function | Spark Equivalent |
|---|---|
| DSJobStartDate | current_timestamp() |
| DownCase(str) | LOWER(str) |
| UpCase(str) | UPPER(str) |
| Oconv(date, fmt) | date_format(date, fmt) |
| Trim(str) | TRIM(str) |
| Left(str, n) | LEFT(str, n) |
| Right(str, n) | RIGHT(str, n) |
| Len(str) | LENGTH(str) |
| Substr(str, pos, len) | SUBSTRING(str, pos, len) |
| IIF(cond, t, f) | IF(cond, t, f) |
| IsNull(val) | ISNULL(val) |
| NullToValue(val, def) | NVL(val, def) or COALESCE(val, def) |
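A simplified sketch of how such name-level mappings could be applied is shown below. This is illustrative only: the converter's actual expression rewriting is driven by its JSON configuration, not by this code, and only a hypothetical subset of the table is mapped:

```python
import re

# Hypothetical subset of the function-name mappings from the table above.
FUNCTION_MAP = {
    'DownCase': 'LOWER',
    'UpCase': 'UPPER',
    'Len': 'LENGTH',
    'IIF': 'IF',
}

def translate_expression(expr: str) -> str:
    # Replace each DataStage function name when it is followed by an
    # opening parenthesis, leaving unmapped names untouched.
    for ds_name, spark_name in FUNCTION_MAP.items():
        expr = re.sub(rf'\b{ds_name}\s*\(', spark_name + '(', expr)
    return expr

translate_expression("UpCase(Trim(name))")  # 'UPPER(Trim(name))'
```

Note that real argument-level rewrites (such as Oconv format strings) require more than name substitution, which is why the mappings live in the ETL-level expression configuration.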
Source and Target Code Generation
The code generated for source and target connectors is fully configurable through JSON configuration files. This allows customization of how data is read from sources and written to targets.
System Type Classification
The system_type_class setting maps database/connector types to a system class:
"system_type_class": {
"ORACLE": "RELATIONAL",
"DB2": "RELATIONAL",
"MSSQL": "RELATIONAL",
"TERADATA": "RELATIONAL",
"HIVE": "RELATIONAL",
"FLATFILE": "FILE_DELIMITED",
"DEFAULT": "RELATIONAL"
}
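The DEFAULT entry acts as a fallback for any connector type not listed explicitly. A minimal sketch of that lookup behavior, assuming the mapping above:

```python
# Illustrative lookup with the DEFAULT fallback described above.
SYSTEM_TYPE_CLASS = {
    "ORACLE": "RELATIONAL",
    "DB2": "RELATIONAL",
    "MSSQL": "RELATIONAL",
    "TERADATA": "RELATIONAL",
    "HIVE": "RELATIONAL",
    "FLATFILE": "FILE_DELIMITED",
    "DEFAULT": "RELATIONAL",
}

def system_class(system_type: str) -> str:
    # Unknown connector types fall back to the DEFAULT class.
    return SYSTEM_TYPE_CLASS.get(system_type, SYSTEM_TYPE_CLASS["DEFAULT"])
```

The resolved class then selects which reader/writer command template is used, as described in the next section.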
Reader/Writer Command Templates
The commands section defines templates for generating reader and writer code based on system class:
"commands": {
"READER_RELATIONAL": "spark.sql(f\"\"\"%TABLE_NAME%\"\"\")",
"READER_FILE_DELIMITED": "spark.read.format('csv').option('header', '%HEADER%').load(f'''%PATH%''')",
"WRITER_RELATIONAL": "%DF%.write.saveAsTable('%TABLE_NAME%', mode = 'append')",
"WRITER_FILE_DELIMITED": "%DF%.write.format('csv').option('header','%HEADER%').mode('overwrite').csv(f'''%PATH%''')"
}
Template Tokens
Command templates use placeholder tokens that are replaced at conversion time:
| Token | Description |
|---|---|
| %NODE_NAME% | Name of the current component/stage |
| %DF% | DataFrame variable name (same as node name) |
| %TABLE_NAME% | Name of the table referenced in the node metadata; depending on the node, this may be a source or a target table |
| %PATH% | File path for flat files |
| %FILENAME% | File directory |
| %DELIMITER% | Field delimiter for CSV |
| %HEADER% | 'true' or 'false' for CSV header row |
| %SQL% | Raw SQL content |
| %COLUMN_LIST% | Comma-separated column names |
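Token substitution amounts to simple string replacement into the command template. The following sketch assumes a hypothetical node-metadata dict; the template string is the READER_FILE_DELIMITED example from above:

```python
# The file-reader template from the commands section above.
READER_FILE_DELIMITED = (
    "spark.read.format('csv').option('header', '%HEADER%').load(f'''%PATH%''')"
)

def render_template(template: str, tokens: dict) -> str:
    # Replace each %TOKEN% placeholder with its value from node metadata.
    for name, value in tokens.items():
        template = template.replace(f'%{name}%', value)
    return template

code = render_template(
    READER_FILE_DELIMITED,
    {'HEADER': 'true', 'PATH': '/data/input/customers.csv'},
)
```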
SparkSQL Hybrid Approach
When generating SparkSQL output, the converter uses a hybrid approach:
- Relational sources/targets: Generate SparkSQL via `spark.sql()`
- Flat file sources/targets: Generate PySpark code (e.g., `spark.read.csv()`)
This is controlled by the force_pyspark_output_by_node_types setting in base_datastage2databricks_sparksql.json:
"force_pyspark_output_by_node_types": ["SOURCE|FLATFILE", "TARGET|FLATFILE", "INSERT|FLATFILE"]
When a node matches this list, the converter uses the pyspark_config.commands section instead of generating SparkSQL. This is necessary because SparkSQL does not have native file I/O syntax.
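The matching itself can be read as a "ROLE|SYSTEM_TYPE" pair lookup against that list. A minimal sketch under that assumption:

```python
# The setting from base_datastage2databricks_sparksql.json, as listed above.
FORCE_PYSPARK = ["SOURCE|FLATFILE", "TARGET|FLATFILE", "INSERT|FLATFILE"]

def use_pyspark(node_role: str, system_type: str) -> bool:
    # A node is forced to PySpark output when its role and system type,
    # joined as "ROLE|SYSTEM_TYPE", appear in the force list.
    return f"{node_role}|{system_type}" in FORCE_PYSPARK
```

So a flat-file source is emitted as PySpark even in a SparkSQL notebook, while an Oracle source of the same job is emitted as `spark.sql()`.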
Troubleshooting
DSX File Format Error
Problem: Conversion fails with parsing errors when using DSX files.
Solution: Export your DataStage jobs in XML format, not DSX:
- In DataStage Designer, use "Export DataStage Components" and select XML format
- Use the command-line `dsexport` utility with the XML output option
- DSX is a legacy export format that is not supported
Missing Job Dependencies
Problem: Converted workflow references notebooks that don't exist.
Solution:
- Ensure all parallel jobs referenced by sequence jobs are included in the export
- Export entire project or folder to capture all dependencies
- Check that shared containers are exported along with jobs that use them
Parameter Set Resolution
Problem: Parameters appear as unresolved placeholders like $PROJDEF.
Solution:
- Export parameter sets along with jobs
- Review the generated `dbutils.widgets` declarations and provide default values
- Update placeholders in converted code with actual values or Databricks secrets
Next Steps
- Export DataStage jobs as XML files (not DSX)
- Run conversion using the Lakebridge CLI:
  databricks labs lakebridge transpile \
    --source-dialect datastage \
    --input-source /path/to/datastage/exports \
    --output-folder /output/sparksql \
    --target-technology sparksql
- Review generated notebooks for conversion warnings
- Configure Databricks secrets for connection strings
- Deploy workflow JSON for sequence job orchestration
- Test with sample data in Databricks
For more information, see: