Databricks¶
yggdrasil.databricks wraps the Databricks SDK with a single entrypoint, DatabricksClient, plus typed result wrappers and Arrow-first conversions.
One client, many services¶
from yggdrasil.databricks import DatabricksClient
c = DatabricksClient(host="https://<workspace>", token="<token>")
| Service | What it covers | First call |
|---|---|---|
| c.sql | Query execution, DDL/DML, result conversion | c.sql.execute("SELECT 1") |
| c.warehouses | Warehouse discovery / start / stop / update | c.warehouses.find_default() |
| c.catalogs / c.tables | Unity Catalog hierarchy + table resources | c.catalogs["main"]["default"]["orders"] |
| c.compute | Cluster lifecycle / runtime selection | c.compute.clusters.all_purpose_cluster(name="etl") |
| c.dbfs_path(...) | DBFS / Volumes / Workspace files | c.dbfs_path("dbfs:/tmp/a.txt") |
| c.secrets | Scope/secret CRUD | c.secrets.create_secret("scope/key", "value") |
| c.iam | Users/groups (workspace or account) | c.iam.users.current_user |
| c.genie | Conversational analytics | c.genie.ask("<space-id>", "weekly revenue") |
| c.spark_connect() | Spark Connect session | spark = c.spark_connect() |
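These services compose. A minimal sketch of a first round trip, assuming c.sql.execute() targets the default warehouse when none is specified:
c = DatabricksClient(host="https://<workspace>", token="<token>")
c.warehouses.find_default().start()                # make sure a warehouse is up before querying
stmt = c.sql.execute("SELECT current_catalog() AS cat")
print(stmt.to_pylist())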
Authentication¶
# PAT
DatabricksClient(host="https://<workspace>", token="<token>")
# OAuth client credentials
DatabricksClient(host="https://<workspace>", client_id="...", client_secret="...")
# Environment-driven (best for local + CI)
DatabricksClient() # reads DATABRICKS_HOST / TOKEN / CONFIG_PROFILE / ACCOUNT_ID / CLUSTER_ID
Reuse the global singleton: DatabricksClient.current().
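A sketch of the singleton pattern: build the client once at startup, then let helpers pick it up via DatabricksClient.current(). This assumes current() returns the most recently constructed client.
DatabricksClient(host="https://<workspace>", token="<token>")  # constructed once, e.g. at app startup
def current_username() -> str:
    c = DatabricksClient.current()  # assumed to return the client built above
    # Assumes to_pylist() mirrors pyarrow's list-of-dicts row format.
    return c.sql.execute("SELECT current_user() AS me").to_pylist()[0]["me"]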
SQL execution¶
stmt = c.sql.execute("SELECT current_user() AS me")
stmt.to_arrow_table()
stmt.to_pandas()
stmt.to_polars()
stmt.to_spark()
stmt.to_pylist()
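All converters materialize the same result set, so you can pick the engine per call site. A small sketch using the pandas path:
stmt = c.sql.execute("SELECT 'a' AS label, 1 AS n UNION ALL SELECT 'b', 2")
df = stmt.to_pandas()                    # standard pandas DataFrame from here on
print(df.groupby("label")["n"].sum())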
End-to-end: write from many engines, read into any¶
import pyarrow as pa
from yggdrasil.databricks import DatabricksClient
from yggdrasil.pandas.lib import pandas as pd
from yggdrasil.polars.lib import polars as pl
c = DatabricksClient(host="https://<workspace>", token="<token>")
sql = c.sql
spark = c.spark_connect()
table = "main.default.demo_ingest"
sql.execute(f"""
CREATE TABLE IF NOT EXISTS {table} (
id BIGINT, source STRING, payload STRING
) USING DELTA
""")
sql.execute(f"DELETE FROM {table}")
# Write from pyarrow
sql.arrow_insert_into(table, pa.table({
"id": [1], "source": ["pyarrow"], "payload": ['{"k":"arrow"}'],
}))
# Write from pandas
sql.insert_into(table, pd.DataFrame([{"id": 2, "source": "pandas", "payload": '{"k":"pandas"}'}]))
# Write from polars
sql.insert_into(table, pl.DataFrame({
"id": [3], "source": ["polars"], "payload": ['{"k":"polars"}'],
}))
# Write from a Spark DataFrame
sdf = spark.createDataFrame([{"id": 4, "source": "pyspark", "payload": '{"k":"spark"}'}])
sql.spark_insert_into(table, sdf)
# Write unstructured rows
sql.insert_into(table, [
{"id": 5, "source": "raw", "payload": "free-form note"},
{"id": 6, "source": "raw", "payload": '{"freeform": [1,2,3]}'},
])
# Read once, project anywhere
stmt = sql.execute(f"SELECT * FROM {table} ORDER BY id")
arrow_table = stmt.to_arrow_table()
pandas_df = stmt.to_pandas()
polars_df = stmt.to_polars()
spark_df = stmt.to_spark(spark=spark)
pylist = stmt.to_pylist()
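A quick sanity check on the round trip, assuming each insert above committed exactly one row (ids 1 through 6):
assert arrow_table.num_rows == len(pylist) == 6   # one row per writer above
print(pandas_df[["id", "source"]])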
Files: DBFS, Volumes, Workspace¶
p = c.dbfs_path("/Volumes/main/default/raw/data.parquet")
p.exists(), p.is_file(), p.stat()
p.parent.mkdir(parents=True, exist_ok=True)
p.write_bytes(b"...")
p.read_text()
list(p.parent.ls())
p.rename("/Volumes/main/default/raw/data.archive.parquet")
p.remove()
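Binary writes pair naturally with pyarrow when you need to upload a serialized table. A sketch, assuming write_bytes accepts the whole payload in one call:
import pyarrow as pa
import pyarrow.parquet as pq
buf = pa.BufferOutputStream()
pq.write_table(pa.table({"id": [1, 2, 3]}), buf)    # serialize to an in-memory Parquet buffer
p = c.dbfs_path("/Volumes/main/default/raw/data.parquet")
p.write_bytes(buf.getvalue().to_pybytes())          # upload the bytes to the Volume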
Temp file workflow:
tmp = c.tmp_path(extension="json", max_lifetime=1800)
tmp.write_text('{"step": "created"}')
print(tmp.exists(), tmp.read_text())
c.clean_tmp_folder()
Secrets¶
c.secrets.create_scope("demo")
c.secrets.create_secret("api-key", "<value>", scope="demo")
# dict-style shortcuts
c.secrets["demo/api-key"] = "rotated"
del c.secrets["demo/api-key"]
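The dict-style shortcut makes bulk rotation straightforward. A sketch, assuming scope/key strings address secrets exactly as above (the key names are placeholders):
rotated = {"api-key": "<new-api-key>", "db-password": "<new-db-password>"}
for key, value in rotated.items():
    c.secrets[f"demo/{key}"] = value   # same "scope/key" addressing as the shortcut above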
Compute¶
clusters = c.compute.clusters
cluster = clusters.create_or_update(cluster_name="etl", num_workers=1)
cluster = clusters.all_purpose_cluster(name="shared-etl")
cluster = clusters.find_cluster("shared-etl")
clusters.latest_spark_version(photon=True, python_version="3.12")
Run code on a cluster:
from yggdrasil.databricks.compute import ExecutionContext
with ExecutionContext(cluster=cluster) as ctx:
print(ctx.execute("print('hello from databricks')"))
Function-level remote dispatch:
from yggdrasil.databricks.compute.remote import databricks_remote_compute
@databricks_remote_compute(cluster_name="shared-etl")
def add(x: int, y: int) -> int:
return x + y
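Calling the decorated function then looks like a local call. This assumes the decorator ships the arguments to the "shared-etl" cluster and returns the deserialized result:
result = add(2, 3)   # runs remotely, assuming transparent argument/result serialization
print(result)        # 5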
Typed job widgets¶
from dataclasses import dataclass
from yggdrasil.databricks.jobs import NotebookConfig
@dataclass
class IngestConfig(NotebookConfig):
catalog: str = "main"
schema: str = "ingest"
table: str = "events"
dry_run: bool = True
cfg = IngestConfig.from_environment() # in a job run
# cfg = IngestConfig.init_widgets() # in a local notebook
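The parsed config is a plain dataclass, so its fields slot directly into SQL. A sketch, assuming cfg was populated by one of the constructors above (OPTIMIZE is standard Delta maintenance, not part of this library):
target = f"{cfg.catalog}.{cfg.schema}.{cfg.table}"
if not cfg.dry_run:
    DatabricksClient.current().sql.execute(f"OPTIMIZE {target}")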
IAM¶
iam = c.iam
iam.users.current_user
iam.users.create("analyst@company.com")
list(iam.users.list(limit=20))
grp = iam.groups.create("data-engineering")
list(iam.groups.list(name="data-engineering", limit=5))
iam.groups.delete(grp)
Genie¶
genie = c.genie
print(genie.ask("<space-id>", "Top 10 customers by revenue"))
conv = genie.start_conversation("<space-id>")
genie.create_message("<space-id>", conv.conversation_id, "Now split by region")
Troubleshooting¶
- 401 / 403: verify host + token, and whether you need workspace vs account scope.
- Warehouse query issues: make sure a warehouse is running: c.warehouses.find_default().start().
- Cluster code execution fails: check cluster policy, permissions, and runtime version compatibility.
- Path not found: pick the right prefix (dbfs:/... for DBFS, /Volumes/... for Volumes).
- Optional package missing: install the right extra (ygg[databricks], ygg[data], ygg[bigdata], ygg[http]).
- Local integration tests skipped: they require DATABRICKS_HOST (gated by the integration marker).
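A quick preflight that exercises the common failure points above, using only calls shown earlier; a sketch, not a built-in diagnostic:
c = DatabricksClient()                        # env-driven auth; 401/403 on the calls below means bad host/token
c.warehouses.find_default().start()           # SQL needs a running warehouse
print(c.sql.execute("SELECT 1").to_pylist())  # end-to-end SQL round trip
print(c.dbfs_path("dbfs:/tmp").exists())      # dbfs:/ prefix for DBFS, /Volumes/ for Volumes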