From 52c31371272c7bd090ae3ffbe30adf0e8104d71d Mon Sep 17 00:00:00 2001 From: lyh Date: Tue, 5 Mar 2024 15:15:13 +0900 Subject: [PATCH 1/3] [fix] fix anal_week_game_viewer --- dags/elt/anal_broadcast.py | 6 +-- dags/elt/anal_week_broadcast_duration.py | 6 ++- dags/elt/anal_week_broadcast_freq.py | 2 +- dags/elt/anal_week_game_viewer.py | 8 +-- dags/elt/anal_ysd_game_ccu.py | 2 +- dags/elt/anal_ysd_game_rating.py | 2 +- dags/game/game_price_to_s3.py | 2 +- dags/game/game_rating_to_s3.py | 2 +- dags/glue/live_viewer.py | 3 +- .../glue&athena/live_viewer_script.py | 50 ++++++++++++------- 10 files changed, 51 insertions(+), 32 deletions(-) diff --git a/dags/elt/anal_broadcast.py b/dags/elt/anal_broadcast.py index e707f09..3b66490 100644 --- a/dags/elt/anal_broadcast.py +++ b/dags/elt/anal_broadcast.py @@ -40,7 +40,7 @@ def elt(): FROM external_raw_data.table_name_raw_live_viewer ), RankedData AS( - SELECT + SELECT *, LAG(parsed_time, 1) OVER ( PARTITION BY streamer_id, broadcast_id, game_code @@ -83,9 +83,9 @@ def elt(): ON g_ids.n_game_code = g_info.chz_game_code OR LTRIM(g_ids.n_game_code, '0') = g_info.afc_game_code WHERE g_ids.broadcast_id IS NOT NULL AND g_ids.parsed_time >= (CURRENT_DATE - INTERVAL '7 days' + INTERVAL '6 hours') - AND g_ids.parsed_time < (CURRENT_DATE + INTERVAL '6 hours') + AND g_ids.parsed_time < (CURRENT_DATE + INTERVAL '6 hours') GROUP BY STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, g_ids.group_id, g_ids.n_game_code - ORDER BY STREAMER_NM, BROADCAST_ID, start_time; + ORDER BY STREAMER_NM, BROADCAST_ID, start_time; """ cur.execute(sql) print("Successfully inserted data into analytics.ANAL_BROADCAST") diff --git a/dags/elt/anal_week_broadcast_duration.py b/dags/elt/anal_week_broadcast_duration.py index 93fa2e2..76edda4 100644 --- a/dags/elt/anal_week_broadcast_duration.py +++ b/dags/elt/anal_week_broadcast_duration.py @@ -30,12 +30,14 @@ def elt(): DELETE FROM analytics.anal_week_broadcast_duration; """ cur.execute(sql) - print("Successfully deleted all data from analytics.anal_week_broadcast_duration") + print( + "Successfully deleted all data from analytics.anal_week_broadcast_duration" + ) # SELECT 쿼리의 결과를 analytics.anal_week_game_viewer 테이블에 삽입 sql = """ INSERT INTO analytics.anal_week_broadcast_duration(GAME_NM, STREAMER_NM, BROADCAST_DURATION, CREATED_DATE) - select + select game_nm AS GAME_NM, streamer_nm AS STREAMER_NM, SUM(game_duration) AS BROADCAST_DURATION, diff --git a/dags/elt/anal_week_broadcast_freq.py b/dags/elt/anal_week_broadcast_freq.py index 6199732..9dce0df 100644 --- a/dags/elt/anal_week_broadcast_freq.py +++ b/dags/elt/anal_week_broadcast_freq.py @@ -35,7 +35,7 @@ def elt(): # SELECT 쿼리의 결과를 analytics.anal_week_game_viewer 테이블에 삽입 sql = """ INSERT INTO analytics.anal_week_broadcast_freq(GAME_NM, STREAMER_NM, BROADCAST_FREQ, CREATED_DATE) - select + select game_nm AS GAME_NM, streamer_nm AS STREAMER_NM, COUNT(distinct broadcast_id) AS BROADCAST_FREQ, diff --git a/dags/elt/anal_week_game_viewer.py b/dags/elt/anal_week_game_viewer.py index eb1dfdf..a0bc67d 100644 --- a/dags/elt/anal_week_game_viewer.py +++ b/dags/elt/anal_week_game_viewer.py @@ -35,19 +35,21 @@ def elt(): # SELECT 쿼리의 결과를 analytics.anal_week_game_viewer 테이블에 삽입 sql = """ INSERT INTO analytics.anal_week_game_viewer - SELECT GAME_NM, CAST(HOUR AS INTEGER), si.streamer_nm as STREAMER_NM, AVG(viewer_num) AS VIEWER_AVG, TO_DATE(TO_CHAR(live_collect_time::DATE, 'YYYY-MM-DD'), 'YYYY-MM-DD') AS CREATED_DATE + SELECT GAME_NM, HOUR, si.streamer_nm as STREAMER_NM, AVG(viewer_num) AS VIEWER_AVG, TO_CHAR(live_collect_time::DATE, 'YYYY-MM-DD') AS CREATED_DATE FROM ( - SELECT gi.game_nm ,rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, CAST(rlv.hour AS INTEGER), rlv.streamer_id + SELECT gi.game_nm ,rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, rlv.hour, rlv.streamer_id FROM external_raw_data.table_name_raw_live_viewer AS rlv INNER JOIN external_raw_data.game_info AS gi ON rlv.game_code = gi.chz_game_code + WHERE gi.chz_game_code <> '' UNION - SELECT gi.game_nm, rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, CAST(rlv.hour AS INTEGER), rlv.streamer_id + SELECT gi.game_nm, rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, rlv.hour, rlv.streamer_id FROM external_raw_data.table_name_raw_live_viewer AS rlv INNER JOIN external_raw_data.game_info AS gi ON rlv.game_code = LPAD(gi.afc_game_code, LENGTH(gi.afc_game_code) + 3, '0') + WHERE gi.afc_game_code <> '' ) AS subquery INNER JOIN external_raw_data.streamer_info AS si ON subquery.streamer_id = si.streamer_id diff --git a/dags/elt/anal_ysd_game_ccu.py b/dags/elt/anal_ysd_game_ccu.py index 5a11178..990a1d1 100644 --- a/dags/elt/anal_ysd_game_ccu.py +++ b/dags/elt/anal_ysd_game_ccu.py @@ -41,7 +41,7 @@ def elt(): current_date AS CREATED_DATE FROM (SELECT GAME_ID, GAME_CCU FROM external_raw_data.table_name_raw_game_ccu - WHERE CAST(collect_time AS timestamp) + WHERE CAST(collect_time AS timestamp) BETWEEN GETDATE() - INTERVAL '1 day' + INTERVAL '6 hours' AND GETDATE() + INTERVAL '6 hours') a JOIN external_raw_data.game_info b diff --git a/dags/elt/anal_ysd_game_rating.py b/dags/elt/anal_ysd_game_rating.py index 50cd80d..a86c949 100644 --- a/dags/elt/anal_ysd_game_rating.py +++ b/dags/elt/anal_ysd_game_rating.py @@ -35,7 +35,7 @@ def elt(): # SELECT 쿼리의 결과를 analytics.ANAL_YSD_GAME_CCU 테이블에 삽입 sql = """ INSERT INTO analytics.ANAL_YSD_GAME_RATING(GAME_NM, POSITIVE_PERCENT, CREATED_DATE) - SELECT + SELECT b.game_nm AS GAME_NM, (a.all_positive_percent)*0.05 AS POSITIVE_PERCENT, current_date AS CREATED_DATE diff --git a/dags/game/game_price_to_s3.py b/dags/game/game_price_to_s3.py index 01e1c41..635c7e7 100644 --- a/dags/game/game_price_to_s3.py +++ b/dags/game/game_price_to_s3.py @@ -100,7 +100,7 @@ def save_to_json(data): start_date=datetime(2024, 1, 1), catchup=False, tags=["Steam_API"], - schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분 + schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분 default_args={ "retries": 3, "retry_delay": timedelta(seconds=15), diff --git a/dags/game/game_rating_to_s3.py b/dags/game/game_rating_to_s3.py index 04ecd01..803e77a 100644 --- a/dags/game/game_rating_to_s3.py +++ b/dags/game/game_rating_to_s3.py @@ -126,7 +126,7 @@ def save_to_json(data): start_date=datetime(2024, 1, 1), catchup=False, tags=["Steam_API"], - schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분 + schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분 default_args={ "retries": 3, "retry_delay": timedelta(minutes=1), diff --git a/dags/glue/live_viewer.py b/dags/glue/live_viewer.py index a966cb9..c15b095 100644 --- a/dags/glue/live_viewer.py +++ b/dags/glue/live_viewer.py @@ -4,7 +4,6 @@ from airflow import DAG from airflow.models import Variable -from airflow.operators.python import PythonOperator from airflow.providers.amazon.aws.operators.glue import GlueJobOperator from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook @@ -12,9 +11,11 @@ from jinja2 import Template + def sleep_for_60_seconds(): time.sleep(60) + def upload_rendered_script_to_s3( bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs ): diff --git a/dataCollector/glue&athena/live_viewer_script.py b/dataCollector/glue&athena/live_viewer_script.py index 4bfeb0f..a808ca2 100644 --- a/dataCollector/glue&athena/live_viewer_script.py +++ b/dataCollector/glue&athena/live_viewer_script.py @@ -8,7 +8,13 @@ from awsglue.job import Job from pyspark.sql.functions import * from awsglue.dynamicframe import DynamicFrame -from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType +from pyspark.sql.types import ( + StructType, + StructField, + StringType, + IntegerType, + TimestampType, +) # SparkContext와 GlueContext 초기화 sc = SparkContext() @@ -17,11 +23,11 @@ # Job 초기화 (Job Bookmark 활성화 포함) job = Job(glueContext) -args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path']) +args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_path", "output_path"]) job.init(args["JOB_NAME"], args) -input_path = args['input_path'] -output_path = args['output_path'] +input_path = args["input_path"] +output_path = args["output_path"] # S3에서 데이터를 읽어오는 부분 datasource = glueContext.create_dynamic_frame.from_options( @@ -32,22 +38,24 @@ ) # 공통 스키마 정의 -common_schema = StructType([ - StructField("STREAMER_ID", StringType(), True), - StructField("BROADCAST_ID", StringType(), True), - StructField("LIVE_COLLECT_TIME", TimestampType(), True), - StructField("BROADCAST_TITLE", StringType(), True), - StructField("GAME_CODE", StringType(), True), - StructField("VIEWER_NUM", IntegerType(), True), - StructField("PLATFORM", StringType(), True), -]) +common_schema = StructType( + [ + StructField("STREAMER_ID", StringType(), True), + StructField("BROADCAST_ID", StringType(), True), + StructField("LIVE_COLLECT_TIME", TimestampType(), True), + StructField("BROADCAST_TITLE", StringType(), True), + StructField("GAME_CODE", StringType(), True), + StructField("VIEWER_NUM", IntegerType(), True), + StructField("PLATFORM", StringType(), True), + ] +) # 데이터 가공 datasource_df = datasource.toDF() try: chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk")) - chz_filtered_source = chzzk_source.filter(col("col.content.adult") == 'false') + chz_filtered_source = chzzk_source.filter(col("col.content.adult") == "false") # chzzk_source.printSchema() chzzk_df = chz_filtered_source.select( @@ -60,11 +68,15 @@ ) chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk")) except: - chzzk_df = spark.createDataFrame([],schema = common_schema) + chzzk_df = spark.createDataFrame([], schema=common_schema) try: - afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca")) - afc_filtered_source = afreeca_source.filter(col("col.broad_info.broad.broad_grade") < 19) + afreeca_source = datasource_df.select("stream_data.afreeca").select( + explode("afreeca") + ) + afc_filtered_source = afreeca_source.filter( + col("col.broad_info.broad.broad_grade") < 19 + ) afreeca_df = afc_filtered_source.select( col("col.streamer_id").alias("STREAMER_ID"), @@ -89,7 +101,9 @@ partitioned_df = result_df.repartition("PLATFORM") # 파티션된 Spark DataFrame을 DynamicFrame으로 변환 -partitioned_dynamic_frame = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_dynamic_frame") +partitioned_dynamic_frame = DynamicFrame.fromDF( + partitioned_df, glueContext, "partitioned_dynamic_frame" +) # Parquet으로 변환하여 S3에 저장 From fd50ca31b2206c39a8307a18360f15838b5af227 Mon Sep 17 00:00:00 2001 From: lyh Date: Tue, 5 Mar 2024 15:22:13 +0900 Subject: [PATCH 2/3] [fix] cast for column type --- dags/elt/anal_week_game_viewer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/elt/anal_week_game_viewer.py b/dags/elt/anal_week_game_viewer.py index a0bc67d..787dfbb 100644 --- a/dags/elt/anal_week_game_viewer.py +++ b/dags/elt/anal_week_game_viewer.py @@ -35,7 +35,7 @@ def elt(): # SELECT 쿼리의 결과를 analytics.anal_week_game_viewer 테이블에 삽입 sql = """ INSERT INTO analytics.anal_week_game_viewer - SELECT GAME_NM, HOUR, si.streamer_nm as STREAMER_NM, AVG(viewer_num) AS VIEWER_AVG, TO_CHAR(live_collect_time::DATE, 'YYYY-MM-DD') AS CREATED_DATE + SELECT GAME_NM, CAST(HOUR AS INTEGER), si.streamer_nm as STREAMER_NM, AVG(viewer_num) AS VIEWER_AVG, TO_DATE(TO_CHAR(live_collect_time::DATE, 'YYYY-MM-DD'), 'YYYY-MM-DD') AS CREATED_DATE FROM ( SELECT gi.game_nm ,rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, rlv.hour, rlv.streamer_id FROM external_raw_data.table_name_raw_live_viewer AS rlv From 69e3f83e51a6816b8b9fbc33607c405fc9bc4665 Mon Sep 17 00:00:00 2001 From: lyh Date: Tue, 5 Mar 2024 15:28:19 +0900 Subject: [PATCH 3/3] [fix] fix DAG anal_week_game_viewer --- dags/elt/anal_week_game_viewer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dags/elt/anal_week_game_viewer.py b/dags/elt/anal_week_game_viewer.py index 787dfbb..15cec25 100644 --- a/dags/elt/anal_week_game_viewer.py +++ b/dags/elt/anal_week_game_viewer.py @@ -37,7 +37,7 @@ def elt(): INSERT INTO analytics.anal_week_game_viewer SELECT GAME_NM, CAST(HOUR AS INTEGER), si.streamer_nm as STREAMER_NM, AVG(viewer_num) AS VIEWER_AVG, TO_DATE(TO_CHAR(live_collect_time::DATE, 'YYYY-MM-DD'), 'YYYY-MM-DD') AS CREATED_DATE FROM ( - SELECT gi.game_nm ,rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, rlv.hour, rlv.streamer_id + SELECT gi.game_nm ,rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, CAST(rlv.hour AS INTEGER), rlv.streamer_id FROM external_raw_data.table_name_raw_live_viewer AS rlv INNER JOIN external_raw_data.game_info AS gi ON rlv.game_code = gi.chz_game_code @@ -45,7 +45,7 @@ def elt(): UNION - SELECT gi.game_nm, rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, rlv.hour, rlv.streamer_id + SELECT gi.game_nm, rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, CAST(rlv.hour AS INTEGER), rlv.streamer_id FROM external_raw_data.table_name_raw_live_viewer AS rlv INNER JOIN external_raw_data.game_info AS gi ON rlv.game_code = LPAD(gi.afc_game_code, LENGTH(gi.afc_game_code) + 3, '0')