Commit 6deaa87

Merge pull request #111 from zizzic/feature/glue_followers
[feat] make glue job dag for followers
2 parents 9708cb3 + b3a3bcc

1 file changed: 85 additions & 78 deletions
@@ -1,78 +1,85 @@
-from datetime import datetime, timedelta
-
-from airflow import DAG
-from airflow.operators.python import PythonOperator
-from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-
-from jinja2 import Template
-
-
-def upload_rendered_script_to_s3(
-    bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs
-):
-    # Create an S3Hook instance
-    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
-
-    # Read the Jinja template file from S3
-    template_str = s3_hook.read_key(template_s3_key, bucket_name)
-
-    # Render the Jinja template
-    template = Template(template_str)
-    rendered_script = template.render(**kwargs)
-
-    # Upload the rendered script to S3
-    s3_hook.load_string(
-        string_data=rendered_script,
-        bucket_name=bucket_name,
-        key=rendered_s3_key,
-        replace=True,
-    )
-
-
-with DAG(
-    "glue_test_dag",
-    default_args={
-        "owner": "airflow",
-        "depends_on_past": False,
-        "start_date": datetime(2024, 1, 17),
-        "retries": 0,
-        "retry_delay": timedelta(minutes=5),
-    },
-    schedule_interval="0 * * * *",
-    tags=["glue", "streaming"],
-    catchup=False,
-) as dag:
-
-    bucket_name = "de-2-1-bucket"
-    current_time = "{{ data_interval_end.in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+00:00') }}"
-    year = "{{ data_interval_end.year.in_timezone('Asia/Seoul') }}"
-    month = "{{ data_interval_end.month.in_timezone('Asia/Seoul') }}"
-    day = "{{ data_interval_end.day.in_timezone('Asia/Seoul') }}"
-    hour = "{{ data_interval_end.hour.in_timezone('Asia/Seoul') }}"
-
-    upload_script = PythonOperator(
-        task_id="upload_script_to_s3",
-        python_callable=upload_rendered_script_to_s3,
-        op_kwargs={
-            "bucket_name": bucket_name,
-            "aws_conn_id": "aws_conn_id",
-            "template_s3_key": "source/script/glue_template.py",
-            "rendered_s3_key": "source/script/glue_script.py",
-            # into template
-            "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
-            "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
-        },
-    )
-
-    run_glue_job = GlueJobOperator(
-        task_id="run_glue_job",
-        job_name="DE-2-1-testjob",
-        script_location="s3://de-2-1-bucket/source/script/glue_script.py",
-        aws_conn_id="aws_conn_id",
-        region_name="ap-northeast-2",
-        iam_role_name="AWSGlueServiceRole-crawler",
-        dag=dag,
-    )
-
-    upload_script >> run_glue_job
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
+
+from jinja2 import Template
+
+
+def upload_rendered_script_to_s3(
+    bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs
+):
+    # Create an S3Hook instance
+    s3_hook = S3Hook(aws_conn_id=aws_conn_id)
+
+    # Read the Jinja template file from S3
+    template_str = s3_hook.read_key(template_s3_key, bucket_name)
+
+    # Render the Jinja template
+    template = Template(template_str)
+    rendered_script = template.render(**kwargs)
+
+    # Upload the rendered script to S3
+    s3_hook.load_string(
+        string_data=rendered_script,
+        bucket_name=bucket_name,
+        key=rendered_s3_key,
+        replace=True,
+    )
+
+
+with DAG(
+    "glue_followers",
+    default_args={
+        "owner": "airflow",
+        "depends_on_past": False,
+        "start_date": datetime(2024, 2, 27),
+        "retries": 0,
+        "retry_delay": timedelta(minutes=5),
+    },
+    tags=["glue", "streaming"],
+    schedule_interval="30 16 * * *",  # 1:30 AM Korea time
+    catchup=True,
+) as dag:
+
+    bucket_name = "de-2-1-bucket"
+
+    year = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').year }}"
+    month = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').month }}"
+    day = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').day }}"
+
+    upload_script = PythonOperator(
+        task_id="upload_script_to_s3",
+        python_callable=upload_rendered_script_to_s3,
+        op_kwargs={
+            "bucket_name": bucket_name,
+            "aws_conn_id": "aws_conn_id",
+            "template_s3_key": "source/script/glue_followers_template.py",
+            "rendered_s3_key": "source/script/glue_followers_script.py",
+            "input_path": f"s3://de-2-1-bucket/source/json/table_name=followers/year={year}/month={month}/day={day}/",
+            "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=followers/year={year}/month={month}/day={day}/",
+        },
+    )
+
+    run_glue_job = GlueJobOperator(
+        task_id="run_glue_job",
+        job_name="de-2-1_followers",
+        script_location="s3://de-2-1-bucket/source/script/glue_followers_script.py",
+        aws_conn_id="aws_conn_id",
+        region_name="ap-northeast-2",
+        iam_role_name="AWSGlueServiceRole-crawler",
+        dag=dag,
+    )
+    wait_for_job = GlueJobSensor(  # waits for the triggered job run to finish
+        task_id="wait_for_job_followers_job",  # recommended: rename task_id to something more intuitive
+        job_name="de-2-1_followers",
+        # job run ID extracted from the previous GlueJobOperator task
+        run_id=run_glue_job.output,
+        aws_conn_id="aws_conn_id",
+    )
+
+
+    upload_script >> run_glue_job >> wait_for_job
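
Note on the new sensor: the wiring between the job and the sensor goes through XCom. GlueJobOperator returns the Glue job run ID, so `run_id=run_glue_job.output` passes that ID to GlueJobSensor, which then polls the run until it reaches a terminal state.

The template that `upload_rendered_script_to_s3` renders (`source/script/glue_followers_template.py`) is not part of this diff; the DAG only confirms that `input_path` and `output_path` are substituted into it. What follows is a minimal hypothetical sketch of such a template, assuming a plain JSON-to-Parquet rewrite in a Glue Spark job; everything beyond the two placeholders is standard Glue boilerplate, not code from this repository.

# glue_followers_template.py -- hypothetical sketch, not the committed template.
# Only the input_path/output_path placeholders are confirmed by the DAG.
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])

sc = SparkContext()
glue_context = GlueContext(sc)
spark = glue_context.spark_session
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# These Jinja placeholders are filled in by upload_rendered_script_to_s3
# before the rendered script is uploaded to S3 for GlueJobOperator to run.
input_path = "{{ input_path }}"
output_path = "{{ output_path }}"

# Convert the day's follower JSON partition to Parquet.
df = spark.read.json(input_path)
df.write.mode("overwrite").parquet(output_path)

job.commit()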

0 commit comments