Skip to content

Commit 13dfe51

Browse files
committed
[feat] ELT_test_DAG
1 parent a756599 commit 13dfe51

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed

dags/elt/anal_week_game_viewer.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
from airflow import DAG
2+
from airflow.decorators import task
3+
from airflow.providers.postgres.hooks.postgres import PostgresHook
4+
5+
from datetime import datetime, timedelta
6+
import slack
7+
8+
9+
def connect_to_redshift():
    """Open a connection via the Airflow Postgres hook and return a cursor.

    The connection is looked up under the Airflow connection id
    ``aws_redshift_conn_id`` and is switched to autocommit mode before the
    cursor is created. The underlying connection stays reachable through
    the returned cursor's ``connection`` attribute.

    Returns:
        A DB-API cursor bound to the newly opened connection.
    """
    redshift_hook = PostgresHook(postgres_conn_id="aws_redshift_conn_id")
    connection = redshift_hook.get_conn()
    # Autocommit by default; callers that need a transaction turn it off.
    connection.autocommit = True
    cursor = connection.cursor()
    return cursor
15+
16+
17+
@task
def elt():
    """Rebuild ``analytics.anal_week_game_viewer`` in a single transaction.

    Steps, all executed on one connection:

    1. Delete every row of ``analytics.anal_week_game_viewer``.
    2. Re-insert the average ``viewer_num`` grouped by game name, created
       date, hour and streamer name. The source rows are the UNION of two
       joins between ``table_name_raw_live_viewer`` and ``game_info``: one
       matching ``game_code`` to ``chz_game_code``, the other matching it to
       ``afc_game_code`` left-padded with three ``'0'`` characters.
    3. Commit.

    If any step fails, the transaction is rolled back and the exception is
    re-raised so the task run is reported as failed. The cursor and the
    connection are always closed.

    Raises:
        Exception: Whatever the database driver raised, after the rollback.
    """
    cur = connect_to_redshift()
    print("Successfully connected to Redshift")
    conn = cur.connection

    try:
        # The helper returns an autocommit connection; turn autocommit off so
        # the DELETE and INSERT below are committed (or rolled back) together.
        conn.autocommit = False

        # Delete all data from analytics.anal_week_game_viewer.
        sql = """
            DELETE FROM analytics.anal_week_game_viewer;
        """
        cur.execute(sql)
        print("Successfully deleted all data from analytics.anal_week_game_viewer")

        # Insert the result of the SELECT query into
        # analytics.anal_week_game_viewer.
        sql = """
            INSERT INTO analytics.anal_week_game_viewer
            SELECT GAME_NM, CAST(HOUR AS INTEGER), si.streamer_nm as STREAMER_NM, AVG(viewer_num) AS VIEWER_AVG, TO_DATE(TO_CHAR(live_collect_time::DATE, 'YYYY-MM-DD'), 'YYYY-MM-DD') AS CREATED_DATE
            FROM (
                SELECT gi.game_nm ,rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, CAST(rlv.hour AS INTEGER), rlv.streamer_id
                FROM external_raw_data.table_name_raw_live_viewer AS rlv
                INNER JOIN external_raw_data.game_info AS gi
                ON rlv.game_code = gi.chz_game_code

                UNION

                SELECT gi.game_nm, rlv.game_code, rlv.live_collect_time, rlv.viewer_num, rlv.year, rlv.month, rlv.day, CAST(rlv.hour AS INTEGER), rlv.streamer_id
                FROM external_raw_data.table_name_raw_live_viewer AS rlv
                INNER JOIN external_raw_data.game_info AS gi
                ON rlv.game_code = LPAD(gi.afc_game_code, LENGTH(gi.afc_game_code) + 3, '0')
            ) AS subquery
            INNER JOIN external_raw_data.streamer_info AS si
            ON subquery.streamer_id = si.streamer_id
            GROUP BY GAME_NM, CREATED_DATE, HOUR, si.streamer_nm
            ORDER BY GAME_NM, CREATED_DATE, HOUR, si.streamer_nm;
        """
        cur.execute(sql)
        print("Successfully inserted data into analytics.anal_week_game_viewer")

        # Commit the transaction.
        conn.commit()
        print("Successfully committed the transaction")

    except Exception as e:
        # Undo the partial DELETE/INSERT, then let the failure propagate.
        print("Error occurred. Start to rollback", e)
        conn.rollback()
        raise

    finally:
        # Close the cursor first, but make sure the connection is released
        # even if closing the cursor itself raises.
        try:
            cur.close()
        finally:
            conn.close()
        print("Connection to Redshift is closed")
75+
76+
77+
# DAG definition: a single `elt` task, scheduled by the cron expression below.
with DAG(
    dag_id="ELT_Anal_Week_Game_Viewer",
    schedule_interval="0 0 * * *",  # every day at midnight
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={
        # Retry a failed task up to three times, five minutes apart.
        "retry_delay": timedelta(minutes=5),
        "retries": 3,
        # Callback taken from the imported `slack` module.
        "on_failure_callback": slack.on_failure_callback,
    },
    tags=["ELT", "analytics", "game_viewer"],
) as dag:

    elt()

0 commit comments

Comments
 (0)