Live Log Streaming (Pipes)
dagster-rocky 0.4 ships RockyResource.run_streaming(), a
Pipes-style alternative to RockyResource.run(). It spawns the rocky binary
via subprocess.Popen, forwards rocky’s stderr (where the engine’s
Rust tracing layer writes the output of its info!() / warn!() macros) to
context.log.info line by line as the run progresses, and parses the
final stdout JSON into a RunResult after the subprocess exits.
This unblocks live progress visibility for long-running pipelines.
Without it, the Dagster run viewer shows nothing for the duration of a
30-minute rocky run and dumps the entire log at the end. With it,
users see each model copy / contract check / drift action as it
happens.
Quickstart
```python
import dagster as dg
from dagster_rocky import RockyResource

rocky = RockyResource(config_path="rocky.toml")


@dg.asset
def my_warehouse_data(context: dg.AssetExecutionContext, rocky: RockyResource):
    # Use run_streaming so the run viewer streams progress in real time
    result = rocky.run_streaming(context, filter="tenant=acme")
    return result.tables_copied


# Bind the resource instance to the "rocky" parameter name used above
defs = dg.Definitions(assets=[my_warehouse_data], resources={"rocky": rocky})
```
When materialized, the Dagster run viewer shows lines like:
```
[INFO] rocky: INFO discovering 12 sources
[INFO] rocky: INFO catalog acme_warehouse created
[INFO] rocky: INFO copying acme.orders (15000 rows)
[INFO] rocky: INFO copying acme.payments (42000 rows)
[INFO] rocky: INFO drift check passed for acme schema
[INFO] rocky: INFO run complete in 18000ms
```
Each line is forwarded as the engine emits it, not at the end of the run.
API parity with run()
run_streaming accepts the same keyword arguments as run():
```python
result = rocky.run_streaming(
    context,
    filter="tenant=acme",
    governance_override={"workspace_ids": [12345]},
    run_models=True,
    partition="2026-04-08",
    lookback=2,
    parallel=4,
)
```
The first positional argument is the Dagster execution context (an
AssetExecutionContext from an @asset or @multi_asset, or an OpExecutionContext
from an @op). All the partition selection flags from
Phase 3 partitions work identically.
Automatic wiring in RockyComponent
When you use RockyComponent, the component already calls
run_streaming by default, so every multi-asset materialization gets
live log streaming for free. No configuration is needed:
```python
defs = dg.Definitions(
    assets=[RockyComponent(config_path="rocky.toml")],
)
```
Inside the component’s asset factory (_make_rocky_asset), the
_run_filters helper passes the execution context through to
run_streaming for every filter pass, so users see progress in the run
viewer while the materialization runs.
Failure handling
run_streaming matches run()’s failure semantics:
| Outcome | Behavior |
|---|---|
| Success (exit 0) | Returns the parsed RunResult |
| Partial success (exit ≠0, stdout starts with {) | Returns the parsed RunResult (Rocky’s partial-success contract) |
| Hard failure (exit ≠0, no JSON) | Raises dg.Failure with the last 20 stderr lines in the metadata |
| Binary missing | Raises dg.Failure with installation instructions |
| Subprocess timeout | Kills the process, joins the reader thread, raises dg.Failure with the configured timeout in the message and the stderr tail |
The stderr_tail metadata attached to failures captures the last progress
lines the engine emitted before it crashed, which is far more useful for
debugging than a bare exit code.
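The failure table can be read as a function of the exit code, the captured stdout, and the collected stderr lines. The sketch below is an illustration under assumptions, not the dagster-rocky source: RockyFailure is a hypothetical stand-in for dg.Failure, results are plain dicts rather than RunResult objects, and the "binary not found" wording is a placeholder for the real installation instructions. The timeout row belongs to the process-waiting loop and is not modeled here.

```python
from __future__ import annotations

import json
import subprocess


class RockyFailure(Exception):
    """Hypothetical stand-in for dg.Failure: a description plus a metadata dict."""

    def __init__(self, description: str, metadata: dict | None = None):
        super().__init__(description)
        self.description = description
        self.metadata = metadata or {}


def spawn_rocky(cmd: list[str]) -> subprocess.Popen:
    """Binary missing: turn FileNotFoundError into a RockyFailure with a hint."""
    try:
        return subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except FileNotFoundError:
        # Placeholder wording; the real message carries installation instructions.
        raise RockyFailure(f"rocky binary not found: {cmd[0]!r}; install it and put it on PATH") from None


def classify_outcome(returncode: int, stdout: str, stderr_lines: list[str]) -> dict:
    """Success and partial success return parsed JSON; anything else fails."""
    if returncode == 0:
        return json.loads(stdout)
    if stdout.startswith("{"):
        # Partial success: non-zero exit, but rocky still wrote its JSON result.
        return json.loads(stdout)
    raise RockyFailure(
        f"rocky exited with code {returncode}",
        metadata={"stderr_tail": "\n".join(stderr_lines[-20:])},  # last 20 stderr lines
    )
```

Keeping only the tail bounds the metadata size while still showing what the engine was doing right before it failed.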
How it works under the hood
```
+-------------------+          +------------------------+
| Dagster context   |          | rocky subprocess       |
|                   |          |                        |
| context.log <-----+---<<<----+ stderr (line-buffered) |
|                   |          |                        |
| buffer <----------+---<<<----+ stdout (JSON output)   |
+-------------------+          +------------------------+
         |                                |
         |                                v
         |                            exit code
         |                                |
         v                                |
   parse RunResult <-------+-----<<<------+
   (after communicate)
```
- subprocess.Popen spawns rocky with stdout=PIPE, stderr=PIPE, bufsize=1 (line-buffered).
- A daemon reader thread reads stderr line by line and forwards each non-empty line to context.log.info with a rocky: prefix.
- The main thread blocks on proc.communicate(timeout=...) to capture stdout and wait for the process to exit.
- After the subprocess exits, the main thread joins the reader thread, allowing a 2-second grace period for any in-flight lines.
- If the exit is clean or a partial success, the captured stdout is parsed into a RunResult.
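The steps above can be sketched in plain Python. This is a minimal, self-contained illustration, not the dagster-rocky implementation: log stands in for context.log.info, StreamingRunFailed is a hypothetical stand-in for dg.Failure, and the result is a plain dict rather than a RunResult. One deliberate difference: the sketch waits with proc.wait(timeout=...) and drains stdout on a second thread instead of calling communicate(), because communicate() also reads any stream opened with stderr=PIPE and would compete with the reader thread for that pipe.

```python
from __future__ import annotations

import json
import subprocess
import sys
import threading
from collections import deque
from typing import Callable


class StreamingRunFailed(Exception):
    """Hypothetical stand-in for dg.Failure in this sketch."""


def run_streaming_sketch(cmd: list[str], log: Callable[[str], None], timeout: float = 3600.0) -> dict:
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,  # line-buffered reads on our side (valid only in text mode)
    )
    tail: deque[str] = deque(maxlen=20)  # last 20 stderr lines, kept for failure metadata
    stdout_parts: list[str] = []

    def forward_stderr() -> None:
        # Forward each non-empty line as soon as the engine writes it.
        for line in proc.stderr:
            line = line.rstrip("\n")
            if line:
                tail.append(line)
                log(f"rocky: {line}")

    def collect_stdout() -> None:
        # The JSON payload is only parsed once the process has exited.
        stdout_parts.append(proc.stdout.read())

    readers = [
        threading.Thread(target=forward_stderr, daemon=True),
        threading.Thread(target=collect_stdout, daemon=True),
    ]
    for reader in readers:
        reader.start()

    try:
        returncode = proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
        for reader in readers:
            reader.join(timeout=2)
        raise StreamingRunFailed(f"rocky timed out after {timeout}s; stderr tail: {list(tail)}")

    for reader in readers:
        reader.join(timeout=2)  # grace period for in-flight lines
    stdout = "".join(stdout_parts)

    # Success, or Rocky's partial-success contract (non-zero exit but JSON on stdout)
    if returncode == 0 or stdout.startswith("{"):
        return json.loads(stdout)
    raise StreamingRunFailed(f"rocky exited {returncode}; stderr tail: {list(tail)}")


if __name__ == "__main__":
    # Fake engine: two progress lines on stderr, then the JSON result on stdout.
    child = (
        "import json, sys\n"
        "for msg in ['discovering 2 sources', 'copying acme.orders']:\n"
        "    print(msg, file=sys.stderr, flush=True)\n"
        "print(json.dumps({'tables_copied': 2}))\n"
    )
    print(run_streaming_sketch([sys.executable, "-c", child], log=print))
```

Because the stderr thread calls log for each line as it arrives, progress surfaces immediately, while the JSON payload is parsed only after the process exits.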
Three execution modes
RockyResource ships three ways to run rocky:
| | run() | run_streaming() | run_pipes() |
|---|---|---|---|
| Live log streaming | ❌ buffered | ✅ stderr forwarding | ✅ via Pipes protocol |
| Structured MaterializationEvent from Pipes | ❌ | ❌ | ✅ |
| Returns | RunResult | RunResult | PipesClientCompletedInvocation |
| Needs Dagster context | no | yes | yes |
| Engine Pipes support required | no | no | yes (shipped in v0.4) |
run() — buffered (non-Dagster callers)
```python
result = rocky.run(filter="tenant=acme")
```
For scripts, tests, notebooks, or any code that just wants the typed
result without a Dagster context. Output is buffered via subprocess.run, so nothing is visible until the process exits.
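For contrast, a buffered runner reduces to a single subprocess.run call. This is a hedged sketch, not the library's run(): run_buffered_sketch is a hypothetical name, the command list is passed in directly rather than built from run()'s keyword arguments, and RuntimeError stands in for dg.Failure.

```python
from __future__ import annotations

import json
import subprocess
import sys


def run_buffered_sketch(cmd: list[str], timeout: float = 3600.0) -> dict:
    # subprocess.run blocks until exit and hands back stdout/stderr in one piece,
    # so there is nothing to forward while the run is in progress.
    completed = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if completed.returncode != 0 and not completed.stdout.startswith("{"):
        tail = completed.stderr.splitlines()[-20:]
        raise RuntimeError(f"rocky exited {completed.returncode}; stderr tail: {tail}")
    return json.loads(completed.stdout)


if __name__ == "__main__":
    child = "import json, sys; print('progress', file=sys.stderr); print(json.dumps({'tables_copied': 1}))"
    print(run_buffered_sketch([sys.executable, "-c", child]))
```

The "progress" line in the example only becomes available after the child has exited, which is exactly the gap run_streaming() closes.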
run_streaming() — Pipes-style (live progress, batch result)
```python
import dagster as dg
from dagster_rocky import RockyResource


@dg.asset
def my_asset(context: dg.AssetExecutionContext, rocky: RockyResource):
    result = rocky.run_streaming(context, filter="tenant=acme")
    return result.tables_copied
```
Spawns rocky via subprocess.Popen, forwards stderr line by line to
context.log.info for live progress, and parses the final stdout JSON into
a RunResult after the subprocess exits. It doesn’t depend on Pipes
message emission, so it works against any rocky binary.
run_pipes() — full Dagster Pipes (structured events)
```python
import dagster as dg
from dagster_rocky import RockyResource


@dg.asset
def my_asset(context: dg.AssetExecutionContext, rocky: RockyResource):
    yield from rocky.run_pipes(context, filter="tenant=acme").get_results()
```
Spawns rocky via dg.PipesSubprocessClient,
which sets the DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES environment variables.
The rocky engine (v0.4+) detects these and emits structured Pipes
messages on the messages channel:
- report_asset_materialization per copied table: appears as a MaterializationEvent in the run viewer with structured metadata (strategy, duration_ms, rows_copied, target_table_full_name, sql_hash, partition_key)
- report_asset_check per Rocky check: appears as an AssetCheckEvaluation in the run viewer
- log events for run start, completion, and drift actions
Returns a PipesClientCompletedInvocation. Call .get_results() to
extract the materialization events Dagster constructed from the Pipes
messages.
This is the canonical Dagster Pipes integration pattern.
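To make the messages channel concrete, the sketch below sorts Pipes JSON lines into the kinds of events they produce. It assumes the standard Dagster Pipes envelope (__dagster_pipes_version, method, params) with metadata values wrapped as {"raw_value": ..., "type": ...}; read_pipes_messages and the sample asset key are hypothetical. In practice PipesSubprocessClient does this parsing for you, and get_results() returns the resulting events.

```python
from __future__ import annotations

import json


def read_pipes_messages(lines: list[str]) -> dict[str, list]:
    """Group Pipes JSON-line messages by the kind of event they produce (sketch)."""
    events: dict[str, list] = {"materializations": [], "checks": [], "logs": []}
    for raw in lines:
        if not raw.strip():
            continue
        msg = json.loads(raw)  # envelope: {"__dagster_pipes_version", "method", "params"}
        method, params = msg["method"], msg.get("params") or {}
        # Assumption: metadata values are wrapped as {"raw_value": ..., "type": ...}
        metadata = {k: v["raw_value"] for k, v in (params.get("metadata") or {}).items()}
        if method == "report_asset_materialization":
            events["materializations"].append({"asset_key": params["asset_key"], **metadata})
        elif method == "report_asset_check":
            events["checks"].append(
                {"asset_key": params["asset_key"], "check_name": params["check_name"], "passed": params["passed"]}
            )
        elif method == "log":
            events["logs"].append(f"{params['level']}: {params['message']}")
        # Other methods (e.g. "closed") produce no event in this sketch.
    return events


if __name__ == "__main__":
    # Hypothetical sample messages, shaped like the ones described above.
    sample = [
        json.dumps({"__dagster_pipes_version": "0.1", "method": "log",
                    "params": {"message": "rocky run started", "level": "INFO"}}),
        json.dumps({"__dagster_pipes_version": "0.1", "method": "report_asset_materialization",
                    "params": {"asset_key": "acme/orders", "data_version": None,
                               "metadata": {"rows_copied": {"raw_value": 15000, "type": "int"}}}}),
        json.dumps({"__dagster_pipes_version": "0.1", "method": "closed", "params": None}),
    ]
    print(read_pipes_messages(sample))
```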
Engine-side: Dagster Pipes message emission
The engine half of T2 (committed in ef08cae) adds a hand-rolled
Dagster Pipes protocol module at
engine/crates/rocky-cli/src/pipes.rs that:
- Detects the DAGSTER_PIPES_CONTEXT and DAGSTER_PIPES_MESSAGES env vars at the start of rocky run.
- Opens the messages channel (file path or stderr stream) per the protocol params.
- Emits one JSON-line message per progress event:
  - log at run start and completion
  - report_asset_materialization per output.materializations entry
  - report_asset_check per output.check_results entry
  - log at WARN level per output.drift.actions_taken entry
  - closed at run end
- When the env vars are not set, the entire path is a no-op, with zero overhead for non-Dagster callers.
In the current engine, emission is batched at the end of the run: events are emitted right before the JSON output payload, not as each table completes. This gives Dagster the structured events without the larger refactor of threading the emitter through the async parallel execution path. Future v0.5 work can upgrade to per-event streaming with no wire-protocol or consumer changes.
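The engine module is Rust, but the protocol logic is small enough to sketch in Python. Assumptions, each labeled in the code: the Pipes env vars carry base64-encoded, zlib-compressed JSON; the messages params are either {"path": ...} for a file channel or {"stdio": "stderr"} for a stream; and the fields inside output.materializations, output.check_results, and output.drift.actions_taken entries are hypothetical. PipesEmitter and emit_batch are illustrative names, not rocky's API.

```python
from __future__ import annotations

import base64
import json
import sys
import zlib
from typing import IO, Any, Mapping

PIPES_VERSION = "0.1"  # assumption: version string written into every envelope


def encode_pipes_param(value: dict) -> str:
    # Assumption: Pipes env vars hold base64-encoded, zlib-compressed JSON.
    return base64.b64encode(zlib.compress(json.dumps(value).encode("utf-8"))).decode("ascii")


def decode_pipes_param(value: str) -> dict:
    return json.loads(zlib.decompress(base64.b64decode(value)))


class PipesEmitter:
    """Python sketch of the emitter logic; rocky's real module is Rust (pipes.rs)."""

    def __init__(self, env: Mapping[str, str]):
        self._out: IO[str] | None = None
        if "DAGSTER_PIPES_CONTEXT" not in env or "DAGSTER_PIPES_MESSAGES" not in env:
            return  # not launched by Dagster: every later call is a no-op
        params = decode_pipes_param(env["DAGSTER_PIPES_MESSAGES"])
        if "path" in params:  # assumed file-channel params
            self._out = open(params["path"], "a", encoding="utf-8")
        elif params.get("stdio") == "stderr":  # assumed stream-channel params
            self._out = sys.stderr

    def emit(self, method: str, params: dict[str, Any] | None) -> None:
        if self._out is None:
            return
        envelope = {"__dagster_pipes_version": PIPES_VERSION, "method": method, "params": params}
        self._out.write(json.dumps(envelope) + "\n")  # one JSON line per message
        self._out.flush()

    def close(self) -> None:
        self.emit("closed", None)  # assumption: "closed" sent without params
        if self._out is not None and self._out is not sys.stderr:
            self._out.close()
        self._out = None


def emit_batch(emitter: PipesEmitter, output: dict) -> None:
    """Batch mode: called once the run has finished, before the JSON payload is written."""
    emitter.emit("log", {"message": "rocky run started", "level": "INFO"})
    for m in output.get("materializations", []):  # hypothetical entry fields
        # Assumed Pipes metadata shape: {"raw_value": ..., "type": ...}
        metadata = {k: {"raw_value": v, "type": "__infer__"} for k, v in m.get("metadata", {}).items()}
        emitter.emit("report_asset_materialization",
                     {"asset_key": m["asset_key"], "metadata": metadata, "data_version": None})
    for c in output.get("check_results", []):  # hypothetical entry fields
        emitter.emit("report_asset_check", {"asset_key": c["asset_key"], "check_name": c["check_name"],
                                            "passed": c["passed"], "severity": "ERROR", "metadata": {}})
    for action in output.get("drift", {}).get("actions_taken", []):
        emitter.emit("log", {"message": f"drift action: {action}", "level": "WARNING"})
    emitter.emit("log", {"message": "rocky run complete", "level": "INFO"})
    emitter.close()
```

With an empty environment, PipesEmitter({}) silently drops every message, mirroring the no-op path for non-Dagster callers.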
RockyComponent default
RockyComponent users get run_streaming automatically, with no decision
needed. To get full Pipes integration with structured events, switch
to a hand-rolled @dg.asset that calls rocky.run_pipes() directly.
A future RockyComponent flag (pipes_mode=True) can flip the default
once we’ve shaken out the integration in the wild.