SQL Generation

Rocky generates all SQL from its internal IR (Intermediate Representation). No Jinja templates, no string concatenation with untrusted input. Every identifier is validated before it reaches a SQL statement.
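The `^[a-zA-Z0-9_]+$` rule can be sketched in Rust without a regex dependency; the function name here is illustrative, not Rocky's actual API:

```rust
// Hypothetical sketch of the identifier gate described above: a name may
// reach generated SQL only if it matches ^[a-zA-Z0-9_]+$.
fn is_valid_identifier(s: &str) -> bool {
    !s.is_empty() && s.chars().all(|c| c.is_ascii_alphanumeric() || c == '_')
}

fn main() {
    assert!(is_valid_identifier("acme_orders"));
    // Anything that could smuggle in SQL syntax is rejected outright.
    assert!(!is_valid_identifier("orders; DROP TABLE users"));
    assert!(!is_valid_identifier("a.b"));
    println!("ok");
}
```

Because the allowed alphabet contains no quotes, dots, or whitespace, a validated identifier can never terminate a statement or escape its position in one.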

Catalog and schema provisioning:

CREATE CATALOG IF NOT EXISTS <catalog>
ALTER CATALOG <catalog> SET TAGS ('managed_by' = 'rocky', 'tenant' = 'acme')
DESCRIBE CATALOG <catalog>
CREATE SCHEMA IF NOT EXISTS <catalog>.<schema>
ALTER SCHEMA <catalog>.<schema> SET TAGS ('managed_by' = 'rocky', 'source' = 'shopify')
SHOW SCHEMAS IN <catalog>

Governance tags are applied to each replicated table:

ALTER TABLE <catalog>.<schema>.<table> SET TAGS ('managed_by' = 'rocky')
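A minimal sketch of how such a tag statement might be assembled once every identifier has passed validation (function and parameter names are hypothetical; tag keys and values come from Rocky's own configuration, not untrusted input):

```rust
// Hypothetical sketch: identifiers are validated before this point, so the
// statement is built from trusted parts only.
fn set_tags_sql(catalog: &str, schema: &str, table: &str, tags: &[(&str, &str)]) -> String {
    let pairs: Vec<String> = tags
        .iter()
        .map(|(k, v)| format!("'{}' = '{}'", k, v))
        .collect();
    format!(
        "ALTER TABLE {}.{}.{} SET TAGS ({})",
        catalog, schema, table, pairs.join(", ")
    )
}

fn main() {
    let sql = set_tags_sql("prod", "shopify", "orders", &[("managed_by", "rocky")]);
    assert_eq!(sql, "ALTER TABLE prod.shopify.orders SET TAGS ('managed_by' = 'rocky')");
    println!("{sql}");
}
```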

The core replication operation copies only rows newer than the latest timestamp in the target:

INSERT INTO <target_catalog>.<target_schema>.<table>
SELECT *, CAST(NULL AS STRING) AS _loaded_by
FROM <source_catalog>.<source_schema>.<table>
WHERE _fivetran_synced > (
  SELECT COALESCE(MAX(_fivetran_synced), TIMESTAMP '1970-01-01')
  FROM <target_catalog>.<target_schema>.<table>
)
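The watermark logic above — use the target's own `MAX(_fivetran_synced)`, falling back to the Unix epoch so an empty target copies everything — can be sketched as follows (function name hypothetical):

```rust
// Hypothetical sketch of the incremental-copy statement. Both identifiers
// are assumed to have passed validation already.
fn incremental_copy_sql(source: &str, target: &str) -> String {
    let lines = vec![
        format!("INSERT INTO {target}"),
        "SELECT *, CAST(NULL AS STRING) AS _loaded_by".to_string(),
        format!("FROM {source}"),
        "WHERE _fivetran_synced > (".to_string(),
        "  SELECT COALESCE(MAX(_fivetran_synced), TIMESTAMP '1970-01-01')".to_string(),
        format!("  FROM {target}"),
        ")".to_string(),
    ];
    lines.join("\n")
}

fn main() {
    let sql = incremental_copy_sql("raw.shopify.orders", "prod.shopify.orders");
    assert!(sql.starts_with("INSERT INTO prod.shopify.orders"));
    assert!(sql.contains("COALESCE(MAX(_fivetran_synced), TIMESTAMP '1970-01-01')"));
    println!("{sql}");
}
```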

A full refresh is used when drift is detected or when explicitly configured:

CREATE OR REPLACE TABLE <target> AS SELECT * FROM <source>

For tables that require key-based deduplication, Rocky emits a MERGE:

MERGE INTO <target> AS t
USING (SELECT ... FROM <source>) AS s
ON t.key = s.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
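A sketch of how the ON clause might be derived from a table's declared key columns, so composite keys work too (function name and signature are assumptions, not Rocky's API):

```rust
// Hypothetical sketch: key column names are assumed validated elsewhere.
fn merge_sql(target: &str, source_sql: &str, keys: &[&str]) -> String {
    let on = keys
        .iter()
        .map(|k| format!("t.{k} = s.{k}"))
        .collect::<Vec<_>>()
        .join(" AND ");
    format!(
        "MERGE INTO {target} AS t\nUSING ({source_sql}) AS s\nON {on}\nWHEN MATCHED THEN UPDATE SET *\nWHEN NOT MATCHED THEN INSERT *"
    )
}

fn main() {
    let sql = merge_sql(
        "prod.shopify.order_lines",
        "SELECT * FROM staging.order_lines",
        &["order_id", "line_id"],
    );
    assert!(sql.contains("ON t.order_id = s.order_id AND t.line_id = s.line_id"));
    println!("{sql}");
}
```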

User-defined SQL is wrapped in the appropriate statement depending on the materialization strategy:

  • Table: CREATE OR REPLACE TABLE ... AS <user_sql>
  • Incremental: INSERT INTO ... <user_sql>
  • Merge: MERGE INTO ... USING (<user_sql>) ...
  • Materialized View: CREATE OR REPLACE MATERIALIZED VIEW ... AS <user_sql> (Databricks)
  • Dynamic Table: CREATE OR REPLACE DYNAMIC TABLE ... TARGET_LAG = '<lag>' AS <user_sql> (Snowflake)
  • Time Interval: Per-partition overwrite with @start_date/@end_date substitution

Expanded, the materialized view and dynamic table wrappers are:

CREATE OR REPLACE MATERIALIZED VIEW <catalog>.<schema>.<table> AS
<user_sql>

CREATE OR REPLACE DYNAMIC TABLE <catalog>.<schema>.<table>
TARGET_LAG = '<lag>'
WAREHOUSE = <warehouse>
AS <user_sql>
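The strategy dispatch can be sketched as a match over an enum; the variant and function names below are illustrative, and only three of the strategies are shown:

```rust
// Hypothetical sketch: each materialization strategy wraps the
// (already-validated) user SQL in a different outer statement.
enum Materialization {
    Table,
    Incremental,
    MaterializedView,
}

fn wrap_user_sql(strategy: &Materialization, target: &str, user_sql: &str) -> String {
    match strategy {
        Materialization::Table => format!("CREATE OR REPLACE TABLE {target} AS {user_sql}"),
        Materialization::Incremental => format!("INSERT INTO {target} {user_sql}"),
        Materialization::MaterializedView => {
            format!("CREATE OR REPLACE MATERIALIZED VIEW {target} AS {user_sql}")
        }
    }
}

fn main() {
    let sql = wrap_user_sql(&Materialization::Table, "prod.marts.revenue", "SELECT 1");
    assert_eq!(sql, "CREATE OR REPLACE TABLE prod.marts.revenue AS SELECT 1");
    println!("{sql}");
}
```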

Per-warehouse SQL for the Time Interval (partition-keyed) strategy:

Databricks (Delta, atomic):

INSERT INTO <target>
REPLACE WHERE <time_column> >= '<start>' AND <time_column> < '<end>'
<user_sql with @start_date/@end_date substituted>

Snowflake (multi-statement transaction):

BEGIN;
DELETE FROM <target> WHERE <time_column> >= '<start>' AND <time_column> < '<end>';
INSERT INTO <target> <user_sql>;
COMMIT;
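The @start_date/@end_date substitution can be sketched as plain text replacement, assuming the date values themselves are validated before they are spliced in (function name hypothetical):

```rust
// Hypothetical sketch of the placeholder substitution used by the
// time-interval strategy. Dates are assumed to be validated ISO strings.
fn substitute_window(user_sql: &str, start: &str, end: &str) -> String {
    user_sql
        .replace("@start_date", start)
        .replace("@end_date", end)
}

fn main() {
    let sql = substitute_window(
        "SELECT * FROM events WHERE ts >= '@start_date' AND ts < '@end_date'",
        "2024-01-01",
        "2024-01-02",
    );
    assert_eq!(
        sql,
        "SELECT * FROM events WHERE ts >= '2024-01-01' AND ts < '2024-01-02'"
    );
    println!("{sql}");
}
```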

Rocky detects schema drift by describing each target table:

DESCRIBE TABLE <catalog>.<schema>.<table>

When drift is found:

DROP TABLE IF EXISTS <target_catalog>.<target_schema>.<table>

Grant inspection and permission management:

SHOW GRANTS ON CATALOG <catalog>
SHOW GRANTS ON SCHEMA <catalog>.<schema>
GRANT <PERMISSION> ON CATALOG <catalog> TO `<principal>`
REVOKE <PERMISSION> ON CATALOG <catalog> FROM `<principal>`

Row count (batched):

SELECT '<catalog>', '<schema>', '<table>', COUNT(*)
FROM <catalog>.<schema>.<table>
UNION ALL
SELECT '<catalog>', '<schema>', '<table>', COUNT(*)
FROM <catalog>.<schema>.<table>
-- ... up to 200 tables per batch
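The batching can be sketched as chunking the table list and joining each chunk with UNION ALL (function name hypothetical; a chunk size of 2 is used in the demo where the text above uses 200):

```rust
// Hypothetical sketch of the batched row-count query builder.
fn row_count_batches(tables: &[(String, String, String)], batch_size: usize) -> Vec<String> {
    tables
        .chunks(batch_size)
        .map(|chunk| {
            chunk
                .iter()
                .map(|(c, s, t)| {
                    format!("SELECT '{c}', '{s}', '{t}', COUNT(*) FROM {c}.{s}.{t}")
                })
                .collect::<Vec<_>>()
                .join("\nUNION ALL\n")
        })
        .collect()
}

fn main() {
    let tables: Vec<(String, String, String)> = (0..5)
        .map(|i| ("prod".into(), "shopify".into(), format!("t{i}")))
        .collect();
    let batches = row_count_batches(&tables, 2);
    assert_eq!(batches.len(), 3); // 2 + 2 + 1 tables per statement
    assert!(batches[0].contains("UNION ALL"));
    println!("{}", batches[0]);
}
```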

Freshness:

SELECT MAX(<timestamp_column>) FROM <catalog>.<schema>.<table>

Null rate (sampled):

SELECT ... FROM <catalog>.<schema>.<table> TABLESAMPLE (N PERCENT)
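The document elides the exact SELECT list, so the AVG formulation below is an assumption for illustration, not Rocky's actual query; the sample percentage is an integer so it cannot carry injected SQL:

```rust
// Hypothetical sketch of a sampled null-rate query. The AVG(CASE ...)
// SELECT list is an assumption; the doc shows only "SELECT ...".
fn null_rate_sql(table: &str, column: &str, sample_pct: u8) -> String {
    format!(
        "SELECT AVG(CASE WHEN {column} IS NULL THEN 1.0 ELSE 0.0 END) FROM {table} TABLESAMPLE ({sample_pct} PERCENT)"
    )
}

fn main() {
    let sql = null_rate_sql("prod.shopify.orders", "email", 10);
    assert!(sql.ends_with("TABLESAMPLE (10 PERCENT)"));
    println!("{sql}");
}
```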

Rocky uses the Databricks REST API for workspace binding and isolation (not SQL):

  • PATCH /api/2.1/unity-catalog/bindings/catalog/{name} — Bind catalog to workspace IDs
  • PATCH /api/2.1/unity-catalog/catalogs/{name} — Set isolation_mode: "ISOLATED"

Find catalogs managed by Rocky using tags:

SELECT catalog_name
FROM system.information_schema.catalog_tags
WHERE tag_name = 'managed_by' AND tag_value = 'rocky'

All SQL generation follows strict safety rules:

  • Identifiers (catalogs, schemas, tables, tenants, regions, sources) are validated against ^[a-zA-Z0-9_]+$
  • Principal names are validated against ^[a-zA-Z0-9_ \-\.@]+$ and always wrapped in backticks
  • Rocky never uses format!() to interpolate untrusted input into SQL
  • All validation happens in rocky-sql/validation.rs before any SQL is constructed
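The principal rule can be sketched as validate-then-quote (function name hypothetical): reject anything outside the allowed character set, and only ever emit the backtick-wrapped form.

```rust
// Hypothetical sketch of the principal rule: validate against the allowed
// alphabet ^[a-zA-Z0-9_ \-\.@]+$, then always wrap in backticks. Returns
// None on failure rather than ever emitting an unvalidated name.
fn quote_principal(p: &str) -> Option<String> {
    let ok = !p.is_empty()
        && p.chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, ' ' | '_' | '-' | '.' | '@'));
    if ok {
        Some(format!("`{p}`"))
    } else {
        None
    }
}

fn main() {
    assert_eq!(
        quote_principal("data-eng@acme.com").as_deref(),
        Some("`data-eng@acme.com`")
    );
    // A backtick in the name would escape the quoting, so it is rejected.
    assert_eq!(quote_principal("x` TO `public"), None);
    println!("ok");
}
```

Note that the backtick itself is outside the allowed alphabet, which is what makes the always-wrap-in-backticks rule safe.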