Skip to content

Commit

Permalink
[feat] live_viewer template change
Browse files Browse the repository at this point in the history
  • Loading branch information
poriz committed Feb 26, 2024
1 parent 75a39bc commit a4ad481
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 22 deletions.
13 changes: 7 additions & 6 deletions dags/glue/live_viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,21 @@ def upload_rendered_script_to_s3(
default_args={
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 1, 17),
"start_date": datetime(2024, 2, 26),
"retries": 0,
"retry_delay": timedelta(minutes=5),
},
tags=["glue", "streaming"],
schedule_interval="0 * * * *",
catchup=False,
catchup=True,
) as dag:

bucket_name = "de-2-1-bucket"
current_time = "{{ data_interval_end.in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
year = "{{ data_interval_end.in_timezone('Asia/Seoul').year }}"
month = "{{ data_interval_end.in_timezone('Asia/Seoul').month }}"
day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"

current_time = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
year = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').year }}"
month = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').month }}"
day = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').day }}"
hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}" # before 1 hour

upload_script = PythonOperator(
Expand Down
4 changes: 3 additions & 1 deletion dataCollector/glue&athena/glue_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
partitioned_df = result_df.repartition("PLATFORM")

# 파티션된 Spark DataFrame을 DynamicFrame으로 변환
partitioned_dynamic_frame = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_dynamic_frame")
partitioned_dynamic_frame = DynamicFrame.fromDF(
partitioned_df, glueContext, "partitioned_dynamic_frame"
)


# Parquet으로 변환하여 S3에 저장
Expand Down
22 changes: 7 additions & 15 deletions dataCollector/glue&athena/live_viewer_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
chzzk_df = chzzk_source.select(
col("col.streamer_id").alias("STREAMER_ID"),
col("col.content.liveID").alias("BROADCAST_ID"),
col("col.current_time").alias("LIVE_COLLECT_TIME"),
col("col.content.liveTitle").alias("BROADCAST_TITLE"),
col("col.content.liveCategoryValue").alias("GAME_CODE"),
col("col.content.concurrentUserCount").alias("VIEWER_NUM"),
Expand All @@ -48,26 +49,15 @@
afreeca_df = afreeca_source.select(
col("col.streamer_id").alias("STREAMER_ID"),
col("col.live_status.BNO").alias("BROADCAST_ID"),
col("col.current_time").alias("LIVE_COLLECT_TIME"),
col("col.live_status.TITLE").alias("BROADCAST_TITLE"),
col("col.live_status.CATE").alias("GAME_CODE"),
col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"),
)
afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca"))
# afreeca_df.show(truncate=False)


result_df = chzzk_df.join(
afreeca_df,
[
"STREAMER_ID",
"BROADCAST_ID",
"BROADCAST_TITLE",
"GAME_CODE",
"PLATFORM",
"VIEWER_NUM",
],
"outer",
)
result_df = chzzk_df.union(afreeca_df)

# 스키마 정보를 로깅
print("Schema Information:")
Expand All @@ -77,7 +67,9 @@
partitioned_df = result_df.repartition("PLATFORM")

# 파티션된 Spark DataFrame을 DynamicFrame으로 변환
partitioned_dynamic_frame = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_dynamic_frame")
partitioned_dynamic_frame = DynamicFrame.fromDF(
partitioned_df, glueContext, "partitioned_dynamic_frame"
)


# Parquet으로 변환하여 S3에 저장
Expand All @@ -87,4 +79,4 @@
connection_options={"path": "{{ output_path }}"},
format="parquet",
transformation_ctx="datasource",
)
)

0 comments on commit a4ad481

Please sign in to comment.