Commit

Merge pull request #132 from zizzic/develop_to_fix_anal_week_game_viewer_by_YOUNG_HO

[fix] fix anal week game viewer
mediwind authored Mar 5, 2024
2 parents ae99443 + 69e3f83 commit 70f2f8f
Showing 10 changed files with 48 additions and 29 deletions.
6 changes: 3 additions & 3 deletions dags/elt/anal_broadcast.py
@@ -40,7 +40,7 @@ def elt():
FROM external_raw_data.table_name_raw_live_viewer
),
RankedData AS(
SELECT
*,
LAG(parsed_time, 1) OVER (
PARTITION BY streamer_id, broadcast_id, game_code
@@ -83,9 +83,9 @@ def elt():
ON g_ids.n_game_code = g_info.chz_game_code OR LTRIM(g_ids.n_game_code, '0') = g_info.afc_game_code
WHERE g_ids.broadcast_id IS NOT NULL
AND g_ids.parsed_time >= (CURRENT_DATE - INTERVAL '7 days' + INTERVAL '6 hours')
AND g_ids.parsed_time < (CURRENT_DATE + INTERVAL '6 hours')
GROUP BY STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, g_ids.group_id, g_ids.n_game_code
ORDER BY STREAMER_NM, BROADCAST_ID, start_time;
"""
cur.execute(sql)
print("Successfully inserted data into analytics.ANAL_BROADCAST")
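
The RankedData CTE is truncated in this hunk, but LAG(parsed_time, 1) OVER (PARTITION BY streamer_id, broadcast_id, game_code ...) is the standard way to compare each collected viewer sample with the previous one from the same broadcast and game, e.g. to measure per-interval durations or detect gaps. A generic illustration of that pattern (not this project's actual query; the table name and rows are made up, with sqlite3 standing in for Redshift):

# Generic LAG() sketch with illustrative data; requires SQLite 3.25+ for window functions.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE live_viewer (
        streamer_id TEXT, broadcast_id TEXT, game_code TEXT, parsed_time TEXT
    );
    INSERT INTO live_viewer VALUES
        ('s1', 'b1', 'lol', '2024-03-05 10:00:00'),
        ('s1', 'b1', 'lol', '2024-03-05 10:02:00'),
        ('s1', 'b1', 'lol', '2024-03-05 11:30:00');  -- long gap before this row
    """
)

rows = con.execute(
    """
    SELECT
        parsed_time,
        LAG(parsed_time, 1) OVER (
            PARTITION BY streamer_id, broadcast_id, game_code
            ORDER BY parsed_time
        ) AS prev_time
    FROM live_viewer
    """
).fetchall()

for parsed_time, prev_time in rows:
    print(parsed_time, "previous sample:", prev_time)  # None for the first sample
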
6 changes: 4 additions & 2 deletions dags/elt/anal_week_broadcast_duration.py
@@ -30,12 +30,14 @@ def elt():
DELETE FROM analytics.anal_week_broadcast_duration;
"""
cur.execute(sql)
print("Successfully deleted all data from analytics.anal_week_broadcast_duration")
print(
"Successfully deleted all data from analytics.anal_week_broadcast_duration"
)

# Insert the result of the SELECT query into the analytics.anal_week_broadcast_duration table
sql = """
INSERT INTO analytics.anal_week_broadcast_duration(GAME_NM, STREAMER_NM, BROADCAST_DURATION, CREATED_DATE)
select
game_nm AS GAME_NM,
streamer_nm AS STREAMER_NM,
SUM(game_duration) AS BROADCAST_DURATION,
2 changes: 1 addition & 1 deletion dags/elt/anal_week_broadcast_freq.py
@@ -35,7 +35,7 @@ def elt():
# Insert the result of the SELECT query into the analytics.anal_week_broadcast_freq table
sql = """
INSERT INTO analytics.anal_week_broadcast_freq(GAME_NM, STREAMER_NM, BROADCAST_FREQ, CREATED_DATE)
select
game_nm AS GAME_NM,
streamer_nm AS STREAMER_NM,
COUNT(distinct broadcast_id) AS BROADCAST_FREQ,
2 changes: 2 additions & 0 deletions dags/elt/anal_week_game_viewer.py
@@ -41,13 +41,15 @@ def elt():
FROM external_raw_data.table_name_raw_live_viewer AS rlv
INNER JOIN external_raw_data.game_info AS gi
ON rlv.game_code = gi.chz_game_code
WHERE gi.chz_game_code <> ''
UNION
SELECT gi.game_nm, rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, CAST(rlv.hour AS INTEGER), rlv.streamer_id
FROM external_raw_data.table_name_raw_live_viewer AS rlv
INNER JOIN external_raw_data.game_info AS gi
ON rlv.game_code = LPAD(gi.afc_game_code, LENGTH(gi.afc_game_code) + 3, '0')
WHERE gi.afc_game_code <> ''
) AS subquery
INNER JOIN external_raw_data.streamer_info AS si
ON subquery.streamer_id = si.streamer_id
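
The commit message does not explain the added WHERE gi.chz_game_code <> '' and WHERE gi.afc_game_code <> '' filters, but a plausible reading is that game_info rows carry an empty code for platforms a game is not listed on, so the plain equality join (and the LPAD variant, which turns '' into a short run of zeros) could attach live-viewer rows with an empty or zero game_code to unrelated games and inflate weekly viewer counts. A minimal reproduction of that failure mode under those assumptions, with sqlite3 stand-ins for the Redshift tables (all names and rows are hypothetical):

# Hypothetical data showing why empty platform codes need to be filtered out.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE game_info (game_nm TEXT, chz_game_code TEXT);
    -- 'Lost Ark' is assumed to have no chzzk listing, hence an empty code.
    INSERT INTO game_info VALUES ('League of Legends', 'lol'), ('Lost Ark', '');

    CREATE TABLE raw_live_viewer (streamer_id TEXT, game_code TEXT, viewer_num INT);
    -- A stream with no game set also carries an empty game_code.
    INSERT INTO raw_live_viewer VALUES ('s1', 'lol', 1000), ('s2', '', 50);
    """
)

bad = con.execute(
    """
    SELECT gi.game_nm, rlv.streamer_id, rlv.viewer_num
    FROM raw_live_viewer AS rlv
    JOIN game_info AS gi ON rlv.game_code = gi.chz_game_code
    """
).fetchall()
print(bad)   # the no-game stream 's2' is wrongly attributed to 'Lost Ark'

good = con.execute(
    """
    SELECT gi.game_nm, rlv.streamer_id, rlv.viewer_num
    FROM raw_live_viewer AS rlv
    JOIN game_info AS gi ON rlv.game_code = gi.chz_game_code
    WHERE gi.chz_game_code <> ''
    """
).fetchall()
print(good)  # only the genuine League of Legends match remains
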
2 changes: 1 addition & 1 deletion dags/elt/anal_ysd_game_ccu.py
@@ -41,7 +41,7 @@ def elt():
current_date AS CREATED_DATE
FROM (SELECT GAME_ID, GAME_CCU
FROM external_raw_data.table_name_raw_game_ccu
WHERE CAST(collect_time AS timestamp)
BETWEEN GETDATE() - INTERVAL '1 day' + INTERVAL '6 hours'
AND GETDATE() + INTERVAL '6 hours') a
JOIN external_raw_data.game_info b
2 changes: 1 addition & 1 deletion dags/elt/anal_ysd_game_rating.py
@@ -35,7 +35,7 @@ def elt():
# Insert the result of the SELECT query into the analytics.ANAL_YSD_GAME_RATING table
sql = """
INSERT INTO analytics.ANAL_YSD_GAME_RATING(GAME_NM, POSITIVE_PERCENT, CREATED_DATE)
SELECT
b.game_nm AS GAME_NM,
(a.all_positive_percent)*0.05 AS POSITIVE_PERCENT,
current_date AS CREATED_DATE
2 changes: 1 addition & 1 deletion dags/game/game_price_to_s3.py
@@ -100,7 +100,7 @@ def save_to_json(data):
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["Steam_API"],
schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분
schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분
default_args={
"retries": 3,
"retry_delay": timedelta(seconds=15),
2 changes: 1 addition & 1 deletion dags/game/game_rating_to_s3.py
@@ -126,7 +126,7 @@ def save_to_json(data):
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["Steam_API"],
schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분
schedule_interval="10 15 * * *", # 한국시간 새벽 00시 10분
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=1),
3 changes: 2 additions & 1 deletion dags/glue/live_viewer.py
@@ -4,17 +4,18 @@
from airflow import DAG
from airflow.models import Variable

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

from jinja2 import Template


def sleep_for_60_seconds():
time.sleep(60)


def upload_rendered_script_to_s3(
bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs
):
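
This hunk only adds the PythonOperator import alongside the sleep_for_60_seconds helper shown above; how the helper is wired into the DAG is not visible here. A minimal sketch of one plausible wiring, pausing the pipeline for 60 seconds before a downstream step (dag_id, task_id, and placement are assumptions, not the repository's actual DAG):

# Hypothetical use of the sleep helper via PythonOperator (Airflow 2.x style).
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def sleep_for_60_seconds():
    time.sleep(60)


with DAG(
    dag_id="live_viewer_sketch",  # placeholder id, not the real DAG
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # e.g. let freshly written S3 objects settle before the Glue crawler runs
    wait_before_crawl = PythonOperator(
        task_id="wait_before_crawl",
        python_callable=sleep_for_60_seconds,
    )
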
50 changes: 32 additions & 18 deletions dataCollector/glue&athena/live_viewer_script.py
@@ -8,7 +8,13 @@
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    TimestampType,
)

# Initialize SparkContext and GlueContext
sc = SparkContext()
@@ -17,11 +23,11 @@

# Initialize the Job (with Job Bookmark enabled)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_path", "output_path"])
job.init(args["JOB_NAME"], args)

input_path = args["input_path"]
output_path = args["output_path"]

# Read the data from S3
datasource = glueContext.create_dynamic_frame.from_options(
@@ -32,22 +38,24 @@
)

# Define the common schema
common_schema = StructType(
    [
        StructField("STREAMER_ID", StringType(), True),
        StructField("BROADCAST_ID", StringType(), True),
        StructField("LIVE_COLLECT_TIME", TimestampType(), True),
        StructField("BROADCAST_TITLE", StringType(), True),
        StructField("GAME_CODE", StringType(), True),
        StructField("VIEWER_NUM", IntegerType(), True),
        StructField("PLATFORM", StringType(), True),
    ]
)

# Transform the data
datasource_df = datasource.toDF()
try:
chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk"))

chz_filtered_source = chzzk_source.filter(col("col.content.adult") == "false")

# chzzk_source.printSchema()
chzzk_df = chz_filtered_source.select(
@@ -60,11 +68,15 @@
)
chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk"))
except:
chzzk_df = spark.createDataFrame([], schema=common_schema)

try:
afreeca_source = datasource_df.select("stream_data.afreeca").select(
    explode("afreeca")
)
afc_filtered_source = afreeca_source.filter(
    col("col.broad_info.broad.broad_grade") < 19
)

afreeca_df = afc_filtered_source.select(
col("col.streamer_id").alias("STREAMER_ID"),
@@ -89,7 +101,9 @@
partitioned_df = result_df.repartition("PLATFORM")

# Convert the partitioned Spark DataFrame into a DynamicFrame
partitioned_dynamic_frame = DynamicFrame.fromDF(
    partitioned_df, glueContext, "partitioned_dynamic_frame"
)


# Convert to Parquet and save to S3
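
Most of the changes in this script are formatting, but they make its existing fallback easier to read: when a platform's payload is missing or malformed, the except branch substitutes an empty DataFrame built from common_schema, so the later per-platform union still lines up column for column. A standalone sketch of that pattern in plain PySpark, outside Glue (the local session, trimmed column list, and sample rows are assumptions):

# Sketch of the "empty DataFrame with a shared schema" fallback.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[*]").appName("fallback-sketch").getOrCreate()

common_schema = StructType(
    [
        StructField("STREAMER_ID", StringType(), True),
        StructField("VIEWER_NUM", IntegerType(), True),
        StructField("PLATFORM", StringType(), True),
    ]
)


def to_frame(rows):
    """Build a platform DataFrame; fall back to an empty one on any failure."""
    try:
        return spark.createDataFrame(rows, schema=common_schema)
    except Exception:
        # Same shape as the real frames, so the union below never breaks.
        return spark.createDataFrame([], schema=common_schema)


chzzk_df = to_frame([("s1", 1200, "chzzk")])
afreeca_df = to_frame(None)  # missing payload -> empty frame, not a crash

result_df = chzzk_df.unionByName(afreeca_df)
result_df.show()
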
