
Commit 5a8741f

Merge pull request #96 from zizzic/feature/glue
Feature/glue
2 parents: c749255 + 62954ee · commit 5a8741f

8 files changed: +497 -7 lines changed

dags/glue/game_ccu.py

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from jinja2 import Template


def upload_rendered_script_to_s3(
    bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs
):
    # Create an S3Hook instance
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)

    # Read the Jinja template file from S3
    template_str = s3_hook.read_key(template_s3_key, bucket_name)

    # Render the Jinja template
    template = Template(template_str)
    rendered_script = template.render(**kwargs)

    # Upload the rendered script to S3
    s3_hook.load_string(
        string_data=rendered_script,
        bucket_name=bucket_name,
        key=rendered_s3_key,
        replace=True,
    )


with DAG(
    "glue_game_ccu",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2024, 2, 22),
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    schedule_interval="5 * * * *",
    tags=["glue", "Game_CCU"],
    catchup=False,
) as dag:

    bucket_name = "de-2-1-bucket"
    current_time = "{{ data_interval_end.in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
    year = "{{ data_interval_end.in_timezone('Asia/Seoul').year }}"
    month = "{{ data_interval_end.in_timezone('Asia/Seoul').month }}"
    day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
    hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}"  # one hour earlier

    upload_script = PythonOperator(
        task_id="upload_script_to_s3",
        python_callable=upload_rendered_script_to_s3,
        op_kwargs={
            "bucket_name": bucket_name,
            "aws_conn_id": "aws_conn_id",
            "template_s3_key": "source/script/glue_game_ccu_template.py",
            "rendered_s3_key": "source/script/glue_game_ccu_script.py",
            # values rendered into the template
            "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_ccu/year={year}/month={month}/day={day}/hour={hour}/",
            "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_ccu/year={year}/month={month}/day={day}/hour={hour}/",
            "collect_time": f"{year}-{month}-{day} {hour}:00",
        },
    )

    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="de-2-1_game_ccu",
        script_location="s3://de-2-1-bucket/source/script/glue_game_ccu_script.py",
        aws_conn_id="aws_conn_id",
        region_name="ap-northeast-2",
        iam_role_name="AWSGlueServiceRole-crawler",
        dag=dag,
    )

    wait_for_job = GlueJobSensor(  # waits for the Glue job run to finish
        task_id="wait_for_job_game_ccu_glue_job",  # consider renaming so the task_id reads more intuitively
        job_name="de-2-1_game_ccu",
        # job run id pulled from the preceding GlueJobOperator task via XCom
        run_id=run_glue_job.output,
        verbose=True,  # surfaces Glue job logs in the Airflow task logs
        # region_name="ap-northeast-2",
        aws_conn_id="aws_conn_id",
    )

    upload_script >> run_glue_job >> wait_for_job
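
Note on the sensor wiring: GlueJobOperator returns the new job run's id, which Airflow stores as the task's return_value XCom, and run_glue_job.output is an XComArg that resolves to that id at runtime. A minimal sketch of the equivalent explicit pull, assuming the task ids above (log_glue_run_id is a hypothetical helper, not part of this commit):

# hypothetical callable showing what run_id=run_glue_job.output resolves to
def log_glue_run_id(**context):
    run_id = context["ti"].xcom_pull(task_ids="run_glue_job")  # the JobRunId string
    print(f"Glue job run id: {run_id}")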

dags/glue/game_price.py

Lines changed: 90 additions & 0 deletions
@@ -0,0 +1,90 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from jinja2 import Template


def upload_rendered_script_to_s3(
    bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs
):
    # Create an S3Hook instance
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)

    # Read the Jinja template file from S3
    template_str = s3_hook.read_key(template_s3_key, bucket_name)

    # Render the Jinja template
    template = Template(template_str)
    rendered_script = template.render(**kwargs)

    # Upload the rendered script to S3
    s3_hook.load_string(
        string_data=rendered_script,
        bucket_name=bucket_name,
        key=rendered_s3_key,
        replace=True,
    )


with DAG(
    "glue_game_price",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2024, 2, 22),
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    schedule_interval="0 1 * * *",
    tags=["glue", "Game_Price"],
    catchup=False,
) as dag:

    bucket_name = "de-2-1-bucket"
    current_time = "{{ data_interval_end.in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
    year = "{{ data_interval_end.in_timezone('Asia/Seoul').year }}"
    month = "{{ data_interval_end.in_timezone('Asia/Seoul').month }}"
    day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
    hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}"  # one hour earlier

    upload_script = PythonOperator(
        task_id="upload_script_to_s3",
        python_callable=upload_rendered_script_to_s3,
        op_kwargs={
            "bucket_name": bucket_name,
            "aws_conn_id": "aws_conn_id",
            "template_s3_key": "source/script/glue_game_price_template.py",
            "rendered_s3_key": "source/script/glue_game_price_script.py",
            # values rendered into the template
            "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_price/year={year}/month={month}/day={day}/",
            "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_price/year={year}/month={month}/day={day}/",
            "collect_date": f"{year}-{month}-{day}",
        },
    )

    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="de-2-1_game_price",
        script_location="s3://de-2-1-bucket/source/script/glue_game_price_script.py",
        aws_conn_id="aws_conn_id",
        region_name="ap-northeast-2",
        iam_role_name="AWSGlueServiceRole-crawler",
        dag=dag,
    )

    wait_for_job = GlueJobSensor(  # waits for the Glue job run to finish
        task_id="wait_for_job_game_price_glue_job",  # consider renaming so the task_id reads more intuitively
        job_name="de-2-1_game_price",
        # job run id pulled from the preceding GlueJobOperator task via XCom
        run_id=run_glue_job.output,
        verbose=True,  # surfaces Glue job logs in the Airflow task logs
        # region_name="ap-northeast-2",
        aws_conn_id="aws_conn_id",
    )

    upload_script >> run_glue_job >> wait_for_job
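
For reference, a minimal sketch of what upload_rendered_script_to_s3 does at render time; the values below are hypothetical stand-ins for what the DAG passes through op_kwargs:

from jinja2 import Template

template_str = 'paths = ["{{ input_path }}"]  # collected {{ collect_date }}'
rendered = Template(template_str).render(
    input_path="s3://de-2-1-bucket/source/json/table_name=raw_game_price/year=2024/month=2/day=23/",
    collect_date="2024-2-23",
)
print(rendered)
# paths = ["s3://de-2-1-bucket/source/json/table_name=raw_game_price/year=2024/month=2/day=23/"]  # collected 2024-2-23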

dags/glue/glue_airflow.py

Lines changed: 6 additions & 7 deletions
@@ -40,17 +40,16 @@ def upload_rendered_script_to_s3(
         "retry_delay": timedelta(minutes=5),
     },
     schedule_interval="0 * * * *",
-    tags=["Glue"],
+    tags=["glue", "streaming"],
     catchup=False,
 ) as dag:

     bucket_name = "de-2-1-bucket"
-    local_path = "./glue_script.py"
-    current_time = "{{ data_interval_end.strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
-    year = "{{ data_interval_end.year }}"
-    month = "{{ data_interval_end.month }}"
-    day = "{{ data_interval_end.day }}"
-    hour = "{{ data_interval_end.hour }}"
+    current_time = "{{ data_interval_end.in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
+    year = "{{ data_interval_end.in_timezone('Asia/Seoul').year }}"
+    month = "{{ data_interval_end.in_timezone('Asia/Seoul').month }}"
+    day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
+    hour = "{{ data_interval_end.in_timezone('Asia/Seoul').hour }}"

     upload_script = PythonOperator(
         task_id="upload_script_to_s3",

dags/glue/live_viewer.py

Lines changed: 78 additions & 0 deletions
@@ -0,0 +1,78 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from jinja2 import Template


def upload_rendered_script_to_s3(
    bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs
):
    # Create an S3Hook instance
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)

    # Read the Jinja template file from S3
    template_str = s3_hook.read_key(template_s3_key, bucket_name)

    # Render the Jinja template
    template = Template(template_str)
    rendered_script = template.render(**kwargs)

    # Upload the rendered script to S3
    s3_hook.load_string(
        string_data=rendered_script,
        bucket_name=bucket_name,
        key=rendered_s3_key,
        replace=True,
    )


with DAG(
    "glue_live_viewer",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2024, 1, 17),
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    tags=["glue", "streaming"],
    schedule_interval="0 * * * *",
    catchup=False,
) as dag:

    bucket_name = "de-2-1-bucket"
    current_time = "{{ data_interval_end.in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
    year = "{{ data_interval_end.in_timezone('Asia/Seoul').year }}"
    month = "{{ data_interval_end.in_timezone('Asia/Seoul').month }}"
    day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
    hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}"  # one hour earlier

    upload_script = PythonOperator(
        task_id="upload_script_to_s3",
        python_callable=upload_rendered_script_to_s3,
        op_kwargs={
            "bucket_name": bucket_name,
            "aws_conn_id": "aws_conn_id",
            "template_s3_key": "source/script/live_viewer_template.py",
            "rendered_s3_key": "source/script/live_viewer_script.py",
            # values rendered into the template
            "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
            "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
        },
    )

    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="de-2-1_live_viewer",  # before launch, clean up and rename the Glue jobs
        script_location="s3://de-2-1-bucket/source/script/live_viewer_script.py",
        aws_conn_id="aws_conn_id",
        region_name="ap-northeast-2",
        iam_role_name="AWSGlueServiceRole-crawler",
        dag=dag,
    )

    upload_script >> run_glue_job
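
The "one hour earlier" Jinja expression used for the hour partition can be checked in isolation. A small sketch, assuming a hypothetical run whose data_interval_end is 2024-02-22T05:00:00+00:00:

import pendulum
from datetime import timedelta

end = pendulum.datetime(2024, 2, 22, 5, 0, tz="UTC")
print((end - timedelta(hours=1)).in_timezone("Asia/Seoul").hour)  # 13, i.e. 04:00 UTC == 13:00 KST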

dags/streaming/stream_data_raw.py

Lines changed: 4 additions & 0 deletions
@@ -1,3 +1,4 @@
+import time
 from datetime import datetime, timedelta

 import json
@@ -55,6 +56,9 @@ def chzzk_raw(current_time, **kwargs):

     if res.status_code == 200:
         live_data = res.json()
+        if live_data["content"]["liveId"]:
+            time.sleep(5)
+            live_data = requests.get(f"https://api.chzzk.naver.com/service/v2/channels/{id}/live-detail").json()
         try:
             live = live_data["content"]["status"]
             if live == "OPEN":
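
The added block re-fetches live-detail once after a fixed 5-second pause. A bounded-retry variant is sketched below, under the assumption that the payload is complete once content.status is present; fetch_live_detail and its parameters are hypothetical names, not part of this commit:

import time
import requests

def fetch_live_detail(channel_id, retries=3, delay=5):
    url = f"https://api.chzzk.naver.com/service/v2/channels/{channel_id}/live-detail"
    data = None
    for _ in range(retries):
        data = requests.get(url).json()
        content = data.get("content") or {}
        if content.get("status"):  # stop once the payload looks complete
            break
        time.sleep(delay)
    return data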
Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
import sys

from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, when, udf, explode
from pyspark.sql.types import StringType

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# Initialize the SparkContext and GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialize the Job (including Job Bookmark activation)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job.init(args["JOB_NAME"], args)

# Read the input data from S3
datasource = glueContext.create_dynamic_frame.from_options(
    "s3",
    {"paths": ["{{ input_path }}"], "recurse": True},
    format="json",
    transformation_ctx="datasource",
)

# Convert to a DataFrame for preprocessing
game_ccu_datasource = datasource.toDF()

# Explode on the top-level key
df = game_ccu_datasource.select(
    explode(game_ccu_datasource.raw_game_ccu).alias("raw_game_ccu")
)

df = df.select(
    col("raw_game_ccu.game_id").alias("GAME_ID"),
    lit("{{ collect_time }}").alias("COLLECT_TIME"),
    col("raw_game_ccu.player_count").alias("GAME_CCU"),
)

dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

# Convert to Parquet and write to S3
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    connection_options={"path": "{{ output_path }}"},
    format="parquet",
    transformation_ctx="dynamic_frame",
)

# Finally, commit the Job Bookmark state
job.commit()
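
Once upload_rendered_script_to_s3 fills the placeholders, the read step of the uploaded script would look roughly like this; the partition values below are a hypothetical example, not output from this commit:

datasource = glueContext.create_dynamic_frame.from_options(
    "s3",
    {
        "paths": [
            "s3://de-2-1-bucket/source/json/table_name=raw_game_ccu/year=2024/month=2/day=22/hour=13/"
        ],
        "recurse": True,
    },
    format="json",
    transformation_ctx="datasource",
)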
