Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Athena Fixes #85

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/assets/athena.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,22 @@ select
from sales_data
group by order_date;
```

Bruin athena assets support partition by one column only
```sql
/* @bruin
name: daily_sales_analysis.view
type: athena.sql
materialization:
type: table
partition_by: order_date # <----------
@bruin */

select
order_date,
sum(total_amount) as total_sales,
count(distinct order_id) as total_orders,
avg(total_amount) as avg_order_value
from sales_data
group by order_date;
```
3 changes: 2 additions & 1 deletion examples/simple-pipeline/assets/athena.sql
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/* @bruin
name: travellers
name: destinations
type: athena.sql
materialization:
type: table
partition_by: id
# strategy: append
# strategy: merge
# incremental_key: id
Expand Down
12 changes: 9 additions & 3 deletions pkg/athena/materialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func buildIncrementalQuery(task *pipeline.Asset, query, location string) ([]stri
tempTableName := "__bruin_tmp_" + helpers.PrefixGenerator()

queries := []string{
fmt.Sprintf("CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s') AS %s", tempTableName, location, tempTableName, query),
fmt.Sprintf("CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s') AS %s", tempTableName, location, task.Name, query),
fmt.Sprintf("DELETE FROM %s WHERE %s in (SELECT DISTINCT %s FROM %s)", task.Name, mat.IncrementalKey, mat.IncrementalKey, tempTableName),
fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", task.Name, tempTableName),
"DROP TABLE IF EXISTS " + tempTableName,
Expand Down Expand Up @@ -114,12 +114,18 @@ func buildCreateReplaceQuery(task *pipeline.Asset, query, location string) ([]st

tempTableName := "__bruin_tmp_" + helpers.PrefixGenerator()

var partitionBy string
if task.Materialization.PartitionBy != "" {
partitionBy = fmt.Sprintf(", partitioning = ARRAY['%s']", task.Materialization.PartitionBy)
}

return []string{
fmt.Sprintf(
"CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s') AS %s",
"CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s'%s) AS %s",
tempTableName,
location,
tempTableName,
task.Name,
partitionBy,
query,
),
"DROP TABLE IF EXISTS " + task.Name,
Expand Down
22 changes: 19 additions & 3 deletions pkg/athena/materialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,23 @@ func TestMaterializer_Render(t *testing.T) {
},
query: "SELECT 1",
want: []string{
"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/__bruin_tmp_abcefghi') AS SELECT 1",
"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset') AS SELECT 1",
"DROP TABLE IF EXISTS my.asset",
"ALTER TABLE __bruin_tmp_abcefghi RENAME TO my.asset",
},
},
{
name: "materialize to a table, with partition, default to create+replace",
task: &pipeline.Asset{
Name: "my.asset",
Materialization: pipeline.Materialization{
Type: pipeline.MaterializationTypeTable,
PartitionBy: "some_column",
},
},
query: "SELECT 1 as some_column",
want: []string{
"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset', partitioning = ARRAY['some_column']) AS SELECT 1 as some_column",
"DROP TABLE IF EXISTS my.asset",
"ALTER TABLE __bruin_tmp_abcefghi RENAME TO my.asset",
},
Expand All @@ -62,7 +78,7 @@ func TestMaterializer_Render(t *testing.T) {
fullRefresh: true,
query: "SELECT 1",
want: []string{
"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/__bruin_tmp_abcefghi') AS SELECT 1",
"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset') AS SELECT 1",
"DROP TABLE IF EXISTS my.asset",
"ALTER TABLE __bruin_tmp_abcefghi RENAME TO my.asset",
},
Expand Down Expand Up @@ -114,7 +130,7 @@ func TestMaterializer_Render(t *testing.T) {
},
},
query: "SELECT 1",
want: []string{"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/__bruin_tmp_abcefghi') AS SELECT 1", "DELETE FROM my.asset WHERE dt in (SELECT DISTINCT dt FROM __bruin_tmp_abcefghi)", "INSERT INTO my.asset SELECT * FROM __bruin_tmp_abcefghi", "DROP TABLE IF EXISTS __bruin_tmp_abcefghi"},
want: []string{"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset') AS SELECT 1", "DELETE FROM my.asset WHERE dt in (SELECT DISTINCT dt FROM __bruin_tmp_abcefghi)", "INSERT INTO my.asset SELECT * FROM __bruin_tmp_abcefghi", "DROP TABLE IF EXISTS __bruin_tmp_abcefghi"},
},
{
name: "merge without columns",
Expand Down
Loading