
RockyResource

RockyResource is a dagster.ConfigurableResource that invokes the Rocky CLI via subprocess and parses JSON output into strongly-typed Pydantic models. It exposes one Python method per Rocky CLI command.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `binary_path` | `str` | `"rocky"` | Path to the rocky binary. Accepts an absolute path, a relative path, or just `"rocky"` to resolve from `PATH`. For deployment, point this at a vendored binary (e.g. `"vendor/rocky"`). |
| `config_path` | `str` | `"rocky.toml"` | Path to the pipeline config file. |
| `state_path` | `str` | `".rocky-state.redb"` | Path to the state store file. |
| `models_dir` | `str` | `"models"` | Path to the directory containing `.rocky` model files. Used by `compile`, `lineage`, `test`, `ci`, `ai_sync`, `ai_explain`, and `ai_test`. |
| `contracts_dir` | `str \| None` | `None` | Optional directory containing contract files. Passed to `compile`, `test`, and `ci` when set. |
| `server_url` | `str \| None` | `None` | Optional URL for a running `rocky serve` instance. When set, `compile()`, `lineage()`, and `metrics()` use the HTTP API instead of spawning a subprocess. |
| `timeout_seconds` | `int` | `3600` | Subprocess timeout for any single CLI invocation, in seconds. |
  • All methods return strongly-typed Pydantic models (see Type Reference).
  • Default subprocess timeout is 3600 seconds (1 hour).
  • On CLI failure, raises dagster.Failure with stderr attached as metadata.
  • If the binary is not found on PATH, raises Failure with a link to the installation instructions.
  • Partial success: Rocky can exit non-zero but still emit valid JSON (e.g., when some tables succeed and others fail). Methods like run(), compile(), test(), and ci() handle this automatically, returning the parsed result so callers can distinguish successes from failures.
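The partial-success behavior can be sketched with plain `subprocess` calls: parse stdout as JSON even when the exit code is non-zero, and only fail when no JSON was produced at all. The `run_cli` helper below is illustrative, not part of dagster-rocky, and it raises a plain `RuntimeError` where the real resource raises `dagster.Failure`:

```python
import json
import subprocess
import sys


def run_cli(argv: list[str], timeout: int = 3600) -> dict:
    """Run a CLI command; tolerate non-zero exits that still emit JSON."""
    proc = subprocess.run(argv, capture_output=True, text=True, timeout=timeout)
    try:
        # Partial success: some tables may have failed, but the JSON
        # report on stdout is still valid and worth parsing.
        return json.loads(proc.stdout)
    except json.JSONDecodeError:
        # No usable JSON at all: surface stderr to the caller.
        raise RuntimeError(f"CLI failed (exit {proc.returncode}): {proc.stderr}")


# Simulate a command that exits 1 but still prints a valid JSON report.
result = run_cli([
    sys.executable, "-c",
    "import json, sys; print(json.dumps({'ok': 1, 'failed': 2})); sys.exit(1)",
])
print(result)
```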

Runs rocky discover and returns all discovered sources and their tables.

Wraps: rocky discover --output json

```python
result = rocky.discover()
for source in result.sources:
    print(f"{source.id}: {len(source.tables)} tables")
```

Runs rocky plan and returns the planned SQL statements without executing them.

Wraps: rocky plan --filter <filter> --output json

| Parameter | Type | Description |
| --- | --- | --- |
| `filter` | `str` | Component filter (e.g. `"tenant=acme"`) |

run(filter, governance_override=None, *, run_models=False, partition=None, partition_from=None, partition_to=None, latest=False, missing=False, lookback=None, parallel=None) -> RunResult


Runs rocky run in buffered mode (subprocess.run) and returns the full execution result including materializations, check results, drift detection, and permission changes.

Wraps: rocky run --filter <filter> --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `filter` | `str` | required | Component filter (e.g. `"tenant=acme"`) |
| `governance_override` | `dict \| None` | `None` | Per-run governance config (workspace_ids, grants), merged with `rocky.toml` defaults |
| `run_models` | `bool` | `False` | Also execute compiled models (passes `--models` and `--all`) |
| `partition` | `str \| None` | `None` | Single partition key (e.g. `"2026-04-07"`) |
| `partition_from` | `str \| None` | `None` | Lower bound of a partition range (requires `partition_to`) |
| `partition_to` | `str \| None` | `None` | Upper bound of a partition range (requires `partition_from`) |
| `latest` | `bool` | `False` | Run the partition containing `now()` (UTC) |
| `missing` | `bool` | `False` | Run partitions missing from the state store |
| `lookback` | `int \| None` | `None` | Recompute the previous N partitions in addition to the selected ones |
| `parallel` | `int \| None` | `None` | Run N partitions concurrently |

run_streaming(context, filter, governance_override=None, *, run_models=False, partition=None, partition_from=None, partition_to=None, latest=False, missing=False, lookback=None, parallel=None) -> RunResult


Pipes-style rocky run with live stderr streaming to context.log. Same semantics as run() but spawns the binary via subprocess.Popen and forwards Rocky’s stderr (tracing output) to context.log.info line-by-line as the run progresses. Use this from inside a Dagster @multi_asset or @op for runs longer than a few seconds.

Wraps: rocky run --filter <filter> --output json

| Parameter | Type | Description |
| --- | --- | --- |
| `context` | `AssetExecutionContext \| OpExecutionContext` | Dagster execution context for log streaming |
| `filter` | `str` | Component filter |
| All other parameters | | Same as `run()` |
```python
@dg.asset
def replicate(context: dg.AssetExecutionContext, rocky: RockyResource):
    result = rocky.run_streaming(context, filter="tenant=acme")
    return result.tables_copied
```

run_pipes(context, filter, governance_override=None, *, run_models=False, partition=None, partition_from=None, partition_to=None, latest=False, missing=False, lookback=None, parallel=None, pipes_client=None) -> PipesClientCompletedInvocation


Full Dagster Pipes execution with structured event streaming. Spawns rocky run via PipesSubprocessClient, which sets the DAGSTER_PIPES_CONTEXT / DAGSTER_PIPES_MESSAGES env vars. The engine emits one Pipes message per materialization, asset check, and log line, so the run viewer gets MaterializationEvent and AssetCheckEvaluation events in real time.

Wraps: rocky run --filter <filter> --output json (via Dagster Pipes protocol)

| Parameter | Type | Description |
| --- | --- | --- |
| `context` | `AssetExecutionContext \| OpExecutionContext` | Dagster execution context |
| `filter` | `str` | Component filter |
| `pipes_client` | `PipesSubprocessClient \| None` | Optional pre-configured Pipes client |
| All other parameters | | Same as `run()` |
```python
@dg.asset
def my_warehouse_data(context: dg.AssetExecutionContext, rocky: RockyResource):
    yield from rocky.run_pipes(context, filter="tenant=acme").get_results()
```

resume_run(run_id=None, *, filter="", governance_override=None) -> RunResult


Resume a failed run from where it left off.

Wraps: rocky run --resume <run_id> or rocky run --resume-latest

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `run_id` | `str \| None` | `None` | Specific run ID to resume. If `None`, resumes the latest failed run. |
| `filter` | `str` | `""` | Optional filter expression |
| `governance_override` | `dict \| None` | `None` | Optional governance overrides |

Runs rocky state and returns the current watermark state for all tracked tables.

Wraps: rocky state --output json


compile(model_filter=None) -> CompileResult


Runs rocky compile and returns compiler diagnostics (errors, warnings, info). When server_url is configured, fetches from the HTTP API instead of spawning a subprocess.

Wraps: rocky compile --models <models_dir> --output json or GET /api/v1/compile

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `model_filter` | `str \| None` | `None` | Optional model name to filter diagnostics |

lineage(target, column=None) -> ModelLineageResult | ColumnLineageResult


Runs rocky lineage and returns the dependency graph for a model or a single column trace. When server_url is configured, fetches from the HTTP API instead.

Wraps: rocky lineage --models <models_dir> <target> [--column <column>] --output json or GET /api/v1/models/<target>/lineage[/<column>]

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `target` | `str` | required | Model name (e.g. `"customer_orders"`) |
| `column` | `str \| None` | `None` | Optional column name to trace. When set, returns `ColumnLineageResult`; otherwise returns `ModelLineageResult`. |

Runs rocky test to execute models locally via DuckDB without warehouse credentials.

Wraps: rocky test --models <models_dir> --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `model_filter` | `str \| None` | `None` | Optional model name to test |

Runs rocky ci (compile + test) and returns the combined result.

Wraps: rocky ci --models <models_dir> --output json


Generate a model from a natural-language intent description.

Wraps: rocky ai "<intent>" --format <format> --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `intent` | `str` | required | Natural-language description of the desired model |
| `format` | `str` | `"rocky"` | Output format for the generated model |
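The documented invocation `rocky ai "<intent>" --format <format> --output json` maps directly onto an argv list; the helper and the example intent string are illustrative:

```python
def build_ai_argv(intent: str, format: str = "rocky") -> list[str]:
    """Assemble the documented `rocky ai` invocation."""
    # The intent is passed as a single positional argument, so it needs
    # no shell quoting when handed to subprocess as a list.
    return ["rocky", "ai", intent, "--format", format, "--output", "json"]


print(build_ai_argv("daily revenue per tenant"))
```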

ai_sync(*, apply=False, model=None, with_intent=False) -> AiSyncResult


Detect schema changes in upstream sources and propose intent-guided model updates.

Wraps: rocky ai-sync --models <models_dir> --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `apply` | `bool` | `False` | Apply proposed changes directly |
| `model` | `str \| None` | `None` | Filter to a specific model |
| `with_intent` | `bool` | `False` | Include intent metadata in proposals |

ai_explain(model=None, *, all=False, save=False) -> AiExplainResult


Generate intent descriptions from existing model code.

Wraps: rocky ai-explain --models <models_dir> --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `model` | `str \| None` | `None` | Specific model to explain |
| `all` | `bool` | `False` | Explain all models |
| `save` | `bool` | `False` | Save generated intents to model files |

ai_test(model=None, *, all=False, save=False) -> AiTestResult


Generate test assertions from model intents.

Wraps: rocky ai-test --models <models_dir> --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `model` | `str \| None` | `None` | Specific model to generate tests for |
| `all` | `bool` | `False` | Generate tests for all models |
| `save` | `bool` | `False` | Save generated tests to model files |

history(model=None, since=None) -> HistoryResult | ModelHistoryResult


Retrieve pipeline run history. Returns ModelHistoryResult when filtered to a single model, otherwise returns HistoryResult with all runs.

Wraps: rocky history --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `model` | `str \| None` | `None` | Filter to a specific model's execution history |
| `since` | `str \| None` | `None` | Date filter (ISO 8601 or YYYY-MM-DD) |

metrics(model, *, trend=False, column=None, alerts=False) -> MetricsResult


Retrieve quality metrics for a model. When server_url is configured, fetches from the HTTP API instead.

Wraps: rocky metrics <model> --output json or GET /api/v1/models/<model>/metrics

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `model` | `str` | required | Model name |
| `trend` | `bool` | `False` | Show trend over recent runs |
| `column` | `str \| None` | `None` | Filter null rate trends to a specific column |
| `alerts` | `bool` | `False` | Include quality alerts |

Analyze materialization strategies and return cost optimization recommendations.

Wraps: rocky optimize --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `model` | `str \| None` | `None` | Filter analysis to a specific model |

Run health checks on the Rocky installation and configuration.

Wraps: rocky doctor --output json

validate_migration(dbt_project, rocky_project=None, *, sample_size=None) -> ValidateMigrationResult


Compare a dbt project against a Rocky import to validate migration correctness.

Wraps: rocky validate-migration --dbt-project <path> --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `dbt_project` | `str` | required | Path to the dbt project directory |
| `rocky_project` | `str \| None` | `None` | Path to the Rocky project directory |
| `sample_size` | `int \| None` | `None` | Number of rows to sample for comparison |

test_adapter(adapter=None, command=None) -> ConformanceResult


Run adapter conformance tests against a warehouse adapter.

Wraps: rocky test-adapter --output json

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `adapter` | `str \| None` | `None` | Adapter to test (e.g. `"databricks"`) |
| `command` | `str \| None` | `None` | Specific conformance command to run |

List all configured hooks. Returns raw stdout (not parsed JSON).

Wraps: rocky hooks list --output json

Fire a test hook event. Returns raw stdout (not parsed JSON).

Wraps: rocky hooks test <event> --output json

| Parameter | Type | Description |
| --- | --- | --- |
| `event` | `str` | Hook event to fire |

The resource provides three execution modes, all sharing the same partition and governance flag plumbing:

| Mode | Method | Use case |
| --- | --- | --- |
| Buffered | `run()` | Scripts, tests, notebooks. No Dagster context needed. |
| Streaming | `run_streaming()` | Long Dagster runs. Live stderr forwarding to `context.log`. |
| Pipes | `run_pipes()` | Full Dagster Pipes. Structured `MaterializationEvent` and `AssetCheckEvaluation` per table. |

Choose run() for simple cases, run_streaming() when you want live log visibility in the Dagster UI, and run_pipes() when you want per-table materialization events.

When server_url is configured, the following methods use the rocky serve HTTP API instead of spawning a subprocess:

  • compile() → GET /api/v1/compile
  • lineage() → GET /api/v1/models/<target>/lineage[/<column>]
  • metrics() → GET /api/v1/models/<model>/metrics

This is useful when a Rocky server is already running (e.g., in a development environment or alongside the LSP).

```python
from dagster_rocky import RockyResource
import dagster as dg

rocky = RockyResource(
    binary_path="rocky",
    config_path="config/rocky.toml",
    state_path=".rocky-state.redb",
    models_dir="models",
    contracts_dir="contracts",
)


@dg.asset
def replicate(context: dg.AssetExecutionContext, rocky: RockyResource):
    result = rocky.run_streaming(context, filter="tenant=acme")
    return result.tables_copied


@dg.asset
def compile_check(rocky: RockyResource):
    result = rocky.compile()
    if result.has_errors:
        raise dg.Failure(description=f"{len(result.diagnostics)} compiler errors")
    return result.models


@dg.asset
def health(rocky: RockyResource):
    result = rocky.doctor()
    return result.overall


defs = dg.Definitions(
    assets=[replicate, compile_check, health],
    resources={"rocky": rocky},
)
```