
Incremental Processing

Rocky supports multiple incremental processing strategies to avoid reprocessing data that has not changed. The simplest is watermark-based append, where only rows newer than a stored timestamp are copied. More advanced strategies include partition-level checksums and column-level change propagation.

Every model (replication or transformation) declares a materialization strategy:

| Strategy | Behavior | Use case |
| --- | --- | --- |
| FullRefresh | CREATE OR REPLACE TABLE ... AS SELECT ... | Small tables, schema changes, initial loads |
| Incremental | INSERT INTO ... SELECT ... WHERE ts > watermark | Append-only data with a reliable timestamp |
| Merge | MERGE INTO ... USING ... ON key WHEN MATCHED THEN UPDATE WHEN NOT MATCHED THEN INSERT | Mutable data with a unique key |
| MaterializedView | CREATE OR REPLACE MATERIALIZED VIEW ... AS SELECT ... | Warehouse-managed refresh (Databricks) |
| DynamicTable | CREATE OR REPLACE DYNAMIC TABLE ... TARGET_LAG = '...' | Auto-refresh with target lag (Snowflake) |
| TimeInterval | Per-partition INSERT OVERWRITE with @start_date/@end_date placeholders | Time-series data with partition-level reprocessing |

Watermark-based append is the default incremental strategy for replication. It relies on a monotonically increasing timestamp column (typically _fivetran_synced) to determine which rows are new.

  1. Read watermark. Rocky reads the last stored watermark for the table from the state store. The watermark is keyed by the fully qualified table name (catalog.schema.table).

  2. No watermark (first run). If no watermark exists, Rocky performs a full refresh — it copies all rows from the source. This establishes the baseline.

  3. Watermark exists. Rocky generates an incremental query that filters to rows newer than the watermark:

SELECT *, CAST(NULL AS STRING) AS _loaded_by
FROM source_catalog.source_schema.orders
WHERE _fivetran_synced > TIMESTAMP '2025-03-15T14:30:00Z'

  4. Update watermark. After a successful copy, Rocky writes the new watermark (the maximum timestamp seen in this batch) to the state store. The next run picks up from this point.
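The watermark flow above can be sketched as follows. This is a minimal illustration, not Rocky's actual API: the StateStore struct, method names, and query shape are assumptions for the example.

```rust
use std::collections::HashMap;

// Hypothetical in-memory stand-in for Rocky's state store,
// keyed by fully qualified table name (catalog.schema.table).
struct StateStore {
    watermarks: HashMap<String, String>,
}

impl StateStore {
    fn get(&self, table: &str) -> Option<&String> {
        self.watermarks.get(table)
    }
    fn set(&mut self, table: &str, ts: &str) {
        self.watermarks.insert(table.to_string(), ts.to_string());
    }
}

// Build the copy query: full refresh on first run (no watermark),
// filtered append otherwise.
fn build_query(store: &StateStore, table: &str, ts_col: &str) -> String {
    let base = format!("SELECT *, CAST(NULL AS STRING) AS _loaded_by FROM {table}");
    match store.get(table) {
        None => base, // step 2: no watermark, copy everything
        Some(wm) => format!("{base} WHERE {ts_col} > TIMESTAMP '{wm}'"), // step 3
    }
}

fn main() {
    let mut store = StateStore { watermarks: HashMap::new() };
    let table = "source_catalog.source_schema.orders";

    // First run: no WHERE clause, full refresh establishes the baseline.
    assert!(!build_query(&store, table, "_fivetran_synced").contains("WHERE"));

    // Step 4: after a successful copy, record the max timestamp seen.
    store.set(table, "2025-03-15T14:30:00Z");
    println!("{}", build_query(&store, table, "_fivetran_synced"));
}
```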

The replication strategy and watermark column are configured on the pipeline:

[pipeline.bronze]
type = "replication"
strategy = "incremental"
timestamp_column = "_fivetran_synced"

The timestamp column must exist in the source table and contain monotonically increasing values. If the source system backfills historical data with old timestamps, those rows will be missed by incremental processing.

For mutable data where rows can be updated after initial insert, the merge strategy uses a unique key to upsert:

[strategy]
type = "merge"
unique_key = ["customer_id"]
update_columns = ["name", "email", "updated_at"]

This generates:

MERGE INTO target_catalog.target_schema.customers AS target
USING (SELECT ... FROM source WHERE ...) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET
name = source.name,
email = source.email,
updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT *

If update_columns is omitted, all columns are updated on match.
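Generating the MERGE from the [strategy] config can be sketched like this. The struct and function names are illustrative, not Rocky's internals; the output shape follows the statement shown above (UPDATE SET * mirrors the "all columns are updated" default).

```rust
// Illustrative config for the merge strategy.
struct MergeStrategy {
    unique_key: Vec<String>,
    update_columns: Option<Vec<String>>, // None => update all columns on match
}

fn render_merge(target: &str, source: &str, s: &MergeStrategy) -> String {
    // ON clause: one equality per unique-key column, ANDed together.
    let on = s
        .unique_key
        .iter()
        .map(|k| format!("target.{k} = source.{k}"))
        .collect::<Vec<_>>()
        .join(" AND ");
    // MATCHED clause: explicit SET list, or SET * when update_columns is omitted.
    let matched = match &s.update_columns {
        Some(cols) => {
            let sets = cols
                .iter()
                .map(|c| format!("{c} = source.{c}"))
                .collect::<Vec<_>>()
                .join(", ");
            format!("WHEN MATCHED THEN UPDATE SET {sets}")
        }
        None => "WHEN MATCHED THEN UPDATE SET *".to_string(),
    };
    format!(
        "MERGE INTO {target} AS target USING {source} AS source ON {on} \
         {matched} WHEN NOT MATCHED THEN INSERT *"
    )
}

fn main() {
    let s = MergeStrategy {
        unique_key: vec!["customer_id".to_string()],
        update_columns: Some(vec!["name".to_string(), "email".to_string()]),
    };
    println!("{}", render_merge("target_catalog.target_schema.customers", "(SELECT * FROM src)", &s));
}
```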

Beyond watermark-based incremental (which only handles appends), Rocky supports partition-level checksums to detect changes in existing rows. This is implemented in the incremental module of rocky-core.

  1. Each partition of a model (e.g., one partition per date) gets a checksum computed from its data (a hash of the partition contents and row count).
  2. On subsequent runs, Rocky compares current checksums against stored checksums from the previous run.
  3. Only partitions with changed checksums are reprocessed. Unchanged partitions are skipped entirely.
Previous run:  { "2026-03-28": 0xABCD, "2026-03-29": 0x1234 }
Current run:   { "2026-03-28": 0xABCD, "2026-03-29": 0x5678, "2026-03-30": 0x9999 }

Changed:   ["2026-03-29", "2026-03-30"]
Unchanged: ["2026-03-28"]

This catches scenarios that watermarks miss: backfills, late-arriving corrections, and retroactive updates to historical data.
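The checksum comparison reduces to a map diff: a partition is reprocessed if its checksum changed or it is new. A sketch of that comparison (not Rocky's actual code), using the example values above:

```rust
use std::collections::BTreeMap;

// Compare stored vs. current partition checksums.
// Returns (changed, unchanged) partition keys; BTreeMap keeps output sorted.
fn diff_checksums(
    previous: &BTreeMap<String, u64>,
    current: &BTreeMap<String, u64>,
) -> (Vec<String>, Vec<String>) {
    let mut changed = Vec::new();
    let mut unchanged = Vec::new();
    for (partition, checksum) in current {
        match previous.get(partition) {
            Some(prev) if prev == checksum => unchanged.push(partition.clone()),
            _ => changed.push(partition.clone()), // new or modified partition
        }
    }
    (changed, unchanged)
}

fn main() {
    let previous = BTreeMap::from([
        ("2026-03-28".to_string(), 0xABCD_u64),
        ("2026-03-29".to_string(), 0x1234),
    ]);
    let current = BTreeMap::from([
        ("2026-03-28".to_string(), 0xABCD),
        ("2026-03-29".to_string(), 0x5678),
        ("2026-03-30".to_string(), 0x9999),
    ]);
    let (changed, unchanged) = diff_checksums(&previous, &current);
    assert_eq!(changed, vec!["2026-03-29", "2026-03-30"]);
    assert_eq!(unchanged, vec!["2026-03-28"]);
}
```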

The compiler’s semantic graph (see The Rocky Compiler) tracks column-level lineage across the entire DAG. Rocky uses this lineage to skip downstream models that do not depend on any changed columns.

Consider three models:

orders (source) → orders_summary (uses: amount, customer_id)
                → orders_audit   (uses: status, updated_at)

If an upstream schema change only affects the status column, Rocky determines:

  • orders_summary does not depend on status — skip
  • orders_audit depends on status — recompute

This is a PropagationDecision: either Recompute or Skip { reason }. The skip reason is logged so you can verify the decision.

Column propagation works with any incremental strategy. It adds a layer of intelligence on top of watermarks or checksums by pruning the DAG execution to only what actually needs to change.
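The decision logic is a set intersection: a model is skipped when the columns it uses are disjoint from the changed columns. The enum variants below match the PropagationDecision described above; the surrounding function is an illustrative sketch, not Rocky's implementation.

```rust
use std::collections::HashSet;

// Mirrors the PropagationDecision described in the text.
#[derive(Debug, PartialEq)]
enum PropagationDecision {
    Recompute,
    Skip { reason: String },
}

// Skip a model when none of the columns it uses changed upstream.
fn decide(model: &str, uses: &HashSet<&str>, changed: &HashSet<&str>) -> PropagationDecision {
    if uses.is_disjoint(changed) {
        PropagationDecision::Skip {
            reason: format!("{model} does not depend on any changed column"),
        }
    } else {
        PropagationDecision::Recompute
    }
}

fn main() {
    let changed = HashSet::from(["status"]);
    let summary = HashSet::from(["amount", "customer_id"]);
    let audit = HashSet::from(["status", "updated_at"]);

    // orders_summary does not use `status` — skip (reason is logged).
    assert!(matches!(
        decide("orders_summary", &summary, &changed),
        PropagationDecision::Skip { .. }
    ));
    // orders_audit uses `status` — recompute.
    assert_eq!(decide("orders_audit", &audit, &changed), PropagationDecision::Recompute);
}
```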

For time-series data that requires partition-level control, the time_interval strategy processes data in discrete time partitions. The model SQL uses @start_date and @end_date placeholders:

SELECT event_date, event_type, COUNT(*) AS event_count
FROM events.page_views
WHERE event_date >= @start_date AND event_date < @end_date
GROUP BY event_date, event_type

[strategy]
type = "time_interval"
time_column = "event_date"
granularity = "day"
lookback = 3

  1. Rocky determines which partitions to process based on CLI flags (--partition, --from/--to, --latest, --missing, --lookback).
  2. For each partition, it substitutes @start_date and @end_date with quoted timestamp literals.
  3. The generated SQL uses INSERT OVERWRITE semantics (atomic on Databricks via Delta, multi-statement transaction on Snowflake).
  4. Per-partition state is tracked in the state store for gap discovery (--missing).

The INSERT OVERWRITE in step 3 is emitted per warehouse:

  • Databricks: INSERT INTO <target> REPLACE WHERE <filter> <select> (a single atomic statement via Delta)
  • Snowflake: BEGIN; DELETE FROM <target> WHERE <filter>; INSERT INTO <target> <select>; COMMIT; (four statements)
  • DuckDB: same shape as Snowflake
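The placeholder substitution in step 2 can be sketched as a simple rewrite of the model SQL for one partition. This is an assumption-laden sketch: function name, quoting, and the half-open [start, end) partition bounds are illustrative, not Rocky's exact behavior.

```rust
// Substitute @start_date/@end_date with quoted literals for one partition.
fn render_partition(sql: &str, start: &str, end: &str) -> String {
    sql.replace("@start_date", &format!("'{start}'"))
        .replace("@end_date", &format!("'{end}'"))
}

fn main() {
    let sql = "SELECT event_date, event_type, COUNT(*) AS event_count \
               FROM events.page_views \
               WHERE event_date >= @start_date AND event_date < @end_date \
               GROUP BY event_date, event_type";

    // A day-granularity partition for 2026-04-01 covers [2026-04-01, 2026-04-02),
    // matching the >= @start_date AND < @end_date filter in the model SQL.
    let rendered = render_partition(sql, "2026-04-01", "2026-04-02");
    assert!(rendered.contains("event_date >= '2026-04-01'"));
    assert!(rendered.contains("event_date < '2026-04-02'"));
    println!("{rendered}");
}
```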
Partition selection is controlled by CLI flags:
rocky run --partition 2026-04-01 # Process one partition
rocky run --from 2026-03-01 --to 2026-04-01 # Date range
rocky run --latest # Most recent partition
rocky run --missing # Discover and fill gaps
rocky run --lookback 7 # Reprocess last N partitions
rocky run --parallel 4 # Parallelize partitions

Rocky falls back to full refresh in two situations:

First, schema drift. When the schema drift detector (in rocky-core/drift.rs) finds a type mismatch between source and target columns, it triggers DropAndRecreate: the target table is dropped and rebuilt from scratch. This is necessary because inserting rows with incompatible types would fail at the warehouse level.

Source: orders.amount (DECIMAL(10,2))
Target: orders.amount (STRING)
→ Schema drift detected → DROP TABLE → Full refresh
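The mismatch check itself can be sketched as a column-by-column type comparison. The schema representation and names here are assumptions for illustration; only the DropAndRecreate outcome comes from the text above.

```rust
// Action taken when comparing source and target schemas.
#[derive(Debug, PartialEq)]
enum DriftAction {
    NoDrift,
    DropAndRecreate,
}

// Compare columns present in both schemas; any type mismatch forces a rebuild.
fn check_drift(source: &[(&str, &str)], target: &[(&str, &str)]) -> DriftAction {
    for (name, src_type) in source {
        if let Some((_, tgt_type)) = target.iter().find(|(n, _)| n == name) {
            if src_type != tgt_type {
                // Inserting incompatible types would fail at the warehouse level.
                return DriftAction::DropAndRecreate;
            }
        }
    }
    DriftAction::NoDrift
}

fn main() {
    // The example above: DECIMAL(10,2) in the source vs. STRING in the target.
    let source = [("amount", "DECIMAL(10,2)")];
    let target = [("amount", "STRING")];
    assert_eq!(check_drift(&source, &target), DriftAction::DropAndRecreate);

    let aligned = [("amount", "DECIMAL(10,2)")];
    assert_eq!(check_drift(&source, &aligned), DriftAction::NoDrift);
}
```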

Second, manual watermark reset. Removing a table’s watermark from the state store causes Rocky to treat it as a first run:

rocky state delete --table "warehouse.staging.orders"

The next run performs a full refresh for that table, establishing a new baseline watermark.

Watermarks and partition checksums are stored in an embedded key-value store backed by redb. See the State Management page for details on the state store, remote persistence backends, and the state lifecycle.

The state store tracks:

  • Watermarks — last successfully replicated timestamp per table
  • Check history — historical row counts for anomaly detection
  • Run history — metadata about previous runs
  • Partition checksums — per-partition hashes for checksum-based incremental
  • DAG snapshots — previous DAG structure for change detection

All state is scoped per environment. Dev, staging, and prod maintain independent state with no cross-environment coordination.