diff --git a/dags/elt/anal_broadcast.py b/dags/elt/anal_broadcast.py
new file mode 100644
index 0000000..e707f09
--- /dev/null
+++ b/dags/elt/anal_broadcast.py
@@ -0,0 +1,123 @@
+from airflow import DAG
+from airflow.decorators import task
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+from datetime import datetime, timedelta
+import slack
+
+
+def connect_to_redshift():
+    db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id")
+    conn = db_hook.get_conn()
+    conn.autocommit = True
+
+    return conn.cursor()
+
+
+@task
+def elt():
+    # 1. Connect to Redshift (external_raw_data schema)
+    cur = connect_to_redshift()
+    print("Successfully connected to Redshift")
+    conn = cur.connection
+
+    try:
+        # Start a new transaction
+        conn.autocommit = False
+
+        # Delete all rows from analytics.ANAL_BROADCAST
+        sql = """
+            DELETE FROM analytics.ANAL_BROADCAST;
+        """
+        cur.execute(sql)
+        print("Successfully deleted all data from analytics.ANAL_BROADCAST")
+
+        # Insert the result of the SELECT query into analytics.ANAL_BROADCAST
+        sql = """
+            INSERT INTO analytics.ANAL_BROADCAST(STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, AVG_VIEWER_NUM, BROADCAST_START_TIME, BROADCAST_END_TIME, GAME_DURATION, CREATED_DATE)
+            WITH ParsedData AS (
+                SELECT *, live_collect_time::TIMESTAMPTZ AS parsed_time
+                FROM external_raw_data.table_name_raw_live_viewer
+            ),
+            RankedData AS (
+                SELECT
+                    *,
+                    LAG(parsed_time, 1) OVER (
+                        PARTITION BY streamer_id, broadcast_id, game_code
+                        ORDER BY parsed_time
+                    ) AS prev_timestamp
+                FROM ParsedData
+            ),
+            GroupedData AS (
+                SELECT *,
+                    CASE
+                        WHEN parsed_time - INTERVAL '5' MINUTE > prev_timestamp THEN 1
+                        ELSE 0
+                    END AS is_new_group,
+                    COALESCE(NULLIF(game_code, ''), 'talk') AS normalized_game_code
+                FROM RankedData
+            ),
+            GroupIDs AS (
+                SELECT *,
+                    SUM(is_new_group) OVER (
+                        PARTITION BY streamer_id, broadcast_id, normalized_game_code
+                        ORDER BY parsed_time
+                        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+                    ) AS group_id,
+                    normalized_game_code AS n_game_code
+                FROM GroupedData
+            )
+            SELECT
+                s_info.streamer_nm AS STREAMER_NM,
+                g_ids.broadcast_id AS BROADCAST_ID,
+                COALESCE(g_info.game_nm, g_ids.n_game_code) AS GAME_NM,
+                g_ids.platform AS PLATFORM,
+                AVG(g_ids.viewer_num)::integer AS AVG_VIEWER_NUM,
+                MIN(g_ids.parsed_time) AS start_time,
+                MAX(g_ids.parsed_time) AS end_time,
+                (EXTRACT(EPOCH FROM MAX(g_ids.parsed_time)) - EXTRACT(EPOCH FROM MIN(g_ids.parsed_time))) / 60 AS GAME_DURATION,
+                CURRENT_DATE AS created_date
+            FROM GroupIDs AS g_ids
+            LEFT JOIN external_raw_data.streamer_info AS s_info ON s_info.streamer_id = g_ids.streamer_id
+            LEFT JOIN external_raw_data.game_info AS g_info
+                ON g_ids.n_game_code = g_info.chz_game_code OR LTRIM(g_ids.n_game_code, '0') = g_info.afc_game_code
+            WHERE g_ids.broadcast_id IS NOT NULL
+                AND g_ids.parsed_time >= (CURRENT_DATE - INTERVAL '7 days' + INTERVAL '6 hours')
+                AND g_ids.parsed_time < (CURRENT_DATE + INTERVAL '6 hours')
+            GROUP BY STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, g_ids.group_id, g_ids.n_game_code
+            ORDER BY STREAMER_NM, BROADCAST_ID, start_time;
+        """
+        cur.execute(sql)
+        print("Successfully inserted data into analytics.ANAL_BROADCAST")
+
+        # Commit the transaction
+        conn.commit()
+        print("Successfully committed the transaction")
+
+    except Exception as e:
+        # Roll back on any error
+        print("Error occurred. Starting rollback:", e)
Start to rollback", e) + conn.rollback() + raise + + finally: + # Close the cursor and connection + cur.close() + conn.close() + print("Connection to Redshift is closed") + + +with DAG( + dag_id="ELT_ANAL_BROADCAST", + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["ELT", "analytics", "broadcast"], + schedule_interval="0 22 * * *", # 매일 07시 + default_args={ + "retries": 3, + "retry_delay": timedelta(minutes=5), + "on_failure_callback": slack.on_failure_callback, + }, +) as dag: + + elt() diff --git a/dags/elt/anal_week_broadcast_duration.py b/dags/elt/anal_week_broadcast_duration.py new file mode 100644 index 0000000..93fa2e2 --- /dev/null +++ b/dags/elt/anal_week_broadcast_duration.py @@ -0,0 +1,79 @@ +from airflow import DAG +from airflow.decorators import task +from airflow.providers.postgres.hooks.postgres import PostgresHook + +from datetime import datetime, timedelta +import slack + + +def connect_to_redshift(): + db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id") + conn = db_hook.get_conn() + conn.autocommit = True + + return conn.cursor() + + +@task +def elt(): + # 1. GCE의 RDS의 GAME_INFO 테이블에서 게임 리스트 가져오기 + cur = connect_to_redshift() + print("Successfully connected to Redshift") + conn = cur.connection + + try: + # Start a new transaction + conn.autocommit = False + + # analytics.anal_week_game_viewer 테이블의 모든 데이터 삭제 + sql = """ + DELETE FROM analytics.anal_week_broadcast_duration; + """ + cur.execute(sql) + print("Successfully deleted all data from analytics.anal_week_broadcast_duration") + + # SELECT 쿼리의 결과를 analytics.anal_week_game_viewer 테이블에 삽입 + sql = """ + INSERT INTO analytics.anal_week_broadcast_duration(GAME_NM, STREAMER_NM, BROADCAST_DURATION, CREATED_DATE) + select + game_nm AS GAME_NM, + streamer_nm AS STREAMER_NM, + SUM(game_duration) AS BROADCAST_DURATION, + current_date as created_date + from dev.analytics.anal_broadcast + group by 1,2; + """ + cur.execute(sql) + print("Successfully inserted data into analytics.anal_week_broadcast_duration") + + # 트랜잭션 commit + conn.commit() + print("Successfully committed the transaction") + + except Exception as e: + # Rollback + print("Error occurred. Start to rollback", e) + conn.rollback() + raise + + finally: + # Close the cursor and connection + cur.close() + conn.close() + print("Connection to Redshift is closed") + + +with DAG( + dag_id="ELT_Anal_Week_Broadcast_Duration", + start_date=datetime(2024, 1, 1), + catchup=False, + tags=["ELT", "analytics", "broadcast"], + schedule_interval="0 23 * * *", # 08:00(KST) + default_args={ + "retries": 3, + "retry_delay": timedelta(minutes=5), + "on_failure_callback": slack.on_failure_callback, + }, +) as dag: + + elt() diff --git a/dags/elt/anal_week_broadcast_freq.py b/dags/elt/anal_week_broadcast_freq.py new file mode 100644 index 0000000..6199732 --- /dev/null +++ b/dags/elt/anal_week_broadcast_freq.py @@ -0,0 +1,80 @@ +from airflow import DAG +from airflow.decorators import task +from airflow.providers.postgres.hooks.postgres import PostgresHook + +from datetime import datetime, timedelta +import slack + + +def connect_to_redshift(): + db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id") + conn = db_hook.get_conn() + conn.autocommit = True + + return conn.cursor() + + +@task +def elt(): + # 1. 
+    cur = connect_to_redshift()
+    print("Successfully connected to Redshift")
+    conn = cur.connection
+
+    try:
+        # Start a new transaction
+        conn.autocommit = False
+
+        # Delete all rows from analytics.anal_week_broadcast_freq
+        sql = """
+            DELETE FROM analytics.anal_week_broadcast_freq;
+        """
+        cur.execute(sql)
+        print("Successfully deleted all data from analytics.anal_week_broadcast_freq")
+
+        # Insert the result of the SELECT query into analytics.anal_week_broadcast_freq
+        sql = """
+            INSERT INTO analytics.anal_week_broadcast_freq(GAME_NM, STREAMER_NM, BROADCAST_FREQ, CREATED_DATE)
+            SELECT
+                game_nm AS GAME_NM,
+                streamer_nm AS STREAMER_NM,
+                COUNT(DISTINCT broadcast_id) AS BROADCAST_FREQ,
+                CURRENT_DATE AS created_date
+            FROM dev.analytics.anal_broadcast
+            GROUP BY 1, 2
+            ORDER BY 4 DESC;
+        """
+        cur.execute(sql)
+        print("Successfully inserted data into analytics.anal_week_broadcast_freq")
+
+        # Commit the transaction
+        conn.commit()
+        print("Successfully committed the transaction")
+
+    except Exception as e:
+        # Roll back on any error
+        print("Error occurred. Starting rollback:", e)
+        conn.rollback()
+        raise
+
+    finally:
+        # Close the cursor and connection
+        cur.close()
+        conn.close()
+        print("Connection to Redshift is closed")
+
+
+with DAG(
+    dag_id="ELT_Anal_Week_Broadcast_Freq",
+    start_date=datetime(2024, 1, 1),
+    catchup=False,
+    tags=["ELT", "analytics", "broadcast"],
+    schedule_interval="0 23 * * *", # daily at 08:00 KST (23:00 UTC)
+    default_args={
+        "retries": 3,
+        "retry_delay": timedelta(minutes=5),
+        "on_failure_callback": slack.on_failure_callback,
+    },
+) as dag:
+
+    elt()
diff --git a/dags/elt/anal_week_game_viewer.py b/dags/elt/anal_week_game_viewer.py
index c99786e..eb1dfdf 100644
--- a/dags/elt/anal_week_game_viewer.py
+++ b/dags/elt/anal_week_game_viewer.py
@@ -79,7 +79,7 @@ def elt():
     start_date=datetime(2024, 1, 1),
     catchup=False,
     tags=["ELT", "analytics", "game_viewer"],
-    schedule_interval="0 23 * * *", # 매일 자정
+    schedule_interval="0 23 * * *", # daily at 08:00 KST (23:00 UTC)
     default_args={
         "retries": 3,
         "retry_delay": timedelta(minutes=5),
diff --git a/dataCollector/glue&athena/live_viewer_script.py b/dataCollector/glue&athena/live_viewer_script.py
new file mode 100644
index 0000000..4bfeb0f
--- /dev/null
+++ b/dataCollector/glue&athena/live_viewer_script.py
@@ -0,0 +1,102 @@
+## live_viewer Glue job script
+
+import sys
+from awsglue.transforms import *
+from awsglue.utils import getResolvedOptions
+from pyspark.context import SparkContext
+from awsglue.context import GlueContext
+from awsglue.job import Job
+from pyspark.sql.functions import *
+from awsglue.dynamicframe import DynamicFrame
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
+
+# Initialize SparkContext and GlueContext
+sc = SparkContext()
+glueContext = GlueContext(sc)
+spark = glueContext.spark_session
+
+# Initialize the Job (with Job Bookmark enabled)
+job = Job(glueContext)
+args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_path", "output_path"])
+job.init(args["JOB_NAME"], args)
+
+input_path = args["input_path"]
+output_path = args["output_path"]
+
+# Read the raw JSON data from S3
+datasource = glueContext.create_dynamic_frame.from_options(
+    "s3",
+    {"paths": [input_path], "recurse": True},
+    format="json",
+    transformation_ctx="datasource",
+)
+
+# Common schema shared by both platforms
+common_schema = StructType([
+    StructField("STREAMER_ID", StringType(), True),
+    StructField("BROADCAST_ID", StringType(), True),
+    StructField("LIVE_COLLECT_TIME", TimestampType(), True),
+    StructField("BROADCAST_TITLE", StringType(), True),
+    StructField("GAME_CODE", StringType(), True),
+    StructField("VIEWER_NUM", IntegerType(), True),
+    StructField("PLATFORM", StringType(), True),
+])
+
+# Transform the data
+datasource_df = datasource.toDF()
+try:
+    chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk"))
+
+    chz_filtered_source = chzzk_source.filter(col("col.content.adult") == 'false')
+
+    # chzzk_source.printSchema()
+    chzzk_df = chz_filtered_source.select(
+        col("col.streamer_id").alias("STREAMER_ID"),
+        col("col.content.liveID").alias("BROADCAST_ID"),
+        col("col.current_time").alias("LIVE_COLLECT_TIME"),
+        col("col.content.liveTitle").alias("BROADCAST_TITLE"),
+        col("col.content.liveCategoryValue").alias("GAME_CODE"),
+        col("col.content.concurrentUserCount").alias("VIEWER_NUM"),
+    )
+    chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk"))
+except Exception:
+    chzzk_df = spark.createDataFrame([], schema=common_schema)
+
+try:
+    afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca"))
+    afc_filtered_source = afreeca_source.filter(col("col.broad_info.broad.broad_grade") < 19)
+
+    afreeca_df = afc_filtered_source.select(
+        col("col.streamer_id").alias("STREAMER_ID"),
+        col("col.live_status.BNO").alias("BROADCAST_ID"),
+        col("col.current_time").alias("LIVE_COLLECT_TIME"),
+        col("col.live_status.TITLE").alias("BROADCAST_TITLE"),
+        col("col.live_status.CATE").alias("GAME_CODE"),
+        col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"),
+    )
+    afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca"))
+except Exception:
+    afreeca_df = spark.createDataFrame([], schema=common_schema)
+
+
+result_df = chzzk_df.union(afreeca_df)
+
+# Log the schema information
+print("Schema Information:")
+result_df.printSchema()
+
+# Repartition by the "PLATFORM" column
+partitioned_df = result_df.repartition("PLATFORM")
+
+# Convert the partitioned Spark DataFrame back to a DynamicFrame
+partitioned_dynamic_frame = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_dynamic_frame")
+
+
+# Write the result to S3 as Parquet
+glueContext.write_dynamic_frame.from_options(
+    frame=partitioned_dynamic_frame,
+    connection_type="s3",
+    connection_options={"path": output_path},
+    format="parquet",
+    transformation_ctx="datasource",
+)
diff --git a/dataCollector/glue&athena/live_viewer_template.py b/dataCollector/glue&athena/live_viewer_template.py
deleted file mode 100644
index 73b3804..0000000
--- a/dataCollector/glue&athena/live_viewer_template.py
+++ /dev/null
@@ -1,82 +0,0 @@
-## live_viewer glue script template
-
-import sys
-from awsglue.transforms import *
-from awsglue.utils import getResolvedOptions
-from pyspark.context import SparkContext
-from awsglue.context import GlueContext
-from awsglue.job import Job
-from pyspark.sql.functions import *
-from awsglue.dynamicframe import DynamicFrame
-
-# SparkContext와 GlueContext 초기화
-sc = SparkContext()
-glueContext = GlueContext(sc)
-spark = glueContext.spark_session
-
-# Job 초기화 (Job Bookmark 활성화 포함)
-job = Job(glueContext)
-args = getResolvedOptions(sys.argv, ["JOB_NAME"])
-job.init(args["JOB_NAME"], args)
-
-# S3에서 데이터를 읽어오는 부분
-datasource = glueContext.create_dynamic_frame.from_options(
-    "s3",
-    {"paths": ["{{ input_path }}"], "recurse": True},
-    format="json",
-    transformation_ctx="datasource",
-)
-# 데이터 가공
-datasource_df = datasource.toDF()
-
-chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk"))
-afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca"))
-
-# chzzk_source.printSchema()
-chzzk_df = chzzk_source.select(
-    col("col.streamer_id").alias("STREAMER_ID"),
col("col.content.liveID").alias("BROADCAST_ID"), - col("col.current_time").alias("LIVE_COLLECT_TIME"), - col("col.content.liveTitle").alias("BROADCAST_TITLE"), - col("col.content.liveCategoryValue").alias("GAME_CODE"), - col("col.content.concurrentUserCount").alias("VIEWER_NUM"), -) -# add platform -chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk")) -# chzzk_df.show() - -# afreeca_source.printSchema() -afreeca_df = afreeca_source.select( - col("col.streamer_id").alias("STREAMER_ID"), - col("col.live_status.BNO").alias("BROADCAST_ID"), - col("col.current_time").alias("LIVE_COLLECT_TIME"), - col("col.live_status.TITLE").alias("BROADCAST_TITLE"), - col("col.live_status.CATE").alias("GAME_CODE"), - col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"), -) -afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca")) -# afreeca_df.show(truncate=False) - -result_df = chzzk_df.union(afreeca_df) - -# 스키마 정보를 로깅 -print("Schema Information:") -result_df.printSchema() - -# "PLATFORM" 컬럼을 기준으로 파티션을 구성 -partitioned_df = result_df.repartition("PLATFORM") - -# 파티션된 Spark DataFrame을 DynamicFrame으로 변환 -partitioned_dynamic_frame = DynamicFrame.fromDF( - partitioned_df, glueContext, "partitioned_dynamic_frame" -) - - -# Parquet으로 변환하여 S3에 저장 -glueContext.write_dynamic_frame.from_options( - frame=partitioned_dynamic_frame, - connection_type="s3", - connection_options={"path": "{{ output_path }}"}, - format="parquet", - transformation_ctx="datasource", -)