From 7bb003802914b5f0c2431b7054deffdeb82bb3f8 Mon Sep 17 00:00:00 2001 From: poriz Date: Tue, 5 Mar 2024 00:10:41 +0900 Subject: [PATCH 1/5] [feat] add anal_broadcast.py, fix-live_viewer_script.py template(x) --- dags/elt/anal_broadcast.py | 120 +++++++++++++ ...ewer_template.py => live_viewer_script.py} | 167 +++++++++--------- 2 files changed, 205 insertions(+), 82 deletions(-) create mode 100644 dags/elt/anal_broadcast.py rename dataCollector/glue&athena/{live_viewer_template.py => live_viewer_script.py} (81%) diff --git a/dags/elt/anal_broadcast.py b/dags/elt/anal_broadcast.py new file mode 100644 index 0000000..5c8bbee --- /dev/null +++ b/dags/elt/anal_broadcast.py @@ -0,0 +1,120 @@ +from airflow import DAG +from airflow.decorators import task +from airflow.providers.postgres.hooks.postgres import PostgresHook + +from datetime import datetime, timedelta +import slack + + +def connect_to_redshift(): + db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id") + conn = db_hook.get_conn() + conn.autocommit = True + + return conn.cursor() + + +@task +def elt(): + # 1. reshift external_raw_data 스키마 + cur = connect_to_redshift() + print("Successfully connected to Redshift") + conn = cur.connection + + try: + # Start a new transaction + conn.autocommit = False + + # analytics.ANAL_YSD_GAME_CCU 테이블의 모든 데이터 삭제 + sql = """ + DELETE FROM analytics.ANAL_BROADCAST; + """ + cur.execute(sql) + print("Successfully deleted all data from analytics.ANAL_BROADCAST") + + # SELECT 쿼리의 결과를 analytics.ANAL_YSD_GAME_CCU 테이블에 삽입 + sql = """ + INSERT INTO analytics.ANAL_BROADCAST(STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, AVG_VIEWER_NUM, BROADCAST_START_TIME, BROADCAST_END_TIME, GAME_DURATION, CREATED_DATE) + WITH ParsedData AS ( + SELECT *, live_collect_time::TIMESTAMPTZ AS parsed_time + FROM external_raw_data.table_name_raw_live_viewer + ), + RankedData AS( + SELECT + *, + LAG(parsed_time, 1) OVER ( + PARTITION BY streamer_id, broadcast_id, game_code + ORDER BY parsed_time + ) AS prev_timestamp + FROM ParsedData + ), + GroupedData AS( + SELECT *, + CASE + WHEN parsed_time - INTERVAL '5' MINUTE > prev_timestamp THEN 1 + ELSE 0 + END AS is_new_group, + COALESCE(NULLIF(game_code, ''), 'talk') AS normalized_game_code + FROM RankedData + ), + GroupIDs AS ( + SELECT *, + SUM(is_new_group) OVER ( + PARTITION BY streamer_id, broadcast_id, normalized_game_code + ORDER BY parsed_time + ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW + ) AS group_id, + normalized_game_code AS n_game_code + FROM GroupedData + ) + SELECT + s_info.streamer_nm AS STREAMER_NM, + g_ids.broadcast_id AS BROADCAST_ID, + COALESCE(g_info.game_nm,g_ids.n_game_code) AS GAME_NM, + g_ids.platform AS PLATFORM, + AVG(g_ids.viewer_num)::integer AS AVG_VIEWER_NUM, + MIN(g_ids.parsed_time) AS start_time, + MAX(g_ids.parsed_time) AS end_time, + (EXTRACT(EPOCH FROM MAX(g_ids.parsed_time)) - EXTRACT(EPOCH FROM MIN(g_ids.parsed_time))) / 60 AS GAME_DURATION, + CURRENT_DATE AS created_date + FROM GroupIDs AS g_ids + LEFT JOIN external_raw_data.streamer_info AS s_info ON s_info.streamer_id = g_ids.streamer_id + LEFT JOIN external_raw_data.game_info AS g_info + ON g_ids.n_game_code = g_info.chz_game_code OR LTRIM(g_ids.n_game_code, '0') = g_info.afc_game_code + GROUP BY STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, g_ids.group_id, g_ids.n_game_code + ORDER BY STREAMER_NM, BROADCAST_ID, start_time; + """ + cur.execute(sql) + print("Successfully inserted data into analytics.ANAL_BROADCAST") + + # 트랜잭션 commit + conn.commit() + print("Successfully committed the transaction") + + except Exception as e: + # Rollback + print("Error occurred. Start to rollback", e) + conn.rollback() + raise + + finally: + # Close the cursor and connection + cur.close() + conn.close() + print("Connection to Redshift is closed") + + +with DAG( + dag_id="ELT_ANAL_BROADCAST", + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["ELT", "analytics", "game_ccu"], + schedule_interval="0 2 * * *", # 매일 11시 + default_args={ + "retries": 3, + "retry_delay": timedelta(minutes=5), + "on_failure_callback": slack.on_failure_callback, + }, +) as dag: + + elt() diff --git a/dataCollector/glue&athena/live_viewer_template.py b/dataCollector/glue&athena/live_viewer_script.py similarity index 81% rename from dataCollector/glue&athena/live_viewer_template.py rename to dataCollector/glue&athena/live_viewer_script.py index 73b3804..796c3d3 100644 --- a/dataCollector/glue&athena/live_viewer_template.py +++ b/dataCollector/glue&athena/live_viewer_script.py @@ -1,82 +1,85 @@ -## live_viewer glue script template - -import sys -from awsglue.transforms import * -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext -from awsglue.context import GlueContext -from awsglue.job import Job -from pyspark.sql.functions import * -from awsglue.dynamicframe import DynamicFrame - -# SparkContext와 GlueContext 초기화 -sc = SparkContext() -glueContext = GlueContext(sc) -spark = glueContext.spark_session - -# Job 초기화 (Job Bookmark 활성화 포함) -job = Job(glueContext) -args = getResolvedOptions(sys.argv, ["JOB_NAME"]) -job.init(args["JOB_NAME"], args) - -# S3에서 데이터를 읽어오는 부분 -datasource = glueContext.create_dynamic_frame.from_options( - "s3", - {"paths": ["{{ input_path }}"], "recurse": True}, - format="json", - transformation_ctx="datasource", -) -# 데이터 가공 -datasource_df = datasource.toDF() - -chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk")) -afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca")) - -# chzzk_source.printSchema() -chzzk_df = chzzk_source.select( - col("col.streamer_id").alias("STREAMER_ID"), - col("col.content.liveID").alias("BROADCAST_ID"), - col("col.current_time").alias("LIVE_COLLECT_TIME"), - col("col.content.liveTitle").alias("BROADCAST_TITLE"), - col("col.content.liveCategoryValue").alias("GAME_CODE"), - col("col.content.concurrentUserCount").alias("VIEWER_NUM"), -) -# add platform -chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk")) -# chzzk_df.show() - -# afreeca_source.printSchema() -afreeca_df = afreeca_source.select( - col("col.streamer_id").alias("STREAMER_ID"), - col("col.live_status.BNO").alias("BROADCAST_ID"), - col("col.current_time").alias("LIVE_COLLECT_TIME"), - col("col.live_status.TITLE").alias("BROADCAST_TITLE"), - col("col.live_status.CATE").alias("GAME_CODE"), - col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"), -) -afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca")) -# afreeca_df.show(truncate=False) - -result_df = chzzk_df.union(afreeca_df) - -# 스키마 정보를 로깅 -print("Schema Information:") -result_df.printSchema() - -# "PLATFORM" 컬럼을 기준으로 파티션을 구성 -partitioned_df = result_df.repartition("PLATFORM") - -# 파티션된 Spark DataFrame을 DynamicFrame으로 변환 -partitioned_dynamic_frame = DynamicFrame.fromDF( - partitioned_df, glueContext, "partitioned_dynamic_frame" -) - - -# Parquet으로 변환하여 S3에 저장 -glueContext.write_dynamic_frame.from_options( - frame=partitioned_dynamic_frame, - connection_type="s3", - connection_options={"path": "{{ output_path }}"}, - format="parquet", - transformation_ctx="datasource", -) +## live_viewer glue script template + +import sys +from awsglue.transforms import * +from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext +from awsglue.context import GlueContext +from awsglue.job import Job +from pyspark.sql.functions import * +from awsglue.dynamicframe import DynamicFrame + +# SparkContext와 GlueContext 초기화 +sc = SparkContext() +glueContext = GlueContext(sc) +spark = glueContext.spark_session + +# Job 초기화 (Job Bookmark 활성화 포함) +job = Job(glueContext) +args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path']) +job.init(args["JOB_NAME"], args) + +input_path = args['input_path'] +output_path = args['output_path'] + +# S3에서 데이터를 읽어오는 부분 +datasource = glueContext.create_dynamic_frame.from_options( + "s3", + {"paths": [input_path], "recurse": True}, + format="json", + transformation_ctx="datasource", +) +# 데이터 가공 +datasource_df = datasource.toDF() + +chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk")) +afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca")) + +# chzzk_source.printSchema() +chzzk_df = chzzk_source.select( + col("col.streamer_id").alias("STREAMER_ID"), + col("col.content.liveID").alias("BROADCAST_ID"), + col("col.current_time").alias("LIVE_COLLECT_TIME"), + col("col.content.liveTitle").alias("BROADCAST_TITLE"), + col("col.content.liveCategoryValue").alias("GAME_CODE"), + col("col.content.concurrentUserCount").alias("VIEWER_NUM"), +) +# add platform +chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk")) +# chzzk_df.show() + +# afreeca_source.printSchema() +afreeca_df = afreeca_source.select( + col("col.streamer_id").alias("STREAMER_ID"), + col("col.live_status.BNO").alias("BROADCAST_ID"), + col("col.current_time").alias("LIVE_COLLECT_TIME"), + col("col.live_status.TITLE").alias("BROADCAST_TITLE"), + col("col.live_status.CATE").alias("GAME_CODE"), + col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"), +) +afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca")) +# afreeca_df.show(truncate=False) + +result_df = chzzk_df.union(afreeca_df) + +result_df = result_df.withColumn("GAME_CODE", when(col("GAME_CODE").isNull(), lit("talk")).otherwise(col("GAME_CODE"))) + +# 스키마 정보를 로깅 +print("Schema Information:") +result_df.printSchema() + +# "PLATFORM" 컬럼을 기준으로 파티션을 구성 +partitioned_df = result_df.repartition("PLATFORM") + +# 파티션된 Spark DataFrame을 DynamicFrame으로 변환 +partitioned_dynamic_frame = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_dynamic_frame") + + +# Parquet으로 변환하여 S3에 저장 +glueContext.write_dynamic_frame.from_options( + frame=partitioned_dynamic_frame, + connection_type="s3", + connection_options={"path": output_path}, + format="parquet", + transformation_ctx="datasource", +) \ No newline at end of file From fe9d71b9164090b8402f29fb8de2c9702a2e7235 Mon Sep 17 00:00:00 2001 From: poriz Date: Tue, 5 Mar 2024 00:12:52 +0900 Subject: [PATCH 2/5] [feat] script change --- .../glue&athena/live_viewer_script.py | 70 +++++++++++-------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/dataCollector/glue&athena/live_viewer_script.py b/dataCollector/glue&athena/live_viewer_script.py index 796c3d3..b968b34 100644 --- a/dataCollector/glue&athena/live_viewer_script.py +++ b/dataCollector/glue&athena/live_viewer_script.py @@ -8,6 +8,7 @@ from awsglue.job import Job from pyspark.sql.functions import * from awsglue.dynamicframe import DynamicFrame +from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType # SparkContext와 GlueContext 초기화 sc = SparkContext() @@ -29,41 +30,52 @@ format="json", transformation_ctx="datasource", ) + +# 공통 스키마 정의 +common_schema = StructType([ + StructField("STREAMER_ID", StringType(), True), + StructField("BROADCAST_ID", StringType(), True), + StructField("LIVE_COLLECT_TIME", TimestampType(), True), + StructField("BROADCAST_TITLE", StringType(), True), + StructField("GAME_CODE", StringType(), True), + StructField("VIEWER_NUM", IntegerType(), True), + StructField("PLATFORM", StringType(), True), +]) + # 데이터 가공 datasource_df = datasource.toDF() +try: + chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk")) + # chzzk_source.printSchema() + chzzk_df = chzzk_source.select( + col("col.streamer_id").alias("STREAMER_ID"), + col("col.content.liveID").alias("BROADCAST_ID"), + col("col.current_time").alias("LIVE_COLLECT_TIME"), + col("col.content.liveTitle").alias("BROADCAST_TITLE"), + col("col.content.liveCategoryValue").alias("GAME_CODE"), + col("col.content.concurrentUserCount").alias("VIEWER_NUM"), + ) + chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk")) +except: + chzzk_df = spark.createDataFrame([],schema = common_schema) -chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk")) -afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca")) +try: + afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca")) + afreeca_df = afreeca_source.select( + col("col.streamer_id").alias("STREAMER_ID"), + col("col.live_status.BNO").alias("BROADCAST_ID"), + col("col.current_time").alias("LIVE_COLLECT_TIME"), + col("col.live_status.TITLE").alias("BROADCAST_TITLE"), + col("col.live_status.CATE").alias("GAME_CODE"), + col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"), + ) + afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca")) +except: + afreeca_df = spark.createDataFrame([], schema=common_schema) -# chzzk_source.printSchema() -chzzk_df = chzzk_source.select( - col("col.streamer_id").alias("STREAMER_ID"), - col("col.content.liveID").alias("BROADCAST_ID"), - col("col.current_time").alias("LIVE_COLLECT_TIME"), - col("col.content.liveTitle").alias("BROADCAST_TITLE"), - col("col.content.liveCategoryValue").alias("GAME_CODE"), - col("col.content.concurrentUserCount").alias("VIEWER_NUM"), -) -# add platform -chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk")) -# chzzk_df.show() - -# afreeca_source.printSchema() -afreeca_df = afreeca_source.select( - col("col.streamer_id").alias("STREAMER_ID"), - col("col.live_status.BNO").alias("BROADCAST_ID"), - col("col.current_time").alias("LIVE_COLLECT_TIME"), - col("col.live_status.TITLE").alias("BROADCAST_TITLE"), - col("col.live_status.CATE").alias("GAME_CODE"), - col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"), -) -afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca")) -# afreeca_df.show(truncate=False) result_df = chzzk_df.union(afreeca_df) -result_df = result_df.withColumn("GAME_CODE", when(col("GAME_CODE").isNull(), lit("talk")).otherwise(col("GAME_CODE"))) - # 스키마 정보를 로깅 print("Schema Information:") result_df.printSchema() @@ -82,4 +94,4 @@ connection_options={"path": output_path}, format="parquet", transformation_ctx="datasource", -) \ No newline at end of file +) From 3b96c214f2691107ebb63da18a0304f3a1057ced Mon Sep 17 00:00:00 2001 From: poriz Date: Tue, 5 Mar 2024 00:22:36 +0900 Subject: [PATCH 3/5] [feat] script change add 19 filter --- dataCollector/glue&athena/live_viewer_script.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/dataCollector/glue&athena/live_viewer_script.py b/dataCollector/glue&athena/live_viewer_script.py index b968b34..4bfeb0f 100644 --- a/dataCollector/glue&athena/live_viewer_script.py +++ b/dataCollector/glue&athena/live_viewer_script.py @@ -46,8 +46,11 @@ datasource_df = datasource.toDF() try: chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk")) + + chz_filtered_source = chzzk_source.filter(col("col.content.adult") == 'false') + # chzzk_source.printSchema() - chzzk_df = chzzk_source.select( + chzzk_df = chz_filtered_source.select( col("col.streamer_id").alias("STREAMER_ID"), col("col.content.liveID").alias("BROADCAST_ID"), col("col.current_time").alias("LIVE_COLLECT_TIME"), @@ -61,7 +64,9 @@ try: afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca")) - afreeca_df = afreeca_source.select( + afc_filtered_source = afreeca_source.filter(col("col.broad_info.broad.broad_grade") < 19) + + afreeca_df = afc_filtered_source.select( col("col.streamer_id").alias("STREAMER_ID"), col("col.live_status.BNO").alias("BROADCAST_ID"), col("col.current_time").alias("LIVE_COLLECT_TIME"), From de86b075032e5e3c88c1a76dba8865b52a99196c Mon Sep 17 00:00:00 2001 From: poriz Date: Tue, 5 Mar 2024 13:35:40 +0900 Subject: [PATCH 4/5] [feat] analytics table add --- dags/elt/anal_broadcast.py | 5 +- dags/elt/anal_week_broadcast_duration.py | 79 +++++++++++++++++++++++ dags/elt/anal_week_broadcast_freq.py | 80 ++++++++++++++++++++++++ 3 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 dags/elt/anal_week_broadcast_duration.py create mode 100644 dags/elt/anal_week_broadcast_freq.py diff --git a/dags/elt/anal_broadcast.py b/dags/elt/anal_broadcast.py index 5c8bbee..198ed27 100644 --- a/dags/elt/anal_broadcast.py +++ b/dags/elt/anal_broadcast.py @@ -81,6 +81,7 @@ def elt(): LEFT JOIN external_raw_data.streamer_info AS s_info ON s_info.streamer_id = g_ids.streamer_id LEFT JOIN external_raw_data.game_info AS g_info ON g_ids.n_game_code = g_info.chz_game_code OR LTRIM(g_ids.n_game_code, '0') = g_info.afc_game_code + WHERE g_ids.broadcast_id IS NOT NULL AND g_ids.parsed_time::DATE - 1 >= (CURRENT_DATE - 8) GROUP BY STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, g_ids.group_id, g_ids.n_game_code ORDER BY STREAMER_NM, BROADCAST_ID, start_time; """ @@ -108,8 +109,8 @@ def elt(): dag_id="ELT_ANAL_BROADCAST", start_date=datetime(2024, 1, 1), catchup=False, - tags=["ELT", "analytics", "game_ccu"], - schedule_interval="0 2 * * *", # 매일 11시 + tags=["ELT", "analytics", "broadcast"], + schedule_interval="0 22 * * *", # 매일 11시 default_args={ "retries": 3, "retry_delay": timedelta(minutes=5), diff --git a/dags/elt/anal_week_broadcast_duration.py b/dags/elt/anal_week_broadcast_duration.py new file mode 100644 index 0000000..991833e --- /dev/null +++ b/dags/elt/anal_week_broadcast_duration.py @@ -0,0 +1,79 @@ +from airflow import DAG +from airflow.decorators import task +from airflow.providers.postgres.hooks.postgres import PostgresHook + +from datetime import datetime, timedelta +import slack + + +def connect_to_redshift(): + db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id") + conn = db_hook.get_conn() + conn.autocommit = True + + return conn.cursor() + + +@task +def elt(): + # 1. GCE의 RDS의 GAME_INFO 테이블에서 게임 리스트 가져오기 + cur = connect_to_redshift() + print("Successfully connected to Redshift") + conn = cur.connection + + try: + # Start a new transaction + conn.autocommit = False + + # analytics.anal_week_game_viewer 테이블의 모든 데이터 삭제 + sql = """ + DELETE FROM analytics.anal_week_broadcast_duration; + """ + cur.execute(sql) + print("Successfully deleted all data from analytics.anal_week_broadcast_duration") + + # SELECT 쿼리의 결과를 analytics.anal_week_game_viewer 테이블에 삽입 + sql = """ + INSERT INTO analytics.anal_week_broadcast_duration(GAME_NM, STREAMER_NM, BROADCAST_DURATION, CREATED_DATE) + select + game_nm AS GAME_NM, + streamer_nm AS STREAMER_NM, + SUM(game_duration) AS BROADCAST_DURATION, + current_date as created_date + from dev.analytics.anal_broadcast + group by 1,2; + """ + cur.execute(sql) + print("Successfully inserted data into analytics.anal_week_broadcast_duration") + + # 트랜잭션 commit + conn.commit() + print("Successfully committed the transaction") + + except Exception as e: + # Rollback + print("Error occurred. Start to rollback", e) + conn.rollback() + raise + + finally: + # Close the cursor and connection + cur.close() + conn.close() + print("Connection to Redshift is closed") + + +with DAG( + dag_id="ELT_Anal_Week_Broadcast_Duration", + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["ELT", "analytics", "broadcast"], + schedule_interval="0 2 * * *", # 11:00(KST) + default_args={ + "retries": 3, + "retry_delay": timedelta(minutes=5), + "on_failure_callback": slack.on_failure_callback, + }, +) as dag: + + elt() diff --git a/dags/elt/anal_week_broadcast_freq.py b/dags/elt/anal_week_broadcast_freq.py new file mode 100644 index 0000000..a167acb --- /dev/null +++ b/dags/elt/anal_week_broadcast_freq.py @@ -0,0 +1,80 @@ +from airflow import DAG +from airflow.decorators import task +from airflow.providers.postgres.hooks.postgres import PostgresHook + +from datetime import datetime, timedelta +import slack + + +def connect_to_redshift(): + db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id") + conn = db_hook.get_conn() + conn.autocommit = True + + return conn.cursor() + + +@task +def elt(): + # 1. GCE의 RDS의 GAME_INFO 테이블에서 게임 리스트 가져오기 + cur = connect_to_redshift() + print("Successfully connected to Redshift") + conn = cur.connection + + try: + # Start a new transaction + conn.autocommit = False + + # analytics.anal_week_game_viewer 테이블의 모든 데이터 삭제 + sql = """ + DELETE FROM analytics.anal_week_broadcast_freq; + """ + cur.execute(sql) + print("Successfully deleted all data from analytics.anal_week_broadcast_freq") + + # SELECT 쿼리의 결과를 analytics.anal_week_game_viewer 테이블에 삽입 + sql = """ + INSERT INTO analytics.anal_week_broadcast_freq(GAME_NM, STREAMER_NM, BROADCAST_FREQ, CREATED_DATE) + select + game_nm AS GAME_NM, + streamer_nm AS STREAMER_NM, + COUNT(distinct broadcast_id) AS BROADCAST_FREQ, + current_date as created_date + from dev.analytics.anal_broadcast + group by 1,2 + order by 4 DESC; + """ + cur.execute(sql) + print("Successfully inserted data into analytics.anal_week_broadcast_freq") + + # 트랜잭션 commit + conn.commit() + print("Successfully committed the transaction") + + except Exception as e: + # Rollback + print("Error occurred. Start to rollback", e) + conn.rollback() + raise + + finally: + # Close the cursor and connection + cur.close() + conn.close() + print("Connection to Redshift is closed") + + +with DAG( + dag_id="ELT_Anal_Week_Broadcast_Freq", + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["ELT", "analytics", "broadcast"], + schedule_interval="0 2 * * *", # 11:00(KST) + default_args={ + "retries": 3, + "retry_delay": timedelta(minutes=5), + "on_failure_callback": slack.on_failure_callback, + }, +) as dag: + + elt() From 5e60b005654f78c9fbcf9c5ec6751e6bbc1382ef Mon Sep 17 00:00:00 2001 From: poriz Date: Tue, 5 Mar 2024 13:55:34 +0900 Subject: [PATCH 5/5] [feat] change analytics time --- dags/elt/anal_broadcast.py | 6 ++++-- dags/elt/anal_week_broadcast_duration.py | 2 +- dags/elt/anal_week_broadcast_freq.py | 2 +- dags/elt/anal_week_game_viewer.py | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/dags/elt/anal_broadcast.py b/dags/elt/anal_broadcast.py index 198ed27..e707f09 100644 --- a/dags/elt/anal_broadcast.py +++ b/dags/elt/anal_broadcast.py @@ -81,7 +81,9 @@ def elt(): LEFT JOIN external_raw_data.streamer_info AS s_info ON s_info.streamer_id = g_ids.streamer_id LEFT JOIN external_raw_data.game_info AS g_info ON g_ids.n_game_code = g_info.chz_game_code OR LTRIM(g_ids.n_game_code, '0') = g_info.afc_game_code - WHERE g_ids.broadcast_id IS NOT NULL AND g_ids.parsed_time::DATE - 1 >= (CURRENT_DATE - 8) + WHERE g_ids.broadcast_id IS NOT NULL + AND g_ids.parsed_time >= (CURRENT_DATE - INTERVAL '7 days' + INTERVAL '6 hours') + AND g_ids.parsed_time < (CURRENT_DATE + INTERVAL '6 hours') GROUP BY STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, g_ids.group_id, g_ids.n_game_code ORDER BY STREAMER_NM, BROADCAST_ID, start_time; """ @@ -110,7 +112,7 @@ def elt(): start_date=datetime(2024, 1, 1), catchup=False, tags=["ELT", "analytics", "broadcast"], - schedule_interval="0 22 * * *", # 매일 11시 + schedule_interval="0 22 * * *", # 매일 07시 default_args={ "retries": 3, "retry_delay": timedelta(minutes=5), diff --git a/dags/elt/anal_week_broadcast_duration.py b/dags/elt/anal_week_broadcast_duration.py index 991833e..93fa2e2 100644 --- a/dags/elt/anal_week_broadcast_duration.py +++ b/dags/elt/anal_week_broadcast_duration.py @@ -68,7 +68,7 @@ def elt(): start_date=datetime(2024, 1, 1), catchup=False, tags=["ELT", "analytics", "broadcast"], - schedule_interval="0 2 * * *", # 11:00(KST) + schedule_interval="0 23 * * *", # 08:00(KST) default_args={ "retries": 3, "retry_delay": timedelta(minutes=5), diff --git a/dags/elt/anal_week_broadcast_freq.py b/dags/elt/anal_week_broadcast_freq.py index a167acb..6199732 100644 --- a/dags/elt/anal_week_broadcast_freq.py +++ b/dags/elt/anal_week_broadcast_freq.py @@ -69,7 +69,7 @@ def elt(): start_date=datetime(2024, 1, 1), catchup=False, tags=["ELT", "analytics", "broadcast"], - schedule_interval="0 2 * * *", # 11:00(KST) + schedule_interval="0 23 * * *", # 08:00(KST) default_args={ "retries": 3, "retry_delay": timedelta(minutes=5), diff --git a/dags/elt/anal_week_game_viewer.py b/dags/elt/anal_week_game_viewer.py index 956e718..eb1dfdf 100644 --- a/dags/elt/anal_week_game_viewer.py +++ b/dags/elt/anal_week_game_viewer.py @@ -79,7 +79,7 @@ def elt(): start_date=datetime(2024, 1, 1), catchup=False, tags=["ELT", "analytics", "game_viewer"], - schedule_interval="0 0 * * *", # 매일 자정 + schedule_interval="0 23 * * *", # 매일 08시 default_args={ "retries": 3, "retry_delay": timedelta(minutes=5),