Freshness Policies
dagster-rocky 0.4 maps Rocky’s freshness configuration onto Dagster’s FreshnessPolicy so that the Dagster UI surfaces stale-data badges and the declarative-automation freshness conditions trigger correctly.
Two sources of freshness are supported, with per-model overriding pipeline-level:
- Pipeline-level: from [checks.freshness] in rocky.toml. Applies to every source-replication asset by default.
- Per-model: from [freshness] in a model’s TOML frontmatter. Overrides the pipeline-level default for the matching model.
Pipeline-level freshness
Add a [checks.freshness] block to rocky.toml:

    [checks]
    enabled = true

    [checks.freshness]
    threshold_seconds = 86400 # 24 hours

The RockyComponent reads discover.checks.freshness automatically and attaches a matching FreshnessPolicy to every source-replication asset. No additional configuration is needed:

    import dagster as dg
    from dagster_rocky import RockyComponent

    defs = dg.Definitions(
        assets=[RockyComponent(config_path="rocky.toml")],
    )

The functional API works the same way:

    from dagster_rocky import RockyResource, load_rocky_assets

    rocky = RockyResource(config_path="rocky.toml")
    specs = load_rocky_assets(rocky)
    # specs[*].freshness_policy is set to FreshnessPolicy.time_window(fail_window=24h)

Per-model freshness
Declare freshness in a model’s TOML frontmatter (or sidecar .toml file):

    name = "fct_daily_orders"
    depends_on = ["stg_orders"]

    [strategy]
    type = "incremental"
    timestamp_column = "updated_at"

    [target]
    catalog = "warehouse"
    schema = "marts"
    table = "fct_daily_orders"

    [freshness]
    max_lag_seconds = 3600 # 1 hour, overrides the pipeline default

The compiler emits this in the JSON output for rocky compile (via the new
models_detail field), and RockyComponent reads it at load time. When a
source-replication table name matches a compiled model name, the per-model
policy wins.
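
The precedence rule can be illustrated in plain Python. This is a minimal sketch, not dagster-rocky's implementation: the function name resolve_fail_window and its parameters are hypothetical, and it models only the rule stated above (a per-model entry wins when the table name matches a model name, otherwise the pipeline default applies).

```python
from datetime import timedelta
from typing import Mapping, Optional


def resolve_fail_window(
    table_name: str,
    per_model_lag_seconds: Mapping[str, int],
    pipeline_threshold_seconds: Optional[int],
) -> Optional[timedelta]:
    """Pick the freshness window for one asset (hypothetical helper)."""
    # A per-model [freshness] entry wins when the table name matches a model name;
    # otherwise fall back to the pipeline-level [checks.freshness] threshold.
    seconds = per_model_lag_seconds.get(table_name, pipeline_threshold_seconds)
    # None means neither level configured freshness for this asset.
    return None if seconds is None else timedelta(seconds=seconds)


# Values taken from the examples on this page.
per_model = {"fct_daily_orders": 3600}  # [freshness] max_lag_seconds
fct_window = resolve_fail_window("fct_daily_orders", per_model, 86400)
other_window = resolve_fail_window("stg_orders", per_model, 86400)
unset_window = resolve_fail_window("stg_orders", per_model, None)
```

With the values from this page, fct_daily_orders resolves to the one-hour per-model window, while a table with no matching model falls back to the 24-hour pipeline default.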
Helper functions
These helpers are pure functions you can use to attach freshness policies to hand-rolled assets without going through RockyComponent:
freshness_policy_from_checks(checks)
Builds a FreshnessPolicy from a ChecksConfig (the projection of
[checks.freshness]). Returns None when freshness is not configured.

    from dagster_rocky import freshness_policy_from_checks, RockyResource

    rocky = RockyResource(config_path="rocky.toml")
    result = rocky.discover()
    policy = freshness_policy_from_checks(result.checks)
    # Use policy on AssetSpec.freshness_policy

freshness_policy_from_model(freshness)
Builds a FreshnessPolicy from a ModelFreshnessConfig (the projection of a model’s [freshness] frontmatter). Returns None when not configured.

    from dagster_rocky import freshness_policy_from_model
    from dagster_rocky.types import ModelFreshnessConfig

    policy = freshness_policy_from_model(ModelFreshnessConfig(max_lag_seconds=3600))

per_model_freshness_policies(compile_result)
Applies freshness_policy_from_model to every model in a compile result and indexes the resulting policies by model name. Models without [freshness] are absent from the dict, so callers can use .get(model_name) to fall back to the pipeline-level default.

    from dagster_rocky import RockyResource, per_model_freshness_policies

    rocky = RockyResource(config_path="rocky.toml", models_dir="models")
    compile_result = rocky.compile()
    policies = per_model_freshness_policies(compile_result)
    # {"fct_daily_orders": <FreshnessPolicy>, ...}

API choice: FreshnessPolicy.time_window
dagster-rocky uses Dagster 1.12+’s FreshnessPolicy.time_window(fail_window=...) constructor, not the legacy FreshnessPolicy(maximum_lag_minutes=...), which is deprecated.
This means:
- Comparisons in tests need policy.fail_window.to_timedelta().total_seconds(), because Dagster wraps the timedelta in a SerializableTimeDelta that doesn’t compare equal to a plain timedelta.
- The check shows up under “Freshness” in the asset detail page with the fail_window value.
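
The test comparison described in the first bullet can be wrapped in a small helper. This is a hedged sketch: fail_window_seconds is a hypothetical name, not part of dagster-rocky, and it relies only on the fail_window.to_timedelta() chain mentioned above. It accepts any object exposing that chain, so it does not import Dagster itself.

```python
from datetime import timedelta


def fail_window_seconds(policy) -> float:
    """Return a policy's fail window as plain seconds (hypothetical test helper)."""
    # to_timedelta() unwraps the serializable wrapper into a datetime.timedelta,
    # so the result compares cleanly against ordinary numbers.
    return policy.fail_window.to_timedelta().total_seconds()


# Expected value for the 24-hour pipeline default used earlier on this page.
EXPECTED_SECONDS = timedelta(hours=24).total_seconds()

# In a test, with a policy produced by one of the helpers above:
#     assert fail_window_seconds(policy) == EXPECTED_SECONDS
```

Comparing seconds rather than the window objects keeps the assertion independent of how Dagster serializes the window internally.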