-
Notifications
You must be signed in to change notification settings - Fork 163
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* poc: microbatch using merge * update base tests * use dynamic insert_overwrite under the hood for bigquery * changelog entry * clean up validation + add testing
- Loading branch information
1 parent
563633b
commit 4932b96
Showing
5 changed files
with
157 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
kind: Features | ||
body: Add Microbatch Strategy to dbt-bigquery | ||
This comment has been minimized.
Sorry, something went wrong. |
||
time: 2024-09-25T23:22:38.216277+01:00 | ||
custom: | ||
Author: michelleark | ||
Issue: "1354" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
dbt/include/bigquery/macros/materializations/incremental_strategy/microbatch.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
{% macro bq_validate_microbatch_config(config) %}
  {#-
    Validate the model config for the 'microbatch' incremental strategy.

    Raises a compiler error when:
      * `partition_by` is not configured, or
      * `partition_by.granularity` differs from the configured `batch_size`.
  -#}
  {% set partition_config = config.get("partition_by") %}
  {% if partition_config is none %}
    {% do exceptions.raise_compiler_error("The 'microbatch' strategy requires a `partition_by` config.") %}
  {% endif %}

  {#- Only reached when `partition_by` is present, since the error above aborts compilation. -#}
  {% set batch_size = config.get('batch_size') %}
  {% set partition_granularity = partition_config.granularity %}
  {% if partition_granularity != batch_size %}
    {% set mismatch_lines = [
      "The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`.",
      "Got:",
      "`batch_size`: " ~ batch_size,
      "`partition_by.granularity`: " ~ partition_granularity
    ] %}
    {% do exceptions.raise_compiler_error(mismatch_lines | join("\n")) %}
  {% endif %}
{% endmacro %}
|
||
{% macro bq_generate_microbatch_build_sql(
    tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
) %}
  {#-
    Build the SQL for a microbatch run by delegating to `bq_insert_overwrite_sql`,
    forwarding every argument unchanged and returning its result.
  -#}
  {{ return(bq_insert_overwrite_sql(
      tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns, tmp_relation_exists, copy_partitions
  )) }}
{% endmacro %}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
55 changes: 55 additions & 0 deletions
55
tests/functional/adapter/incremental/test_incremental_microbatch.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
import os | ||
import pytest | ||
from unittest import mock | ||
|
||
from dbt.tests.util import run_dbt_and_capture | ||
from dbt.tests.adapter.incremental.test_incremental_microbatch import ( | ||
BaseMicrobatch, | ||
patch_microbatch_end_time, | ||
) | ||
|
||
from tests.functional.adapter.incremental.incremental_strategy_fixtures import ( | ||
microbatch_model_no_unique_id_sql, | ||
microbatch_input_sql, | ||
microbatch_model_no_partition_by_sql, | ||
microbatch_model_invalid_partition_by_sql, | ||
) | ||
|
||
|
||
class TestBigQueryMicrobatch(BaseMicrobatch):
    """Run the inherited ``BaseMicrobatch`` tests with a BigQuery-specific model."""

    @pytest.fixture(scope="class")
    def microbatch_model_sql(self) -> str:
        """Supply ``microbatch_model_no_unique_id_sql`` as the microbatch model."""
        model_sql: str = microbatch_model_no_unique_id_sql
        return model_sql
|
||
|
||
class TestBigQueryMicrobatchMissingPartitionBy:
    """Check that a microbatch model without ``partition_by`` fails with a clear error."""

    @pytest.fixture(scope="class")
    def models(self) -> dict:
        """Return the project's model files, keyed by file name.

        The fixture yields a mapping (not a string), so the return annotation
        is ``dict``.
        """
        return {
            "microbatch.sql": microbatch_model_no_partition_by_sql,
            "input_model.sql": microbatch_input_sql,
        }

    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
    def test_execution_failure_no_partition_by(self, project):
        """Run dbt expecting failure and assert the missing-``partition_by`` message."""
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
        assert "The 'microbatch' strategy requires a `partition_by` config" in stdout
|
||
|
||
class TestBigQueryMicrobatchInvalidPartitionByGranularity:
    """Check that a ``partition_by`` granularity differing from ``batch_size`` fails."""

    @pytest.fixture(scope="class")
    def models(self) -> dict:
        """Return the project's model files, keyed by file name.

        The fixture yields a mapping (not a string), so the return annotation
        is ``dict``.
        """
        return {
            "microbatch.sql": microbatch_model_invalid_partition_by_sql,
            "input_model.sql": microbatch_input_sql,
        }

    # Named for the granularity mismatch it checks; the previous name was
    # copied from the missing-partition_by test and was misleading in reports.
    @mock.patch.dict(os.environ, {"DBT_EXPERIMENTAL_MICROBATCH": "True"})
    def test_execution_failure_invalid_partition_by_granularity(self, project):
        """Run dbt expecting failure and assert the granularity-mismatch message."""
        with patch_microbatch_end_time("2020-01-03 13:57:00"):
            _, stdout = run_dbt_and_capture(["run"], expect_pass=False)
        assert (
            "The 'microbatch' strategy requires a `partition_by` config with the same granularity as its configured `batch_size`"
            in stdout
        )
dbt-spark -> dbt-bigquery ? :)