# Core Pipeline Commands

The core pipeline commands cover the full lifecycle of a Rocky pipeline: project initialization, configuration validation, source discovery, dry-run planning, execution, and state inspection.

## Global flags

These flags apply to all commands and are documented in full in the CLI Reference.

| Flag | Short | Type | Default | Description |
| --- | --- | --- | --- | --- |
| `--config <PATH>` | `-c` | `PathBuf` | `rocky.toml` | Path to the pipeline configuration file. |
| `--output <FORMAT>` | `-o` | `string` | `json` | Output format: `json` or `table`. |
| `--state-path <PATH>` | | `PathBuf` | `.rocky-state.redb` | Path to the embedded state store for watermarks. |

## rocky init

Initialize a new Rocky project with starter configuration and directory structure.

```sh
rocky init [path]
```

| Argument | Type | Default | Description |
| --- | --- | --- | --- |
| `path` | `string` | `.` | Directory where the project will be created. |

Create a project in the current directory:

```sh
rocky init
```

```
Created rocky.toml
Created models/
Rocky project initialized.
```

Create a project in a new directory:

```sh
rocky init acme-pipeline
```

```
Created acme-pipeline/rocky.toml
Created acme-pipeline/models/
Rocky project initialized in acme-pipeline/
```

## rocky validate

Check the pipeline configuration for correctness without connecting to any external APIs. Returns a non-zero exit code if any check fails.

```sh
rocky validate [flags]
```

No command-specific flags. Uses global flags only.

| Check | Description |
| --- | --- |
| TOML syntax | The config file parses without errors as v2 (named adapters + named pipelines). |
| Adapters | Each `[adapter.NAME]` is a recognized type (`databricks`, `snowflake`, `duckdb`, `fivetran`, `manual`) with the required fields populated. For Databricks, at least one of `token` or `client_id`/`client_secret` must be set. |
| Pipelines | Each `[pipeline.NAME]` references existing adapters for `source`, `target`, and (optional) `discovery`, and its `schema_pattern` parses. |
| DAG validation | If `models/` exists, loads all models and checks for dependency cycles. |
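Putting the checks together, a minimal config that would pass them might look like the sketch below. The adapter names, pipeline name, and field values are illustrative assumptions; only the `[adapter.NAME]`/`[pipeline.NAME]` layout and the field names called out in the table above come from the checks themselves.

```toml
# Hypothetical rocky.toml sketch — names and values are illustrative.
[adapter.fivetran]
type = "fivetran"                     # must be a recognized adapter type

[adapter.prod]
type = "databricks"
token = "dapi-example"                # or client_id/client_secret

[pipeline.bronze]
source = "fivetran"                   # must reference an existing adapter
target = "prod"
schema_pattern = "stage__{source}"    # must parse
```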

Validate the default config:

```sh
rocky validate
```

```
ok Config syntax valid (v2 format)
ok adapter.fivetran: fivetran
ok adapter.prod: databricks (auth configured)
ok pipeline.bronze: schema pattern parseable
ok pipeline.bronze: replication / incremental -> warehouse / stage__{source}
Validation complete.
```

Validate a specific config file:

```sh
rocky -c pipelines/prod.toml validate
```

```
ok Config syntax valid (v2 format)
ok adapter.fivetran: fivetran
!! adapter.prod: no auth configured (token or client_id/secret)
ok pipeline.bronze: schema pattern parseable
ok pipeline.bronze: replication / incremental -> warehouse / stage__{source}
Validation complete.
```

## rocky discover

List available connectors and their tables from the configured source. This is a metadata-only operation: it identifies what schemas and tables exist but moves no data.

```sh
rocky discover [flags]
```

| Flag | Type | Default | Description |
| --- | --- | --- | --- |
| `--pipeline <NAME>` | `string` | | Pipeline name (required if multiple pipelines are defined). |

Discover all sources with JSON output:

```sh
rocky discover
```

```json
{
  "version": "0.1.0",
  "command": "discover",
  "sources": [
    {
      "id": "connector_abc123",
      "components": { "tenant": "acme", "regions": ["us_west"], "source": "shopify" },
      "source_type": "fivetran",
      "last_sync_at": "2026-03-30T10:00:00Z",
      "tables": [
        { "name": "orders", "row_count": null },
        { "name": "customers", "row_count": null }
      ]
    },
    {
      "id": "connector_def456",
      "components": { "tenant": "acme", "regions": ["eu_central"], "source": "stripe" },
      "source_type": "fivetran",
      "last_sync_at": "2026-03-29T22:15:00Z",
      "tables": [
        { "name": "charges", "row_count": null },
        { "name": "refunds", "row_count": null }
      ]
    }
  ]
}
```
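The JSON shape above is stable enough to script against. A minimal sketch, assuming only the fields shown in the sample output, that maps each connector to its table names:

```python
import json

# Trimmed sample of `rocky discover` JSON output (fields from the example above).
discover_output = """
{
  "sources": [
    {"id": "connector_abc123", "tables": [{"name": "orders"}, {"name": "customers"}]},
    {"id": "connector_def456", "tables": [{"name": "charges"}, {"name": "refunds"}]}
  ]
}
"""

def tables_by_connector(raw: str) -> dict:
    """Map each connector id to the names of its tables."""
    doc = json.loads(raw)
    return {s["id"]: [t["name"] for t in s["tables"]] for s in doc["sources"]}

print(tables_by_connector(discover_output))
```

In practice the input would come from piping `rocky discover` rather than an embedded string.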

Discover with table output:

```sh
rocky -o table discover
```

```
connector_id     | components                 | tables
-----------------+----------------------------+-------
connector_abc123 | acme / us_west / shopify   | 12
connector_def456 | acme / eu_central / stripe | 8
```

Discover a specific pipeline when multiple are defined:

```sh
rocky discover --pipeline shopify_us
```

See also:

- `rocky plan` — generate SQL from discovered sources
- `rocky run` — discover and execute in one step

## rocky plan

Generate the SQL statements Rocky would execute without actually running them. Useful for auditing, previewing changes, and CI/CD approval workflows.

```sh
rocky plan --filter <key=value> [flags]
```

| Flag | Type | Default | Description |
| --- | --- | --- | --- |
| `--filter <key=value>` | `string` | (required) | Filter sources by component value (e.g., `client=acme`). |
| `--pipeline <NAME>` | `string` | | Pipeline name (required if multiple pipelines are defined). |

Plan all SQL for a specific tenant:

```sh
rocky plan --filter client=acme
```

```json
{
  "version": "0.1.0",
  "command": "plan",
  "filter": "client=acme",
  "statements": [
    {
      "purpose": "create_catalog",
      "target": "acme_warehouse",
      "sql": "CREATE CATALOG IF NOT EXISTS acme_warehouse"
    },
    {
      "purpose": "create_schema",
      "target": "acme_warehouse.staging__us_west__shopify",
      "sql": "CREATE SCHEMA IF NOT EXISTS acme_warehouse.staging__us_west__shopify"
    },
    {
      "purpose": "incremental_copy",
      "target": "acme_warehouse.staging__us_west__shopify.orders",
      "sql": "SELECT *, CAST(NULL AS STRING) AS _loaded_by FROM source_catalog.src__acme__us_west__shopify.orders WHERE _fivetran_synced > (SELECT COALESCE(MAX(_fivetran_synced), TIMESTAMP '1970-01-01') FROM acme_warehouse.staging__us_west__shopify.orders)"
    }
  ]
}
```
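Because `plan` emits structured statements without executing anything, it fits naturally into a CI approval gate. A hedged sketch of one such policy check; the allow-list, the `gate` helper, and the embedded sample (including the invented `drop_table` purpose) are assumptions for illustration, not part of Rocky:

```python
import json

# Purposes this hypothetical CI policy allows; not defined by Rocky itself.
ALLOWED_PURPOSES = {"create_catalog", "create_schema", "incremental_copy"}

def gate(plan_json: str) -> list:
    """Return the purposes in a plan that fall outside the allow-list."""
    plan = json.loads(plan_json)
    return [s["purpose"] for s in plan["statements"]
            if s["purpose"] not in ALLOWED_PURPOSES]

# Trimmed, hypothetical plan: "drop_table" is invented to show a violation.
sample = '{"statements": [{"purpose": "create_catalog"}, {"purpose": "drop_table"}]}'
violations = gate(sample)
if violations:
    print("blocked purposes:", violations)
```

A real pipeline would feed `rocky plan` output into the gate and fail the job when `violations` is non-empty.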

Plan with table output and a custom config:

```sh
rocky -c pipelines/prod.toml -o table plan --filter client=acme
```

```
purpose          | target                                             | sql (truncated)
-----------------+----------------------------------------------------+--------------------------
create_catalog   | acme_warehouse                                     | CREATE CATALOG IF NOT...
create_schema    | acme_warehouse.staging__us_west__shopify           | CREATE SCHEMA IF NOT...
incremental_copy | acme_warehouse.staging__us_west__shopify.orders    | SELECT *, CAST(NULL...
incremental_copy | acme_warehouse.staging__us_west__shopify.customers | SELECT *, CAST(NULL...
```

Plan for a specific pipeline:

```sh
rocky plan --filter client=acme --pipeline shopify_us
```

## rocky run

Execute the full pipeline end-to-end: discover sources, detect schema drift, create catalogs and schemas, copy data, apply governance, and run quality checks.

```sh
rocky run --filter <key=value> [flags]
```

| Flag | Type | Default | Description |
| --- | --- | --- | --- |
| `--filter <key=value>` | `string` | (required) | Filter sources by component value (e.g., `client=acme`). |
| `--pipeline <NAME>` | `string` | | Pipeline name (required if multiple pipelines are defined). |
| `--governance-override <JSON>` | `string` | | Additional governance config as inline JSON or `@file.json`, merged with defaults. |
| `--models <PATH>` | `PathBuf` | | Models directory for transformation execution. |
| `--all` | `bool` | `false` | Execute both replication and compiled models. |
| `--resume <RUN_ID>` | `string` | | Resume a specific previous run from its last checkpoint. |
| `--resume-latest` | `bool` | `false` | Resume the most recent failed run from its last checkpoint. |
| `--shadow` | `bool` | `false` | Run in shadow mode: write to shadow targets instead of production. |
| `--shadow-suffix <SUFFIX>` | `string` | `_rocky_shadow` | Suffix appended to table names in shadow mode. |
| `--shadow-schema <NAME>` | `string` | | Override schema for shadow tables (mutually exclusive with `--shadow-suffix`). |
A run proceeds through five stages:

1. **Discover** — enumerate sources and tables.
2. **Governance setup** (sequential, per catalog/schema) — create catalogs, apply tags, bind workspaces, grant permissions, create schemas.
3. **Parallel table processing** (up to `execution.concurrency`) — drift detection, incremental copy, tag application, watermark update.
4. **Batched checks** — row count, column match, freshness.
5. **Retry** — failed tables are retried sequentially (per `execution.table_retries`).
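The stage ordering above (bounded-parallel table work followed by sequential retries) can be sketched generically. Everything below, including the worker function and the constants standing in for `execution.concurrency` and `execution.table_retries`, is a hypothetical illustration of the pattern, not Rocky's implementation:

```python
from concurrent.futures import ThreadPoolExecutor

CONCURRENCY = 4      # stands in for execution.concurrency
TABLE_RETRIES = 1    # stands in for execution.table_retries

def process_table(name: str) -> str:
    """Hypothetical per-table work: drift check, copy, tags, watermark."""
    if name == "flaky" and not getattr(process_table, "seen", False):
        process_table.seen = True          # fail once to exercise the retry stage
        raise RuntimeError("transient failure")
    return name

def run_tables(tables: list) -> tuple:
    done, failed = [], []
    # Stage 3: parallel table processing, bounded by CONCURRENCY.
    with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
        futures = {pool.submit(process_table, t): t for t in tables}
        for fut, t in futures.items():
            try:
                done.append(fut.result())
            except Exception:
                failed.append(t)
    # Stage 5: failed tables are retried sequentially.
    for t in list(failed):
        for _ in range(TABLE_RETRIES):
            try:
                done.append(process_table(t))
                failed.remove(t)
                break
            except Exception:
                pass
    return done, failed

print(run_tables(["orders", "flaky", "customers"]))
```

The design point is that governance work stays sequential while table copies fan out, and retries happen one at a time so a struggling warehouse is not hit in parallel again.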

Run the pipeline for a specific tenant:

```sh
rocky run --filter client=acme
```

```json
{
  "version": "0.1.0",
  "command": "run",
  "filter": "client=acme",
  "duration_ms": 45200,
  "tables_copied": 20,
  "materializations": [
    {
      "asset_key": ["fivetran", "acme", "us_west", "shopify", "orders"],
      "rows_copied": null,
      "duration_ms": 2300,
      "metadata": { "strategy": "incremental", "watermark": "2026-03-30T10:00:00Z" }
    }
  ],
  "check_results": [],
  "permissions": { "grants_added": 3, "grants_revoked": 0 },
  "drift": { "tables_checked": 20, "tables_drifted": 1, "actions_taken": [] }
}
```
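The structured result makes post-run automation straightforward. For example, a minimal sketch that inspects the `drift` block and flags runs with drifted tables; the `drifted` helper is an assumption, and the embedded sample is trimmed from the output above:

```python
import json

# Trimmed from the sample `rocky run` output above.
run_output = '{"drift": {"tables_checked": 20, "tables_drifted": 1, "actions_taken": []}}'

def drifted(raw: str) -> int:
    """Number of tables that drifted during the run."""
    return json.loads(raw)["drift"]["tables_drifted"]

if drifted(run_output) > 0:
    print("schema drift detected on", drifted(run_output), "table(s)")
```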

Run with a governance override file:

```sh
rocky run --filter client=acme --governance-override @overrides/acme.json
```

Run both replication and model transformations:

```sh
rocky run --filter client=acme --models models/ --all
```

Resume the most recent failed run from its last checkpoint:

```sh
rocky run --filter client=acme --resume-latest
```

Run in shadow mode (writes to `*_rocky_shadow` tables instead of production) so you can compare results before promoting:

```sh
rocky run --filter client=acme --shadow
rocky compare --filter client=acme
```

## rocky state

Display stored watermarks from the embedded state file. Shows every tracked table with its last watermark value and the timestamp at which it was recorded.

```sh
rocky state [flags]
```

No command-specific flags. Uses global flags only.

Show watermarks with JSON output:

```sh
rocky state
```

```json
{
  "version": "0.1.0",
  "command": "state",
  "watermarks": [
    {
      "table": "acme_warehouse.staging__us_west__shopify.orders",
      "last_value": "2026-03-30T10:00:00Z",
      "updated_at": "2026-03-30T10:01:32Z"
    },
    {
      "table": "acme_warehouse.staging__us_west__shopify.customers",
      "last_value": "2026-03-30T09:55:00Z",
      "updated_at": "2026-03-30T10:01:32Z"
    }
  ]
}
```
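One practical use of this output is alerting on stale watermarks. A sketch that parses the JSON above and flags tables whose last watermark lags a reference time; the `stale_tables` helper, the six-hour threshold, and the fixed `now` are assumptions for illustration:

```python
import json
from datetime import datetime, timedelta, timezone

# Trimmed sample of `rocky state` JSON output (values from the example above).
state_output = """
{
  "watermarks": [
    {"table": "acme_warehouse.staging__us_west__shopify.orders",
     "last_value": "2026-03-30T10:00:00Z"},
    {"table": "acme_warehouse.staging__us_west__shopify.customers",
     "last_value": "2026-03-30T09:55:00Z"}
  ]
}
"""

def stale_tables(raw: str, now: datetime, max_age: timedelta) -> list:
    """Tables whose last watermark is older than max_age relative to now."""
    stale = []
    for wm in json.loads(raw)["watermarks"]:
        last = datetime.fromisoformat(wm["last_value"].replace("Z", "+00:00"))
        if now - last > max_age:
            stale.append(wm["table"])
    return stale

now = datetime(2026, 3, 30, 16, 0, tzinfo=timezone.utc)
print(stale_tables(state_output, now, timedelta(hours=6)))
```

In a scheduled job, `now` would be the current time and the threshold would match the expected sync cadence.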

Show watermarks with table output using a custom state path:

```sh
rocky -o table --state-path /var/rocky/state.redb state
```

```
table                                              | last_value           | updated_at
---------------------------------------------------+----------------------+---------------------
acme_warehouse.staging__us_west__shopify.orders    | 2026-03-30T10:00:00Z | 2026-03-30T10:01:32Z
acme_warehouse.staging__us_west__shopify.customers | 2026-03-30T09:55:00Z | 2026-03-30T10:01:32Z
acme_warehouse.staging__eu_central__stripe.charges | 2026-03-29T22:15:00Z | 2026-03-30T10:01:32Z
```