Feature/anal broadcast #130

Merged 6 commits on Mar 5, 2024
123 changes: 123 additions & 0 deletions dags/elt/anal_broadcast.py
@@ -0,0 +1,123 @@
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime, timedelta
import slack


def connect_to_redshift():
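    # Return a cursor on the Redshift connection configured as aws_redshift_conn_id (autocommit enabled).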
db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id")
conn = db_hook.get_conn()
conn.autocommit = True

return conn.cursor()


@task
def elt():
    # 1. Connect to Redshift (source: external_raw_data schema)
cur = connect_to_redshift()
print("Successfully connected to Redshift")
conn = cur.connection

try:
# Start a new transaction
conn.autocommit = False

        # Delete all existing rows from analytics.ANAL_BROADCAST
sql = """
DELETE FROM analytics.ANAL_BROADCAST;
"""
cur.execute(sql)
print("Successfully deleted all data from analytics.ANAL_BROADCAST")

        # Insert the SELECT query results into analytics.ANAL_BROADCAST
sql = """
INSERT INTO analytics.ANAL_BROADCAST(STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, AVG_VIEWER_NUM, BROADCAST_START_TIME, BROADCAST_END_TIME, GAME_DURATION, CREATED_DATE)
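            -- ParsedData:  cast live_collect_time to TIMESTAMPTZ
            -- RankedData:  previous collection time per streamer/broadcast/game via LAG
            -- GroupedData: flag a new session when the gap between samples exceeds 5 minutes; empty game_code becomes 'talk'
            -- GroupIDs:    running SUM of the flags yields a per-session group_id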
WITH ParsedData AS (
SELECT *, live_collect_time::TIMESTAMPTZ AS parsed_time
FROM external_raw_data.table_name_raw_live_viewer
),
RankedData AS(
SELECT
*,
LAG(parsed_time, 1) OVER (
PARTITION BY streamer_id, broadcast_id, game_code
ORDER BY parsed_time
) AS prev_timestamp
FROM ParsedData
),
GroupedData AS(
SELECT *,
CASE
WHEN parsed_time - INTERVAL '5' MINUTE > prev_timestamp THEN 1
ELSE 0
END AS is_new_group,
COALESCE(NULLIF(game_code, ''), 'talk') AS normalized_game_code
FROM RankedData
),
GroupIDs AS (
SELECT *,
SUM(is_new_group) OVER (
PARTITION BY streamer_id, broadcast_id, normalized_game_code
ORDER BY parsed_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS group_id,
normalized_game_code AS n_game_code
FROM GroupedData
)
SELECT
s_info.streamer_nm AS STREAMER_NM,
g_ids.broadcast_id AS BROADCAST_ID,
COALESCE(g_info.game_nm,g_ids.n_game_code) AS GAME_NM,
g_ids.platform AS PLATFORM,
AVG(g_ids.viewer_num)::integer AS AVG_VIEWER_NUM,
MIN(g_ids.parsed_time) AS start_time,
MAX(g_ids.parsed_time) AS end_time,
(EXTRACT(EPOCH FROM MAX(g_ids.parsed_time)) - EXTRACT(EPOCH FROM MIN(g_ids.parsed_time))) / 60 AS GAME_DURATION,
CURRENT_DATE AS created_date
FROM GroupIDs AS g_ids
LEFT JOIN external_raw_data.streamer_info AS s_info ON s_info.streamer_id = g_ids.streamer_id
LEFT JOIN external_raw_data.game_info AS g_info
ON g_ids.n_game_code = g_info.chz_game_code OR LTRIM(g_ids.n_game_code, '0') = g_info.afc_game_code
WHERE g_ids.broadcast_id IS NOT NULL
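            -- Keep the trailing 7-day window; the 6-hour offset shifts it from the UTC date boundary (presumably to match the collection schedule)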
AND g_ids.parsed_time >= (CURRENT_DATE - INTERVAL '7 days' + INTERVAL '6 hours')
AND g_ids.parsed_time < (CURRENT_DATE + INTERVAL '6 hours')
GROUP BY STREAMER_NM, BROADCAST_ID, GAME_NM, PLATFORM, g_ids.group_id, g_ids.n_game_code
ORDER BY STREAMER_NM, BROADCAST_ID, start_time;
"""
cur.execute(sql)
print("Successfully inserted data into analytics.ANAL_BROADCAST")

        # Commit the transaction
conn.commit()
print("Successfully committed the transaction")

except Exception as e:
# Rollback
print("Error occurred. Start to rollback", e)
conn.rollback()
raise

finally:
# Close the cursor and connection
cur.close()
conn.close()
print("Connection to Redshift is closed")


with DAG(
dag_id="ELT_ANAL_BROADCAST",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["ELT", "analytics", "broadcast"],
schedule_interval="0 22 * * *", # 매일 07시
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": slack.on_failure_callback,
},
) as dag:

elt()
79 changes: 79 additions & 0 deletions dags/elt/anal_week_broadcast_duration.py
@@ -0,0 +1,79 @@
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime, timedelta
import slack


def connect_to_redshift():
db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id")
conn = db_hook.get_conn()
conn.autocommit = True

return conn.cursor()


@task
def elt():
    # Connect to Redshift
cur = connect_to_redshift()
print("Successfully connected to Redshift")
conn = cur.connection

try:
# Start a new transaction
conn.autocommit = False

        # Delete all existing rows from analytics.anal_week_broadcast_duration
sql = """
DELETE FROM analytics.anal_week_broadcast_duration;
"""
cur.execute(sql)
print("Successfully deleted all data from analytics.anal_week_broadcast_duration")

        # Insert the SELECT query results into analytics.anal_week_broadcast_duration
sql = """
INSERT INTO analytics.anal_week_broadcast_duration(GAME_NM, STREAMER_NM, BROADCAST_DURATION, CREATED_DATE)
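            -- Total broadcast minutes per game and streamer, summed over the window held in analytics.anal_broadcast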
select
game_nm AS GAME_NM,
streamer_nm AS STREAMER_NM,
SUM(game_duration) AS BROADCAST_DURATION,
current_date as created_date
from dev.analytics.anal_broadcast
group by 1,2;
"""
cur.execute(sql)
print("Successfully inserted data into analytics.anal_week_broadcast_duration")

        # Commit the transaction
conn.commit()
print("Successfully committed the transaction")

except Exception as e:
# Rollback
print("Error occurred. Start to rollback", e)
conn.rollback()
raise

finally:
# Close the cursor and connection
cur.close()
conn.close()
print("Connection to Redshift is closed")


with DAG(
dag_id="ELT_Anal_Week_Broadcast_Duration",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["ELT", "analytics", "broadcast"],
schedule_interval="0 23 * * *", # 08:00(KST)
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": slack.on_failure_callback,
},
) as dag:

elt()
80 changes: 80 additions & 0 deletions dags/elt/anal_week_broadcast_freq.py
@@ -0,0 +1,80 @@
from airflow import DAG
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

from datetime import datetime, timedelta
import slack


def connect_to_redshift():
db_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id")
conn = db_hook.get_conn()
conn.autocommit = True

return conn.cursor()


@task
def elt():
    # Connect to Redshift
cur = connect_to_redshift()
print("Successfully connected to Redshift")
conn = cur.connection

try:
# Start a new transaction
conn.autocommit = False

        # Delete all existing rows from analytics.anal_week_broadcast_freq
sql = """
DELETE FROM analytics.anal_week_broadcast_freq;
"""
cur.execute(sql)
print("Successfully deleted all data from analytics.anal_week_broadcast_freq")

        # Insert the SELECT query results into analytics.anal_week_broadcast_freq
sql = """
INSERT INTO analytics.anal_week_broadcast_freq(GAME_NM, STREAMER_NM, BROADCAST_FREQ, CREATED_DATE)
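            -- Number of distinct broadcasts per game and streamer over the window held in analytics.anal_broadcast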
select
game_nm AS GAME_NM,
streamer_nm AS STREAMER_NM,
COUNT(distinct broadcast_id) AS BROADCAST_FREQ,
current_date as created_date
from dev.analytics.anal_broadcast
group by 1,2
order by 4 DESC;
"""
cur.execute(sql)
print("Successfully inserted data into analytics.anal_week_broadcast_freq")

        # Commit the transaction
conn.commit()
print("Successfully committed the transaction")

except Exception as e:
# Rollback
print("Error occurred. Start to rollback", e)
conn.rollback()
raise

finally:
# Close the cursor and connection
cur.close()
conn.close()
print("Connection to Redshift is closed")


with DAG(
dag_id="ELT_Anal_Week_Broadcast_Freq",
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["ELT", "analytics", "broadcast"],
schedule_interval="0 23 * * *", # 08:00(KST)
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": slack.on_failure_callback,
},
) as dag:

elt()
2 changes: 1 addition & 1 deletion dags/elt/anal_week_game_viewer.py
@@ -79,7 +79,7 @@ def elt():
start_date=datetime(2024, 1, 1),
catchup=False,
tags=["ELT", "analytics", "game_viewer"],
schedule_interval="0 23 * * *", # 매일 자정
schedule_interval="0 23 * * *", # 매일 08시
default_args={
"retries": 3,
"retry_delay": timedelta(minutes=5),
102 changes: 102 additions & 0 deletions dataCollector/glue&athena/live_viewer_script.py
@@ -0,0 +1,102 @@
## live_viewer glue script template

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Initialize SparkContext and GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialize the Job (with Job Bookmark enabled)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'input_path', 'output_path'])
job.init(args["JOB_NAME"], args)

input_path = args['input_path']
output_path = args['output_path']
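
# Glue job arguments: input_path is the S3 prefix of the raw JSON, output_path the Parquet destination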

# Read the raw JSON data from S3
datasource = glueContext.create_dynamic_frame.from_options(
"s3",
{"paths": [input_path], "recurse": True},
format="json",
transformation_ctx="datasource",
)

# Define the common schema shared by both platforms
common_schema = StructType([
StructField("STREAMER_ID", StringType(), True),
StructField("BROADCAST_ID", StringType(), True),
StructField("LIVE_COLLECT_TIME", TimestampType(), True),
StructField("BROADCAST_TITLE", StringType(), True),
StructField("GAME_CODE", StringType(), True),
StructField("VIEWER_NUM", IntegerType(), True),
StructField("PLATFORM", StringType(), True),
])

# Transform the data
datasource_df = datasource.toDF()
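# chzzk: explode the live-stream list, drop adult-flagged broadcasts, and map the fields onto the common schema;
# fall back to an empty DataFrame if the chzzk key is missing or malformed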
try:
chzzk_source = datasource_df.select("stream_data.chzzk").select(explode("chzzk"))

chz_filtered_source = chzzk_source.filter(col("col.content.adult") == 'false')

# chzzk_source.printSchema()
chzzk_df = chz_filtered_source.select(
col("col.streamer_id").alias("STREAMER_ID"),
col("col.content.liveID").alias("BROADCAST_ID"),
col("col.current_time").alias("LIVE_COLLECT_TIME"),
col("col.content.liveTitle").alias("BROADCAST_TITLE"),
col("col.content.liveCategoryValue").alias("GAME_CODE"),
col("col.content.concurrentUserCount").alias("VIEWER_NUM"),
)
chzzk_df = chzzk_df.withColumn("PLATFORM", lit("chzzk"))
except Exception:
    chzzk_df = spark.createDataFrame([], schema=common_schema)

try:
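    # afreeca: same mapping for the AfreecaTV payload; broadcasts graded 19+ are filtered out, with the same empty-DataFrame fallback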
afreeca_source = datasource_df.select("stream_data.afreeca").select(explode("afreeca"))
afc_filtered_source = afreeca_source.filter(col("col.broad_info.broad.broad_grade") < 19)

afreeca_df = afc_filtered_source.select(
col("col.streamer_id").alias("STREAMER_ID"),
col("col.live_status.BNO").alias("BROADCAST_ID"),
col("col.current_time").alias("LIVE_COLLECT_TIME"),
col("col.live_status.TITLE").alias("BROADCAST_TITLE"),
col("col.live_status.CATE").alias("GAME_CODE"),
col("col.broad_info.broad.current_sum_viewer").alias("VIEWER_NUM"),
)
afreeca_df = afreeca_df.withColumn("PLATFORM", lit("afreeca"))
except Exception:
afreeca_df = spark.createDataFrame([], schema=common_schema)


result_df = chzzk_df.union(afreeca_df)

# Log the schema information
print("Schema Information:")
result_df.printSchema()

# "PLATFORM" 컬럼을 기준으로 파티션을 구성
partitioned_df = result_df.repartition("PLATFORM")

# Convert the partitioned Spark DataFrame back to a DynamicFrame
partitioned_dynamic_frame = DynamicFrame.fromDF(partitioned_df, glueContext, "partitioned_dynamic_frame")


# Write the result to S3 as Parquet
glueContext.write_dynamic_frame.from_options(
frame=partitioned_dynamic_frame,
connection_type="s3",
connection_options={"path": output_path},
format="parquet",
transformation_ctx="datasource",
)