Motivation
Connecting to Databricks SQL Warehouses typically involves using one of the methods below:
Method | Advantages | Disadvantages |
---|---|---|
ODBC | Works with various authentication mechanisms, robust, acceptable performance. | Heavy dependency that can be difficult to install on corporate devices. Very slow with larger write operations. |
JDBC | Not sure to be honest. | Slower than ODBC. Requires Java. Very slow with larger write operations. |
Statement Execution API | Straightforward API that can handle data of all sizes. | No direct support for write operations. |
Databricks SQL Connector (databricks-sql-connector) | Convenient to work with and good performance. | It's in Python, so it requires reticulate. |
That leaves a void for something that is:
- Native to R
- Easy to install
- Acceptable performance (or better)
- Handles bulk uploads efficiently
- Supports OAuth U2M (no tokens!)
brickster now provides both DBI and dbplyr backends for working with Databricks SQL warehouses.
It uses the Statement Execution API in combination with the Files API (the latter specifically for large data uploads).
Connecting to a Warehouse
Connecting to a Databricks SQL warehouse requires creating a DBI connection:
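For example, a minimal sketch (the argument names such as warehouse_id are assumptions here, and authentication is assumed to be picked up from your existing Databricks host/OAuth configuration; check the brickster documentation for the exact signature):

library(DBI)
library(brickster)

# sketch: create a DBI connection to a SQL warehouse
# warehouse_id is an assumed argument name; authentication (host, OAuth U2M)
# is assumed to come from your existing Databricks configuration
con <- dbConnect(
  DatabricksSQL(),
  warehouse_id = "<warehouse-id>",
  # optional, but needed for large writes (see "Writing Data" below)
  staging_volume = "/Volumes/<catalog>/<schema>/<volume>/"
)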
{DBI} Backend
Reading Data
Execute SQL directly and return results as data frames:
trips <- dbGetQuery(con, "SELECT * FROM samples.nyctaxi.trips LIMIT 10")
Tables can be referenced via Id(), I(), or using the name as-is:
# List available tables
tables <- dbListTables(con)
# Check if specific table exists
dbExistsTable(con, "samples.nyctaxi.trips")
dbExistsTable(con, I("samples.nyctaxi.trips"))
dbExistsTable(con, Id("samples", "nyctaxi", "trips"))
# Get column information
dbListFields(con, "samples.nyctaxi.trips")
dbListFields(con, I("samples.nyctaxi.trips"))
dbListFields(con, Id("samples", "nyctaxi", "trips"))
Writing Data
When writing data (append, overwrite, etc.) there are two possible behaviours:

1. In-line SQL statement write
2. Stage .parquet files to a Volume directory and COPY INTO or CTAS
For data larger than 50k rows brickster will only permit method (2), which requires staging_volume to be specified when establishing the connection, or passed directly to dbWriteTable. Ensure that staging_volume is a valid Volume path and that you have permission to write files to it.
# small data (150 rows)
# generates an in-line CTAS
dbWriteTable(
conn = con,
name = Id(catalog = "<catalog>", schema = "<schema>", table = "<table>"),
value = iris,
overwrite = TRUE
)
# bigger data (4 million rows)
# writes parquet files to volume then CTAS
iris_big <- sample_n(iris, replace = TRUE, size = 4000000)
dbWriteTable(
conn = con,
name = Id(catalog = "<catalog>", schema = "<schema>", table = "<table>"),
value = iris_big,
overwrite = TRUE,
staging_volume = "/Volumes/<catalog>/<schema>/<volume>/...", # or inherited from connection
progress = TRUE
)
{dbplyr} Backend
Reading Data
As in the DBI backend tables can be referenced either
via Id()
, I()
, or using the name as-is in
tbl()
:
# Connect to existing tables
tbl(con, "samples.nyctaxi.trips")
tbl(con, I("samples.nyctaxi.trips"))
tbl(con, Id("samples", "nyctaxi", "trips"))
tbl(con, in_catalog("samples", "nyctaxi", "trips"))
Chain dplyr operations - they execute remotely on Databricks:
# Filter and select (translated to SQL)
long_trips <- tbl(con, "samples.nyctaxi.trips") |>
filter(trip_distance > 10) |>
select(
tpep_pickup_datetime,
tpep_dropoff_datetime,
trip_distance,
fare_amount
)
# View the generated SQL (without executing)
show_query(long_trips)
# Execute and collect results
long_trips |> collect()
As a general reminder, call collect() at the latest possible point in your analysis to reduce the computation required locally.
# Customer summary statistics
trips_summary <- tbl(con, "samples.nyctaxi.trips") |>
group_by(pickup_zip) |>
summarise(
trip_count = n(),
total_fare_amount = sum(fare_amount, na.rm = TRUE),
total_trip_distance = sum(trip_distance, na.rm = TRUE),
avg_fare_amount = mean(fare_amount, na.rm = TRUE)
) |>
arrange(desc(avg_fare_amount))
# Execute to get the 20 pickup zip codes with the highest average fare and more than 20 trips
top_zips <- trips_summary |>
filter(trip_count > 20) |>
head(20) |>
collect()
Writing Data
A key difference is that temporary tables are not supported. This means functions like copy_to are only usable when temporary is set to FALSE, which will use dbWriteTable to create the table.
iris_remote <- copy_to(con, iris, "iris_table", temporary = FALSE, overwrite = TRUE)
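Since copy_to returns a lazy remote table referencing the newly created table, it can be used directly with further dplyr verbs, for example:

# continue working with the uploaded table lazily
iris_remote |>
  count(Species) |>
  collect()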
Connection Management
The connection for the DatabricksSQL driver is different to other DBI backends as it doesn't have a persistent session (it's all just API calls). This means calling dbDisconnect serves no purpose when it comes to freeing resources on the SQL warehouse.
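It is still safe to call as part of normal DBI hygiene, it just won't stop or resize anything on the warehouse side:

# a no-op in terms of warehouse resources, but fine to keep in scripts
# for consistency with other DBI backends
dbDisconnect(con)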