
Observability — Drift, Anomalies, Optimize

dagster-rocky 0.4 elevates Rocky’s runtime observability signals from log warnings to first-class Dagster primitives:

  • Schema drift → dg.AssetObservation events on the asset timeline
  • Row-count anomalies → dg.AssetCheckResult with severity WARN
  • Optimization recommendations → AssetSpec.metadata (load-time)

Drift and anomaly emission is automatic when using RockyComponent — nothing to wire up. Standalone helpers are also exported for users with hand-rolled multi_assets.

Drift as AssetObservation

When rocky run detects schema drift on a source replication table, the component yields an AssetObservation event for the affected asset with metadata describing the action taken:

Metadata key | Type | Description
rocky/drift_action | text | The DDL action (e.g. ALTER ADD COLUMN, DROP+RECREATE)
rocky/drift_reason | text | Human-readable explanation
rocky/drift_table | text | Original Rocky table identifier
rocky/drift_tables_checked | int | Total tables inspected for drift this run
rocky/drift_tables_drifted | int | Total tables that drifted this run
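
To make the table concrete, here is a plain-Python sketch of one observation's metadata dict. `DriftEvent` and `drift_metadata` are hypothetical names and are not dagster-rocky API. Attaching the run-level counters to every observation is an assumption based on the "this run" descriptions. In the real integration these values are carried by `dg.AssetObservation(asset_key=..., metadata=...)`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DriftEvent:
    """Hypothetical record of one drifted table (not a dagster-rocky type)."""
    table: str   # original Rocky table identifier
    action: str  # DDL action, e.g. "ALTER ADD COLUMN"
    reason: str  # human-readable explanation


def drift_metadata(event: DriftEvent, tables_checked: int, tables_drifted: int) -> dict:
    """Build the rocky/drift_* metadata dict for a single drifted table."""
    return {
        "rocky/drift_action": event.action,
        "rocky/drift_reason": event.reason,
        "rocky/drift_table": event.table,
        # ASSUMPTION: run-level counters are repeated on each observation.
        "rocky/drift_tables_checked": tables_checked,
        "rocky/drift_tables_drifted": tables_drifted,
    }


event = DriftEvent("acme.raw.orders", "ALTER ADD COLUMN", "new column in source table")
meta = drift_metadata(event, tables_checked=12, tables_drifted=1)
print(meta["rocky/drift_action"])  # ALTER ADD COLUMN
```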

Why AssetObservation and not AssetCheckResult? Drift is a change, not a pass/fail — observation is the right primitive. It shows up on the asset timeline as a discrete event without affecting check status.

Anomalies as AssetCheckResult (severity WARN)

When rocky run detects row-count anomalies, the component yields an AssetCheckResult per anomaly with check name row_count_anomaly:

Metadata key | Type | Description
rocky/current_count | int | Row count from this run
rocky/baseline_avg | float | Historical baseline average
rocky/deviation_pct | float | Percent deviation from the baseline
rocky/reason | text | Human-readable anomaly description
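
The relationship between the numeric fields can be sketched as follows. This assumes `deviation_pct` is the signed percentage change of the current count relative to the baseline average. Rocky's exact formula is not stated here and may differ (for example, it may report an absolute value). `anomaly_metadata` is a hypothetical helper.

```python
def anomaly_metadata(current_count: int, baseline_avg: float) -> dict:
    """Hypothetical helper: build the rocky/* anomaly metadata dict.

    ASSUMPTION: deviation_pct = (current - baseline) / baseline * 100,
    i.e. a signed percentage; Rocky's actual definition may differ.
    """
    if baseline_avg == 0:
        raise ValueError("baseline_avg must be non-zero")
    deviation_pct = (current_count - baseline_avg) / baseline_avg * 100.0
    return {
        "rocky/current_count": current_count,
        "rocky/baseline_avg": baseline_avg,
        "rocky/deviation_pct": round(deviation_pct, 2),
        "rocky/reason": (
            f"row count {current_count} deviates {deviation_pct:+.1f}% "
            f"from baseline {baseline_avg:.1f}"
        ),
    }


meta = anomaly_metadata(current_count=500, baseline_avg=1000.0)
print(meta["rocky/deviation_pct"])  # -50.0
```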

The check spec is pre-declared at load time, so the Dagster UI shows the row_count_anomaly slot on every asset before any run. For runs without anomalies, a placeholder check result is emitted automatically.

Severity is WARN (not ERROR) because Rocky’s anomaly detection is a heuristic — a row-count deviation may be legitimate business behavior. Callers who want to treat anomalies as hard failures can post-process the check evaluation events.
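
One possible post-processing policy is sketched below in plain Python. It assumes you have already collected the anomaly metadata dicts, for example from the check evaluations. `escalate_anomalies`, `AnomalyFailure`, and the 50% default threshold are hypothetical choices for illustration and are not dagster-rocky API.

```python
class AnomalyFailure(Exception):
    """Raised when a row-count anomaly crosses the caller's hard-failure threshold."""


def escalate_anomalies(anomalies: list, max_abs_deviation_pct: float = 50.0) -> list:
    """Hypothetical policy: fail hard on large deviations, keep the rest as warnings.

    `anomalies` is a list of dicts carrying the rocky/* anomaly keys.
    Returns the input unchanged when nothing crosses the threshold.
    """
    severe = [
        a for a in anomalies
        if abs(a["rocky/deviation_pct"]) > max_abs_deviation_pct
    ]
    if severe:
        reasons = "; ".join(a.get("rocky/reason", "<no reason>") for a in severe)
        raise AnomalyFailure(
            f"{len(severe)} anomaly check(s) exceed {max_abs_deviation_pct}% deviation: {reasons}"
        )
    return anomalies


# A 12% dip passes through unchanged; an 80% drop would raise AnomalyFailure.
escalate_anomalies([{"rocky/deviation_pct": -12.0, "rocky/reason": "small dip"}])
```

Because the threshold lives in your own code, each team can decide how strict to be without changing how Rocky emits the WARN checks.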

Standalone helpers

If you don’t use RockyComponent, the same emission logic is exposed as pure-function builders:

import dagster as dg
from dagster_rocky import (
    drift_observations,
    anomaly_check_results,
    ANOMALY_CHECK_NAME,
    RockyResource,
)


@dg.multi_asset(
    specs=[...],
    check_specs=[
        dg.AssetCheckSpec(name=ANOMALY_CHECK_NAME, asset=...),
    ],
)
def my_rocky_asset(context: dg.AssetExecutionContext, rocky: RockyResource):
    result = rocky.run(filter="tenant=acme")

    def resolver(table_name: str) -> dg.AssetKey:
        # Your own table-name → AssetKey mapping
        ...

    yield dg.MaterializeResult(...)
    # Drift → AssetObservation events; anomalies → AssetCheckResult (WARN)
    yield from drift_observations(result, key_resolver=resolver)
    yield from anomaly_check_results(result, key_resolver=resolver)

The key_resolver callable bridges Rocky’s table-name string identifiers (which may be catalog.schema.table or just table) to Dagster AssetKeys. The default RockyComponent resolver handles dotted-name fallback automatically.
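
A minimal resolver with dotted-name fallback might look like the sketch below. It assumes you keep a dict that maps known table names to asset-key paths. `make_resolver` is a hypothetical name. The suffix-matching rule (full name, then `schema.table`, then bare `table`) is also an assumption and does not describe RockyComponent's built-in resolver. Paths are plain tuples so the sketch runs without Dagster. In a real multi_asset you would return `dg.AssetKey(list(path))`.

```python
from typing import Callable, Optional

KeyPath = tuple  # stand-in for the path inside a dg.AssetKey


def make_resolver(known: dict) -> Callable[[str], Optional[KeyPath]]:
    """Hypothetical resolver factory with dotted-name fallback.

    Tries the full identifier first, then drops leading components:
    "catalog.schema.table" -> "schema.table" -> "table".
    """
    def resolver(table_name: str) -> Optional[KeyPath]:
        parts = table_name.split(".")
        for i in range(len(parts)):
            candidate = ".".join(parts[i:])
            if candidate in known:
                return known[candidate]
        # Unresolved table. How the dagster-rocky helpers treat None is not
        # documented here, so check before relying on this behavior.
        return None

    return resolver


resolver = make_resolver({"orders": ("acme", "raw", "orders")})
print(resolver("prod.acme_raw.orders"))  # ('acme', 'raw', 'orders')
print(resolver("customers"))             # None
```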

Optimize recommendations as asset metadata

rocky optimize returns per-model strategy recommendations. The optimize_metadata_for_keys helper builds a {AssetKey: metadata} dict that you can merge into AssetSpec.metadata at load time:

import dagster as dg
from dagster_rocky import optimize_metadata_for_keys, RockyResource

rocky = RockyResource(config_path="rocky.toml")
optimize_result = rocky.optimize()

# Map Rocky model names to the AssetKeys your specs use
metadata = optimize_metadata_for_keys(
    optimize_result,
    model_to_key={
        "fct_orders": dg.AssetKey(["acme", "marts", "fct_orders"]),
    },
)
# metadata = {AssetKey([...]): {"rocky/current_strategy": ..., ...}}

The metadata fields are namespaced under rocky/ and include current_strategy, recommended_strategy, estimated_monthly_savings, and optimize_reasoning.
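
Asset specs are treated as immutable values, so merging means building new specs. The sketch below uses a hypothetical frozen `Spec` dataclass as a stand-in so it runs without Dagster. The merge rule is an assumption: optimize keys win on conflict, which is harmless because they are namespaced under `rocky/`. With real specs you would apply the same dict merge to `spec.metadata`, looking up each `spec.key` in the dict returned by optimize_metadata_for_keys. The strategy value below is illustrative only.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class Spec:
    """Hypothetical stand-in for dg.AssetSpec: immutable key + metadata."""
    key: tuple
    metadata: dict = field(default_factory=dict)


def merge_optimize_metadata(specs: list, metadata_by_key: dict) -> list:
    """Return new specs with per-key optimize metadata merged in.

    Existing metadata is kept; on a key collision the optimize value wins.
    Specs without an entry in metadata_by_key are copied unchanged.
    """
    return [
        replace(spec, metadata={**spec.metadata, **metadata_by_key.get(spec.key, {})})
        for spec in specs
    ]


specs = [
    Spec(("acme", "marts", "fct_orders"), {"owner": "analytics"}),
    Spec(("acme", "marts", "dim_customers")),
]
optimize_meta = {("acme", "marts", "fct_orders"): {"rocky/recommended_strategy": "example-strategy"}}
merged = merge_optimize_metadata(specs, optimize_meta)
print(merged[0].metadata)  # {'owner': 'analytics', 'rocky/recommended_strategy': 'example-strategy'}
```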

Column lineage

build_column_lineage translates a Rocky ModelLineageResult into a Dagster TableColumnLineage ready to attach to MaterializeResult.metadata:

import dagster as dg
from dagster_rocky import RockyResource, build_column_lineage


@dg.asset  # key, deps, etc. as in your project
def fct_orders(rocky: RockyResource):
    result = rocky.lineage(target="fct_orders")
    lineage = build_column_lineage(result, model_to_key={
        "stg_orders": dg.AssetKey(["staging", "stg_orders"]),
    })
    yield dg.MaterializeResult(metadata={"dagster/column_lineage": lineage})

The asset detail page renders this as an interactive column-level dependency graph showing which upstream columns feed each output column.
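
The structure stored under `dagster/column_lineage` maps each output column to the upstream (asset, column) pairs that feed it. In Dagster this is `TableColumnLineage(deps_by_column=...)` with `TableColumnDep` entries. The plain-Python sketch below builds that shape from a hypothetical list of `(output_column, upstream_model, upstream_column)` edges. The edge format and the `deps_by_column` helper are assumptions for illustration, because ModelLineageResult's fields are not described here. Skipping unmapped models is also an assumed behavior.

```python
from collections import defaultdict


def deps_by_column(edges: list, model_to_key: dict) -> dict:
    """Group (output_col, upstream_model, upstream_col) edges by output column.

    Mirrors dg.TableColumnLineage(deps_by_column=...), with
    (key_path, column_name) tuples standing in for dg.TableColumnDep.
    Edges from models missing in model_to_key are skipped (an assumption).
    """
    grouped = defaultdict(list)
    for out_col, up_model, up_col in edges:
        key = model_to_key.get(up_model)
        if key is None:
            continue
        grouped[out_col].append((key, up_col))
    return dict(grouped)


edges = [
    ("order_id", "stg_orders", "id"),
    ("amount", "stg_orders", "amount"),
    ("amount", "stg_refunds", "refund_amount"),  # unmapped model, skipped
]
lineage = deps_by_column(edges, {"stg_orders": ("staging", "stg_orders")})
print(lineage["amount"])  # [(('staging', 'stg_orders'), 'amount')]
```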