Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/compatibility/cube.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Not mapped: `propagate_filters_to_sub_query`.
|---------|--------|
| `type: count` | Supported |
| `type: count_distinct` | Supported |
| `type: count_distinct_approx` | Supported (maps to `count_distinct`, approximation semantics lost) |
| `type: count_distinct_approx` | Supported (maps to `approx_count_distinct`) |
| `type: sum` | Supported |
| `type: avg` | Supported |
| `type: min` | Supported |
Expand Down Expand Up @@ -183,7 +183,7 @@ Pre-aggregations are fully mapped to Sidemantic's `PreAggregation` model, includ
|---------|--------|
| `type: rollup` | Supported |
| `type: rollupJoin` / `rollup_join` | Supported (type normalized to `rollup_join`) |
| `type: rollupLambda` / `lambda` | Supported (type normalized to `lambda`) |
| `type: rollupLambda` / `lambda` | Supported (type normalized to `lambda`). With `unionWithSourceData` + `build_range_end`, queries are served as a UNION of the batch rollup (older buckets) and a fresh source aggregation (newer rows), re-aggregated at the query grain. |
| `type: original_sql` | Supported |
| `measures` (list of measure references) | Supported (`CUBE.` prefix stripped) |
| `dimensions` (list of dimension references) | Supported (`CUBE.` prefix stripped) |
Expand All @@ -198,7 +198,8 @@ Pre-aggregations are fully mapped to Sidemantic's `PreAggregation` model, includ
| `indexes: [{ name, columns, type }]` | Supported |
| `build_range_start` / `build_range_end` | Supported (SQL expression extracted) |
| Cross-cube dimension references in pre-aggs (e.g., `visitors.source`) | Partial support: parsed as dimension name string; the cross-cube prefix is not stripped. |
| `rollups` (list of rollup references for rollupJoin/rollupLambda) | Unsupported (not stored) |
| `rollups` (list of rollup references for rollupJoin/rollupLambda) | Stored (`CUBE.` prefix stripped on import, re-prefixed on export). Used for round-trip; query routing matches the lambda directly on its own grain. |
| `unionWithSourceData` (rollupLambda real-time union) | Supported (executes the batch+source union when `build_range_end` is set). |
| Empty pre-aggregation sections (YAML null) | Supported (treated as empty list) |

---
Expand Down
44 changes: 44 additions & 0 deletions sidemantic-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,22 @@
"default": null,
"description": "Refresh strategy configuration"
},
"rollups": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "For type='lambda' (Cube rollupLambda): the constituent rollups this lambda unions. Stored for round-trip; query routing matches this lambda directly on its own measures/dimensions/granularity.",
"title": "Rollups"
},
"scheduled_refresh": {
"default": true,
"description": "Whether to enable scheduled refresh",
Expand Down Expand Up @@ -1260,6 +1276,12 @@
],
"title": "Type",
"type": "string"
},
"union_with_source_data": {
"default": false,
"description": "For type='lambda': when True (and build_range_end is set), serve a query as a UNION of the batch rollup table (buckets before build_range_end) with a fresh aggregation of source rows at/after build_range_end, re-aggregated at the query grain.",
"title": "Union With Source Data",
"type": "boolean"
}
},
"required": [
Expand Down Expand Up @@ -3357,6 +3379,22 @@
"default": null,
"description": "Refresh strategy configuration"
},
"rollups": {
"anyOf": [
{
"items": {
"type": "string"
},
"type": "array"
},
{
"type": "null"
}
],
"default": null,
"description": "For type='lambda' (Cube rollupLambda): the constituent rollups this lambda unions. Stored for round-trip; query routing matches this lambda directly on its own measures/dimensions/granularity.",
"title": "Rollups"
},
"scheduled_refresh": {
"default": true,
"description": "Whether to enable scheduled refresh",
Expand Down Expand Up @@ -3400,6 +3438,12 @@
],
"title": "Type",
"type": "string"
},
"union_with_source_data": {
"default": false,
"description": "For type='lambda': when True (and build_range_end is set), serve a query as a UNION of the batch rollup table (buckets before build_range_end) with a fresh aggregation of source rows at/after build_range_end, re-aggregated at the query grain.",
"title": "Union With Source Data",
"type": "boolean"
}
},
"required": [
Expand Down
17 changes: 17 additions & 0 deletions sidemantic/adapters/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,17 @@ def _parse_preaggregation(self, preagg_def: dict, cube_name: str) -> PreAggregat
dim_name = dim_ref.replace("CUBE.", "").replace(f"{cube_name}.", "")
dimensions.append(dim_name)

# Extract rollups (rollupLambda/rollupJoin constituents) - strip CUBE prefix
rollups = []
for rollup_ref in preagg_def.get("rollups") or []:
if isinstance(rollup_ref, str):
rollups.append(rollup_ref.replace("CUBE.", "").replace(f"{cube_name}.", ""))

# Lambda union flag (Cube rollupLambda real-time union); accept snake/camel case.
union_with_source_data = bool(
preagg_def.get("union_with_source_data", preagg_def.get("unionWithSourceData", False))
)

# Parse time dimension
time_dimension = preagg_def.get("time_dimension")
if time_dimension:
Expand Down Expand Up @@ -880,6 +891,8 @@ def _parse_preaggregation(self, preagg_def: dict, cube_name: str) -> PreAggregat
indexes=indexes if indexes else None,
build_range_start=build_range_start,
build_range_end=build_range_end,
rollups=rollups if rollups else None,
union_with_source_data=union_with_source_data,
)

def _parse_view(self, view_def: dict, graph: SemanticGraph, parent_model: Model | None = None) -> Model | None:
Expand Down Expand Up @@ -1337,6 +1350,10 @@ def _export_cube(self, model: Model, resolved_models: dict[str, Model]) -> dict:
preagg_def["build_range_start"] = {"sql": preagg.build_range_start}
if preagg.build_range_end:
preagg_def["build_range_end"] = {"sql": preagg.build_range_end}
if preagg.rollups:
preagg_def["rollups"] = [f"CUBE.{r}" for r in preagg.rollups]
if preagg.union_with_source_data:
preagg_def["union_with_source_data"] = True
cube["pre_aggregations"].append(preagg_def)

return cube
7 changes: 6 additions & 1 deletion sidemantic/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ def refresh(

typer.echo(f"\nRefreshing {len(preaggs_to_refresh)} pre-aggregation(s)...\n", err=True)

# Get dialect from connection string for engine mode
# Determine the dialect: drives index DDL (DuckDB/Postgres) and engine MVs.
dialect = None
if mode == "engine":
if "snowflake" in connection_str:
Expand All @@ -1819,6 +1819,8 @@ def refresh(
typer.echo(f"Error: Unsupported dialect for engine mode: {connection_str}", err=True)
typer.echo("Engine mode supports: snowflake, clickhouse, bigquery", err=True)
raise typer.Exit(1)
elif connection_str.startswith("duckdb://"):
dialect = "duckdb"

# Refresh each pre-aggregation
for model_name, model_obj, preagg_obj in preaggs_to_refresh:
Expand All @@ -1844,6 +1846,9 @@ def refresh(
mode=mode,
watermark_column=watermark_column,
dialect=dialect,
model=model_obj,
database=database,
schema=schema,
)

# Print result
Expand Down
Loading
Loading