
Configuration

Rocky reads a single rocky.toml file for all configuration. The file uses named adapters ([adapter.NAME]) and named pipelines ([pipeline.NAME]), so a single config can host multiple sources, warehouses, and pipelines side by side.

Rocky applies sensible defaults to minimize boilerplate. Many fields can be omitted:

| What | Default | When to omit |
| --- | --- | --- |
| `pipeline.type` | `"replication"` | Always (unless using a different type) |
| Unnamed `[adapter]` with a `type` key | Auto-wraps as `adapter.default` | Single-adapter projects |
| Pipeline adapter refs | `"default"` | When only one adapter is defined |
| `[state]` `backend` | `"local"` | Local development (always the default) |
| `auto_create_catalogs` / `auto_create_schemas` | `false` | When you don’t need auto-creation |
| Model sidecar `name` | Filename stem | When the file is `fct_orders.toml` and the name is `fct_orders` |
| Model sidecar `target.table` | Same as `name` | When the table name matches the model name |
| `models/_defaults.toml` | N/A | Provides directory-level `[target]` defaults for catalog and schema |

Create a models/_defaults.toml to avoid repeating [target] in every model:

models/_defaults.toml
[target]
catalog = "analytics"
schema = "warehouse"

Individual models inherit these defaults and only need to override what differs.
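
Putting these defaults together, a model sidecar often needs only its overrides. A sketch, assuming a model file named `fct_orders.toml` whose catalog and schema come from `models/_defaults.toml` (the `schema` override shown is hypothetical):

```toml
# models/fct_orders.toml
# name defaults to "fct_orders" (the filename stem),
# target.table defaults to the model name, and
# target.catalog / target.schema come from models/_defaults.toml.

[target]
schema = "finance"  # hypothetical override of the directory default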

# Define one or more adapter instances by name
[adapter.local]
type = "duckdb"
path = "warehouse.duckdb"
# Define one or more pipelines and reference adapters by name
[pipeline.replication]
type = "replication"
strategy = "full_refresh"
[pipeline.replication.source]
adapter = "local"
[pipeline.replication.source.schema_pattern]
prefix = "raw__"
separator = "__"
components = ["source"]
[pipeline.replication.target]
adapter = "local"
catalog_template = "warehouse"
schema_template = "analytics"
[state]
backend = "local"

The same config can declare additional adapters ([adapter.prod_databricks], [adapter.prod_fivetran]) and additional pipelines, and pipelines select which adapters to use via the adapter = "..." field on source/target.

Environment variables can be referenced anywhere in the config using ${VAR_NAME} syntax. They are substituted at parse time before TOML is evaluated.

[adapter.prod]
type = "databricks"
host = "${DATABRICKS_HOST}"
token = "${DATABRICKS_TOKEN}"

If a referenced variable is not set, Rocky returns a parse error listing the missing variable.

Use ${VAR_NAME:-default} to provide a fallback when a variable is unset or empty:

[state]
backend = "${ROCKY_STATE_BACKEND:-local}"
s3_bucket = "${ROCKY_STATE_BUCKET:-}"

If ROCKY_STATE_BACKEND is not set, it defaults to "local". If ROCKY_STATE_BUCKET is not set, it defaults to an empty string.


Each [adapter.NAME] block defines one adapter instance. The name is arbitrary — pipelines reference adapters by this name. The type field selects which adapter implementation handles the connection.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `type` | string | Yes | Adapter type. One of `"databricks"`, `"snowflake"`, `"duckdb"`, `"fivetran"`, `"manual"`. |
| `retry` | table | No | Retry policy (see `[adapter.NAME.retry]`). |

The remaining fields depend on the adapter type.

The duckdb adapter provides local in-process execution. Use it as a warehouse, source, or both — the same adapter instance can handle discovery and execution because they share the same database.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| `path` | string | No | (in-memory) | Path to a persistent DuckDB file. Required when using the same DuckDB adapter for both discovery and execution, so the discovery side sees rows written by the warehouse side. |

# In-memory DuckDB
[adapter.local]
type = "duckdb"
# Persistent DuckDB file
[adapter.local]
type = "duckdb"
path = "warehouse.duckdb"

Databricks SQL warehouse adapter. Executes SQL via the Statement Execution REST API and manages Unity Catalog governance.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `host` | string | Yes | Workspace hostname (e.g., `"workspace.cloud.databricks.com"`). |
| `http_path` | string | Yes | SQL warehouse HTTP path (e.g., `"/sql/1.0/warehouses/abc123"`). |
| `token` | string | No | Personal Access Token. Tried first if set. |
| `client_id` | string | No | OAuth M2M client ID (service principal). Used as a fallback when `token` is not set. |
| `client_secret` | string | No | OAuth M2M client secret. Required if `client_id` is set. |
| `timeout_secs` | integer | No | Statement execution timeout in seconds (default 120). Increase for large full-refresh queries. |

[adapter.prod]
type = "databricks"
host = "${DATABRICKS_HOST}"
http_path = "${DATABRICKS_HTTP_PATH}"
token = "${DATABRICKS_TOKEN}"

OAuth M2M instead of PAT:

[adapter.prod]
type = "databricks"
host = "${DATABRICKS_HOST}"
http_path = "${DATABRICKS_HTTP_PATH}"
client_id = "${DATABRICKS_CLIENT_ID}"
client_secret = "${DATABRICKS_CLIENT_SECRET}"

Snowflake warehouse adapter. Supports OAuth, key-pair (RS256 JWT), and password authentication.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `account` | string | Yes | Snowflake account identifier (e.g., `"org-account"`). |
| `warehouse` | string | Yes | Warehouse name for query execution. |
| `database` | string | No | Default database. |
| `schema` | string | No | Default schema. |
| `role` | string | No | Role to assume. |
| `username` | string | No | Username for key-pair or password auth. |
| `password` | string | No | Password for password auth. |
| `private_key_path` | string | No | Path to a PKCS#8 PEM private key for key-pair JWT auth. |
| `oauth_token` | string | No | Pre-supplied OAuth token from an IdP. |

Authentication priority: OAuth (highest) > Key-pair JWT > Password (lowest).

# Key-pair JWT auth
[adapter.snow]
type = "snowflake"
account = "${SNOWFLAKE_ACCOUNT}"
warehouse = "COMPUTE_WH"
username = "${SNOWFLAKE_USER}"
private_key_path = "${SNOWFLAKE_KEY_PATH}"
# Password auth
[adapter.snow]
type = "snowflake"
account = "${SNOWFLAKE_ACCOUNT}"
warehouse = "COMPUTE_WH"
username = "${SNOWFLAKE_USER}"
password = "${SNOWFLAKE_PASSWORD}"

Fivetran source adapter. Calls the Fivetran REST API to discover connectors and tables. Metadata only — Rocky never moves data through this adapter.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `destination_id` | string | Yes | Fivetran destination ID. |
| `api_key` | string | Yes | Fivetran API key (Basic Auth). |
| `api_secret` | string | Yes | Fivetran API secret (Basic Auth). |

[adapter.fivetran]
type = "fivetran"
destination_id = "${FIVETRAN_DESTINATION_ID}"
api_key = "${FIVETRAN_API_KEY}"
api_secret = "${FIVETRAN_API_SECRET}"

The manual adapter lets you define source schemas and tables inline in rocky.toml instead of discovering them from an API. Useful for tests and small fixed sources.
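
This page does not document the manual adapter's inline fields, so the shape below is only a hypothetical sketch; the `schemas` and `tables` keys are assumptions, not confirmed syntax:

```toml
[adapter.fixture]
type = "manual"

# Illustrative only: the field names below are assumptions.
[[adapter.fixture.schemas]]
name = "raw__shop"
tables = ["orders", "customers"]
```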

Retry policy for transient errors (HTTP 429/503, rate limits, timeouts). Uses exponential backoff with optional jitter.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `max_retries` | integer | 3 | Maximum retry attempts. Set to 0 to disable. |
| `initial_backoff_ms` | integer | 1000 | Initial backoff in milliseconds. |
| `max_backoff_ms` | integer | 30000 | Maximum backoff cap in milliseconds. |
| `backoff_multiplier` | float | 2.0 | Multiplier applied after each retry. |
| `jitter` | bool | true | Add random jitter to prevent thundering herd. |
| `circuit_breaker_threshold` | integer | 5 | Trip after this many consecutive failures. Set to 0 to disable. |

[adapter.prod.retry]
max_retries = 5
initial_backoff_ms = 500
max_backoff_ms = 60000

Each [pipeline.NAME] block defines a pipeline. The name is arbitrary — Rocky CLI commands accept --pipeline NAME to select one when multiple are defined.

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| `type` | string | Yes | (none) | Pipeline type. Currently `"replication"`. |
| `strategy` | string | No | `"incremental"` | Replication strategy: `"incremental"` or `"full_refresh"`. |
| `timestamp_column` | string | No | `"_fivetran_synced"` | Watermark column for the incremental strategy. |
| `metadata_columns` | list | No | `[]` | Extra columns to add to copied data (see below). |

[pipeline.bronze]
type = "replication"
strategy = "incremental"
timestamp_column = "_fivetran_synced"
metadata_columns = [
{ name = "_loaded_by", type = "STRING", value = "NULL" },
{ name = "_loaded_at", type = "TIMESTAMP", value = "CURRENT_TIMESTAMP()" },
]

The value field is inserted as a SQL expression. Use "NULL" for null values and SQL function calls like "CURRENT_TIMESTAMP()" for computed values.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `adapter` | string | Yes | Name of the adapter that owns the source data. Must match an `[adapter.NAME]` key. |
| `catalog` | string | No | Source catalog name (used by warehouse-resident sources like Databricks). |

[pipeline.bronze.source]
adapter = "fivetran"

Optional override for the adapter that lists schemas/tables. Useful when the source is discovered from one system (e.g., DuckDB) but its data lives somewhere else.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `adapter` | string | Yes | Adapter name to use for discovery. |

[pipeline.bronze.source.discovery]
adapter = "fivetran"

If omitted, Rocky uses the source adapter for discovery.

Defines how source schema names are parsed into structured components.

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `prefix` | string | Yes | Prefix that identifies managed schemas (e.g., `"src__"`). |
| `separator` | string | Yes | Separator between components (e.g., `"__"`). |
| `components` | list of strings | Yes | Ordered list of component names. A trailing `"..."` marks a component as multi-valued. |

[pipeline.bronze.source.schema_pattern]
prefix = "src__"
separator = "__"
components = ["client", "regions...", "connector"]

Given src__acme__us_west__us_east__shopify, this pattern extracts:

| Component | Value |
| --- | --- |
| `client` | `"acme"` |
| `regions` | `["us_west", "us_east"]` |
| `connector` | `"shopify"` |

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `adapter` | string | Yes | Name of the warehouse adapter. Must match an `[adapter.NAME]` key. |
| `catalog_template` | string | Yes | Template for the target catalog name. Uses `{component}` placeholders. |
| `schema_template` | string | Yes | Template for the target schema name. Uses `{component}` placeholders. |

[pipeline.bronze.target]
adapter = "prod"
catalog_template = "warehouse"
schema_template = "stage__{source}"

Given source=shopify:

| Template | Result |
| --- | --- |
| `warehouse` | `warehouse` (static — no substitution) |
| `stage__{source}` | `stage__shopify` |

For multi-tenant setups with per-tenant catalogs, use {component} placeholders in catalog_template — see Schema Patterns for the full pattern reference (e.g. catalog_template = "{tenant}_warehouse" with components = ["tenant", "regions...", "source"]).
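
Following the pattern reference above, a per-tenant setup might look like this (the pipeline name and prefix are illustrative):

```toml
[pipeline.bronze.source.schema_pattern]
prefix = "src__"
separator = "__"
components = ["tenant", "regions...", "source"]

[pipeline.bronze.target]
adapter = "prod"
catalog_template = "{tenant}_warehouse"
schema_template = "stage__{source}"
```

With this pattern, a source schema like src__acme__us_west__shopify lands in acme_warehouse.stage__shopify.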

Governance settings cover catalog/schema lifecycle, tagging, grants, and isolation. These features are implemented against Databricks Unity Catalog APIs and apply only when the target adapter is Databricks.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `auto_create_catalogs` | bool | false | Create target catalogs if they do not exist. |
| `auto_create_schemas` | bool | false | Create target schemas if they do not exist. |
| `tags` | table | `{}` | Tags applied to managed catalogs, schemas, and tables. |
| `grants` | list | `[]` | Catalog-level grants. Each entry has `principal` (string) and `permissions` (list of strings). |
| `schema_grants` | list | `[]` | Schema-level grants. Same format as `grants`. |
| `isolation` | table | (none) | Workspace isolation settings (see below). |

[pipeline.bronze.target.governance]
auto_create_catalogs = true
auto_create_schemas = true
[pipeline.bronze.target.governance.tags]
managed_by = "rocky"
environment = "production"
[[pipeline.bronze.target.governance.grants]]
principal = "group:data_engineers"
permissions = ["USE CATALOG", "MANAGE"]
[[pipeline.bronze.target.governance.schema_grants]]
principal = "group:data_engineers"
permissions = ["USE SCHEMA", "SELECT", "MODIFY"]

[pipeline.NAME.target.governance.isolation]


Workspace isolation for Databricks Unity Catalog. Binds managed catalogs to specific workspaces and optionally enables isolated mode.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `enabled` | bool | false | Set catalog isolation mode to `ISOLATED`. |
| `workspace_ids` | list of tables | `[]` | Workspace bindings — see below. |

Each entry in workspace_ids is a table with two fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `id` | integer | required | Databricks workspace ID. |
| `binding_type` | string | `"READ_WRITE"` | Access level: `"READ_WRITE"` or `"READ_ONLY"`. |

[pipeline.bronze.target.governance.isolation]
enabled = true
[[pipeline.bronze.target.governance.isolation.workspace_ids]]
id = 7474656540609532
binding_type = "READ_WRITE"
[[pipeline.bronze.target.governance.isolation.workspace_ids]]
id = 7474647537929812
binding_type = "READ_ONLY"

The binding type maps to the Databricks API values BINDING_TYPE_READ_WRITE and BINDING_TYPE_READ_ONLY.

Post-replication data quality checks.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `enabled` | bool | false | Master switch to enable or disable all checks. |
| `row_count` | bool | false | Compare row counts between source and target. |
| `column_match` | bool | false | Verify source and target have the same column sets. |
| `freshness` | table | (none) | Freshness check: `{ threshold_seconds = N, overrides = { ... } }`. |
| `null_rate` | table | (none) | Null-rate check: `{ columns = [...], threshold = 0.0–1.0, sample_percent = 10 }`. |
| `custom` | list | `[]` | Custom SQL checks. Each entry has `name`, `sql`, and optional `threshold`. |
| `anomaly_threshold_pct` | float | 50.0 | Row count deviation percentage that triggers an anomaly. Set to 0 to disable. |

[pipeline.bronze.checks]
enabled = true
row_count = true
column_match = true
freshness = { threshold_seconds = 86400 }
anomaly_threshold_pct = 50.0
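
The null_rate and custom checks can be combined with the flags above. A sketch — the column names, check name, and SQL are illustrative, not tied to a real table:

```toml
[pipeline.bronze.checks]
enabled = true
row_count = true
null_rate = { columns = ["order_id", "customer_id"], threshold = 0.01, sample_percent = 10 }

# Custom SQL check: entries have name, sql, and an optional threshold.
[[pipeline.bronze.checks.custom]]
name = "no_negative_totals"
sql = "SELECT COUNT(*) FROM warehouse.stage__shopify.orders WHERE total < 0"
threshold = 0
```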

Parallelism and error handling.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `concurrency` | integer | 8 | Maximum number of tables processed in parallel. |
| `fail_fast` | bool | false | Abort all remaining tables on first error. |
| `error_rate_abort_pct` | integer | 50 | Abort if the error rate exceeds this percentage (0–100). Set to 0 to disable. |
| `table_retries` | integer | 1 | Times to retry failed tables after the initial parallel phase. Set to 0 to disable. |

[pipeline.bronze.execution]
concurrency = 8
fail_fast = false
error_rate_abort_pct = 50
table_retries = 1

Global state persistence — where Rocky stores watermarks, run history, and checkpoint progress.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `backend` | string | `"local"` | Storage backend: `"local"`, `"s3"`, `"valkey"`, or `"tiered"`. |
| `s3_bucket` | string | (none) | S3 bucket name. Required when `backend` is `"s3"` or `"tiered"`. |
| `s3_prefix` | string | `"rocky/state/"` | S3 key prefix for state files. |
| `valkey_url` | string | (none) | Valkey/Redis connection URL. Required when `backend` is `"valkey"` or `"tiered"`. |
| `valkey_prefix` | string | `"rocky:state:"` | Valkey key prefix for state entries. |

Local (default):

[state]
backend = "local"

S3 (durable, for ephemeral environments):

[state]
backend = "s3"
s3_bucket = "${ROCKY_STATE_BUCKET}"
s3_prefix = "rocky/state/"

Valkey (low-latency, shared):

[state]
backend = "valkey"
valkey_url = "${VALKEY_URL}"

Tiered (Valkey + S3 fallback):

[state]
backend = "tiered"
valkey_url = "${VALKEY_URL}"
s3_bucket = "${ROCKY_STATE_BUCKET}"

Tiered downloads from Valkey first (fast), falls back to S3 (durable). Uploads to both.


Optional caching configuration. Rocky uses a three-tier cache (memory, Valkey, API) to reduce redundant warehouse calls.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `valkey_url` | string | (none) | Valkey/Redis URL for distributed caching. |

[cache]
valkey_url = "${VALKEY_URL}"

When configured, Rocky caches metadata queries (table descriptions, schema lookups) in Valkey to avoid repeated warehouse API calls across runs. Without Valkey, only in-memory caching is used (effective within a single run).


Cost assumptions used by rocky optimize when recommending materialization strategies.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `storage_cost_per_gb_month` | float | 0.023 | Storage cost per GB-month. |
| `compute_cost_per_dbu` | float | 0.40 | Compute cost per DBU. |
| `warehouse_size` | string | `"Medium"` | Warehouse size for cost estimation (e.g., `"Small"`, `"Medium"`, `"Large"`). |
| `min_history_runs` | integer | 5 | Minimum runs before cost recommendations are emitted. |

[cost]
storage_cost_per_gb_month = 0.023
compute_cost_per_dbu = 0.40
warehouse_size = "Medium"

A complete Fivetran → Databricks pipeline with governance:

# ──────────────────────────────────────────────────
# Adapters: connections to source and warehouse
# ──────────────────────────────────────────────────
[adapter.fivetran]
type = "fivetran"
destination_id = "${FIVETRAN_DESTINATION_ID}"
api_key = "${FIVETRAN_API_KEY}"
api_secret = "${FIVETRAN_API_SECRET}"
[adapter.prod]
type = "databricks"
host = "${DATABRICKS_HOST}"
http_path = "${DATABRICKS_HTTP_PATH}"
token = "${DATABRICKS_TOKEN}"
# ──────────────────────────────────────────────────
# Pipeline: bronze layer replication
# ──────────────────────────────────────────────────
[pipeline.bronze]
type = "replication"
strategy = "incremental"
timestamp_column = "_fivetran_synced"
metadata_columns = [
{ name = "_loaded_by", type = "STRING", value = "NULL" },
]
[pipeline.bronze.source]
adapter = "fivetran"
[pipeline.bronze.source.schema_pattern]
prefix = "src__"
separator = "__"
components = ["source"]
[pipeline.bronze.target]
adapter = "prod"
catalog_template = "warehouse"
schema_template = "stage__{source}"
[pipeline.bronze.target.governance]
auto_create_catalogs = true
auto_create_schemas = true
[pipeline.bronze.target.governance.tags]
managed_by = "rocky"
environment = "production"
[[pipeline.bronze.target.governance.grants]]
principal = "group:data_engineers"
permissions = ["USE CATALOG", "MANAGE"]
[[pipeline.bronze.target.governance.schema_grants]]
principal = "group:data_engineers"
permissions = ["USE SCHEMA", "SELECT", "MODIFY"]
[pipeline.bronze.target.governance.isolation]
enabled = true
[[pipeline.bronze.target.governance.isolation.workspace_ids]]
id = 123456789
binding_type = "READ_WRITE"
[pipeline.bronze.checks]
enabled = true
row_count = true
column_match = true
freshness = { threshold_seconds = 86400 }
[pipeline.bronze.execution]
concurrency = 8
fail_fast = false
table_retries = 1
# ──────────────────────────────────────────────────
# State: persistence backend for watermarks
# ──────────────────────────────────────────────────
[state]
backend = "${ROCKY_STATE_BACKEND:-local}"
# s3_bucket = "${ROCKY_STATE_BUCKET}"
# valkey_url = "${VALKEY_URL}"

A credential-free DuckDB pipeline (good for examples and tests). Using config inference, many defaults can be omitted:

[adapter.local]
type = "duckdb"
path = "warehouse.duckdb"
[pipeline.demo]
strategy = "full_refresh"
[pipeline.demo.source]
adapter = "local"
[pipeline.demo.source.schema_pattern]
prefix = "raw__"
separator = "__"
components = ["source"]
[pipeline.demo.target]
adapter = "local"
catalog_template = "warehouse"
schema_template = "analytics"
[pipeline.demo.checks]
row_count = true
[pipeline.demo.execution]
concurrency = 4

Note: pipeline.type = "replication" (default), auto_create_catalogs = false (default), auto_create_schemas = false (default), and [state] backend = "local" (default) are all omitted.

With a single adapter, this can be even more minimal using the unnamed adapter shorthand:

[adapter]
type = "duckdb"
path = "warehouse.duckdb"
[pipeline.demo]
strategy = "full_refresh"
[pipeline.demo.source.schema_pattern]
prefix = "raw__"
separator = "__"
components = ["source"]
[pipeline.demo.target]
catalog_template = "warehouse"
schema_template = "analytics"

Configure shell scripts and webhooks to run at pipeline lifecycle events.

# Shell hooks — run a command, pipe JSON context to stdin
[[hook.pipeline_start]]
command = "bash scripts/notify.sh"
timeout_ms = 5000
on_failure = "warn" # abort | warn | ignore
[[hook.materialize_error]]
command = "bash scripts/pagerduty.sh"
on_failure = "ignore"
# Webhooks — HTTP POST with template body
[hook.webhooks.pipeline_complete]
url = "https://hooks.slack.com/services/T.../B.../xxx"
preset = "slack"
secret = "${WEBHOOK_SECRET}"
[hook.webhooks.materialize_error]
url = "https://events.pagerduty.com/v2/enqueue"
preset = "pagerduty"

Supported hook events:

| Event | Trigger |
| --- | --- |
| `pipeline_start` | Pipeline execution begins |
| `discover_complete` | Source discovery finishes |
| `compile_complete` | Compilation finishes |
| `pipeline_complete` | Pipeline execution succeeds |
| `pipeline_error` | Pipeline execution fails |
| `before_materialize` | Before a table is materialized |
| `after_materialize` | After a table is materialized |
| `materialize_error` | Table materialization fails |
| `before_model_run` | Before a compiled model runs |
| `after_model_run` | After a compiled model runs |
| `model_error` | Compiled model execution fails |
| `check_result` | A quality check completes |
| `drift_detected` | Schema drift detected |
| `anomaly_detected` | Row count anomaly detected |
| `state_synced` | State store sync completes |

Shell hook fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `command` | string | required | Shell command to execute |
| `timeout_ms` | number | 30000 | Max execution time in milliseconds |
| `on_failure` | string | `"warn"` | Behavior on failure: `abort`, `warn`, or `ignore` |
| `env` | object | `{}` | Extra environment variables |

Webhook fields:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `url` | string | required | Webhook endpoint URL |
| `preset` | string | (none) | Built-in preset: `slack`, `pagerduty`, `datadog`, `teams` |
| `method` | string | `"POST"` | HTTP method |
| `headers` | object | `{}` | Additional HTTP headers |
| `body_template` | string | (none) | Mustache-style template (`{{event}}`, `{{model}}`, `{{error}}`) |
| `secret` | string | (none) | HMAC-SHA256 signing key |
| `timeout_ms` | number | 5000 | Request timeout in milliseconds |
| `async_mode` | boolean | false | Fire-and-forget (don’t wait for response) |
| `on_failure` | string | `"warn"` | Behavior on failure |
| `retry_count` | number | 0 | Number of retries |
| `retry_delay_ms` | number | 1000 | Delay between retries |
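
A custom webhook without a preset can use body_template and headers directly; the endpoint URL and header name below are placeholders:

```toml
[hook.webhooks.model_error]
url = "https://example.com/rocky-events"   # placeholder endpoint
headers = { "X-Source" = "rocky" }
body_template = '{"event": "{{event}}", "model": "{{model}}", "error": "{{error}}"}'
secret = "${WEBHOOK_SECRET}"
retry_count = 2
```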