Add model name and filepath to dbt conversion parsing err messages
treysp committed Nov 11, 2024
1 parent cb6e247 commit 96b25b4
Showing 2 changed files with 39 additions and 5 deletions.
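In practice, the change turns a bare parse failure into one that names the offending model and file. A hypothetical illustration (the model name, path, and failing expression below are made up for this example and do not appear in the commit):

```text
Before: Failed to parse partition_by field 'date_trunc(day,': <sqlglot parse error>
After:  Failed to parse model 'my_db.my_schema.my_model' partition_by field 'date_trunc(day,' in 'models/my_model.sql': <sqlglot parse error>
```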
docs/integrations/engines/clickhouse.md (24 additions, 0 deletions)
@@ -154,6 +154,30 @@ from other_schema.other_table;
 
 Note that there is an `=` between the `primary_key` key name and value `col1`.
 
+### TTL
+
+Clickhouse tables accept a [TTL expression that triggers actions](https://clickhouse.com/docs/en/guides/developer/ttl) like deleting rows after a certain amount of time has passed.
+
+Similar to `ORDER_BY` and `PRIMARY_KEY`, specify a TTL key in the model DDL's `physical_properties` dictionary. For example:
+
+``` sql linenums="1" hl_lines="7"
+MODEL (
+  name my_schema.my_log_table,
+  kind full,
+  physical_properties (
+    order_by = (col1, col2),
+    primary_key = col1,
+    ttl = timestamp + INTERVAL 1 WEEK
+  )
+);
+
+select
+  *
+from other_schema.other_table;
+```
+
+Note that there is an `=` between the `ttl` key name and value `timestamp + INTERVAL 1 WEEK`.
+
 ### Partitioning
 
 Some Clickhouse table engines support partitioning. Specify the partitioning columns/expressions in the model DDL's `partitioned_by` key.
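The `partitioned_by` key mentioned at the end of the docs hunk above sits at the top level of the `MODEL` DDL rather than inside `physical_properties`. A minimal sketch under that assumption (the table and the ClickHouse `toYYYYMM` bucketing expression are illustrative, not part of this commit):

``` sql
MODEL (
  name my_schema.my_partitioned_table,
  kind full,
  partitioned_by (toYYYYMM(event_ts))
);

select
  *
from other_schema.other_table;
```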
sqlmesh/dbt/model.py (15 additions, 5 deletions)
@@ -346,7 +346,9 @@ def _big_query_partition_by_expr(self, context: DbtContext) -> exp.Expression:
         try:
             field = d.parse_one(raw_field, dialect="bigquery")
         except SqlglotError as e:
-            raise ConfigError(f"Failed to parse partition_by field '{raw_field}': {e}") from e
+            raise ConfigError(
+                f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{raw_field}' in '{self.path}': {e}"
+            ) from e
 
         if data_type == "date" and self.partition_by["granularity"].lower() == "day":
             return field
@@ -393,7 +395,9 @@ def to_sqlmesh(self, context: DbtContext) -> Model:
                     try:
                         partitioned_by.append(d.parse_one(p, dialect=model_dialect))
                     except SqlglotError as e:
-                        raise ConfigError(f"Failed to parse partition_by field '{p}': {e}") from e
+                        raise ConfigError(
+                            f"Failed to parse model '{self.canonical_name(context)}' partition_by field '{p}' in '{self.path}': {e}"
+                        ) from e
             else:
                 partitioned_by.append(self._big_query_partition_by_expr(context))
             optional_kwargs["partitioned_by"] = partitioned_by
@@ -404,7 +408,9 @@ def to_sqlmesh(self, context: DbtContext) -> Model:
                 try:
                     clustered_by.append(d.parse_one(c, dialect=model_dialect).name)
                 except SqlglotError as e:
-                    raise ConfigError(f"Failed to parse cluster_by field '{c}': {e}") from e
+                    raise ConfigError(
+                        f"Failed to parse model '{self.canonical_name(context)}' cluster_by field '{c}' in '{self.path}': {e}"
+                    ) from e
             optional_kwargs["clustered_by"] = clustered_by
 
         model_kwargs = self.sqlmesh_model_kwargs(context)
@@ -486,7 +492,9 @@ def to_sqlmesh(self, context: DbtContext) -> Model:
                 try:
                     order_by.append(d.parse_one(o, dialect=model_dialect))
                 except SqlglotError as e:
-                    raise ConfigError(f"Failed to parse 'order_by' field '{o}': {e}") from e
+                    raise ConfigError(
+                        f"Failed to parse model '{self.canonical_name(context)}' 'order_by' field '{o}' in '{self.path}': {e}"
+                    ) from e
             physical_properties["order_by"] = order_by
 
         if self.primary_key:
@@ -495,7 +503,9 @@ def to_sqlmesh(self, context: DbtContext) -> Model:
                 try:
                     primary_key.append(d.parse_one(p, dialect=model_dialect))
                 except SqlglotError as e:
-                    raise ConfigError(f"Failed to parse 'primary_key' field '{p}': {e}") from e
+                    raise ConfigError(
+                        f"Failed to parse model '{self.canonical_name(context)}' 'primary_key' field '{p}' in '{self.path}': {e}"
+                    ) from e
             physical_properties["primary_key"] = primary_key
 
         if self.sharding_key:
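All five hunks apply the same wrapping pattern. A self-contained sketch of that pattern, runnable on its own: `parse_one` and `SqlglotError` are sqlglot's real APIs, while `ConfigError`, `parse_field`, and the sample model/path are illustrative stand-ins for SQLMesh's actual objects.

```python
from sqlglot import parse_one
from sqlglot.errors import SqlglotError


class ConfigError(Exception):
    """Stand-in for SQLMesh's ConfigError."""


def parse_field(field_name: str, raw: str, dialect: str, model: str, path: str):
    """Parse a dbt config expression, attributing failures to a model and file."""
    try:
        return parse_one(raw, dialect=dialect)
    except SqlglotError as e:
        # The commit's change: the message now names the model and its file,
        # not just the expression that failed to parse.
        raise ConfigError(
            f"Failed to parse model '{model}' {field_name} field '{raw}' in '{path}': {e}"
        ) from e


# An unbalanced expression makes sqlglot raise, which the wrapper re-raises
# as a ConfigError carrying the model name and file path.
try:
    parse_field(
        "partition_by",
        "date_trunc(day,",
        "bigquery",
        "my_db.my_schema.my_model",
        "models/my_model.sql",
    )
except ConfigError as exc:
    print(exc)
```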
