From 20e6567f4c3c07990a6cd20c4b705d33c6c312ac Mon Sep 17 00:00:00 2001 From: "Alberto J. Gomez" Date: Mon, 23 Sep 2024 18:30:18 +0200 Subject: [PATCH 1/2] add partitioning and make location same as the table name --- examples/simple-pipeline/assets/athena.sql | 3 ++- pkg/athena/materialization.go | 12 +++++++++--- pkg/athena/materialization_test.go | 22 +++++++++++++++++++--- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/examples/simple-pipeline/assets/athena.sql b/examples/simple-pipeline/assets/athena.sql index f79f9bf..b38552c 100644 --- a/examples/simple-pipeline/assets/athena.sql +++ b/examples/simple-pipeline/assets/athena.sql @@ -1,8 +1,9 @@ /* @bruin -name: travellers +name: destinations type: athena.sql materialization: type: table + partition_by: id # strategy: append # strategy: merge # incremental_key: id diff --git a/pkg/athena/materialization.go b/pkg/athena/materialization.go index 1d80115..2755920 100644 --- a/pkg/athena/materialization.go +++ b/pkg/athena/materialization.go @@ -52,7 +52,7 @@ func buildIncrementalQuery(task *pipeline.Asset, query, location string) ([]stri tempTableName := "__bruin_tmp_" + helpers.PrefixGenerator() queries := []string{ - fmt.Sprintf("CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s') AS %s", tempTableName, location, tempTableName, query), + fmt.Sprintf("CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s') AS %s", tempTableName, location, task.Name, query), fmt.Sprintf("DELETE FROM %s WHERE %s in (SELECT DISTINCT %s FROM %s)", task.Name, mat.IncrementalKey, mat.IncrementalKey, tempTableName), fmt.Sprintf("INSERT INTO %s SELECT * FROM %s", task.Name, tempTableName), "DROP TABLE IF EXISTS " + tempTableName, @@ -114,12 +114,18 @@ func buildCreateReplaceQuery(task *pipeline.Asset, query, location string) ([]st tempTableName := "__bruin_tmp_" + helpers.PrefixGenerator() + var partitionBy string + if task.Materialization.PartitionBy != "" { + partitionBy = fmt.Sprintf(", partitioning = ARRAY['%s']", task.Materialization.PartitionBy) + } + return []string{ fmt.Sprintf( - "CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s') AS %s", + "CREATE TABLE %s WITH (table_type='ICEBERG', is_external=false, location='%s/%s'%s) AS %s", tempTableName, location, - tempTableName, + task.Name, + partitionBy, query, ), "DROP TABLE IF EXISTS " + task.Name, diff --git a/pkg/athena/materialization_test.go b/pkg/athena/materialization_test.go index 34cd4cb..e63e4f9 100644 --- a/pkg/athena/materialization_test.go +++ b/pkg/athena/materialization_test.go @@ -45,7 +45,23 @@ func TestMaterializer_Render(t *testing.T) { }, query: "SELECT 1", want: []string{ - "CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/__bruin_tmp_abcefghi') AS SELECT 1", + "CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset') AS SELECT 1", + "DROP TABLE IF EXISTS my.asset", + "ALTER TABLE __bruin_tmp_abcefghi RENAME TO my.asset", + }, + }, + { + name: "materialize to a table, with partition, default to create+replace", + task: &pipeline.Asset{ + Name: "my.asset", + Materialization: pipeline.Materialization{ + Type: pipeline.MaterializationTypeTable, + PartitionBy: "some_column", + }, + }, + query: "SELECT 1 as some_column", + want: []string{ + "CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset', partitioning = ARRAY['some_column']) AS SELECT 1 as some_column", "DROP TABLE IF EXISTS my.asset", "ALTER TABLE __bruin_tmp_abcefghi RENAME TO my.asset", }, @@ -62,7 +78,7 @@ func TestMaterializer_Render(t *testing.T) { fullRefresh: true, query: "SELECT 1", want: []string{ - "CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/__bruin_tmp_abcefghi') AS SELECT 1", + "CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset') AS SELECT 1", "DROP TABLE IF EXISTS my.asset", "ALTER TABLE __bruin_tmp_abcefghi RENAME TO my.asset", }, @@ -114,7 +130,7 @@ func TestMaterializer_Render(t *testing.T) { }, }, query: "SELECT 1", - want: []string{"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/__bruin_tmp_abcefghi') AS SELECT 1", "DELETE FROM my.asset WHERE dt in (SELECT DISTINCT dt FROM __bruin_tmp_abcefghi)", "INSERT INTO my.asset SELECT * FROM __bruin_tmp_abcefghi", "DROP TABLE IF EXISTS __bruin_tmp_abcefghi"}, + want: []string{"CREATE TABLE __bruin_tmp_abcefghi WITH (table_type='ICEBERG', is_external=false, location='s3://bucket/my.asset') AS SELECT 1", "DELETE FROM my.asset WHERE dt in (SELECT DISTINCT dt FROM __bruin_tmp_abcefghi)", "INSERT INTO my.asset SELECT * FROM __bruin_tmp_abcefghi", "DROP TABLE IF EXISTS __bruin_tmp_abcefghi"}, }, { name: "merge without columns", From 5249ac16582fd690b38b8de895ac80a6a6abcd70 Mon Sep 17 00:00:00 2001 From: "Alberto J. Gomez" Date: Tue, 24 Sep 2024 14:17:01 +0200 Subject: [PATCH 2/2] add docs --- docs/assets/athena.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/docs/assets/athena.md b/docs/assets/athena.md index bf57f68..cfd401d 100644 --- a/docs/assets/athena.md +++ b/docs/assets/athena.md @@ -40,3 +40,22 @@ select from sales_data group by order_date; ``` + +Bruin athena assets support partition by one column only +```sql +/* @bruin +name: daily_sales_analysis.view +type: athena.sql +materialization: + type: table + partition_by: order_date # <---------- +@bruin */ + +select + order_date, + sum(total_amount) as total_sales, + count(distinct order_id) as total_orders, + avg(total_amount) as avg_order_value +from sales_data +group by order_date; +```