Skip to content

Commit

Permalink
[feat] Pass arguments to the Streaming Glue job script
Browse files Browse the repository at this point in the history
  • Loading branch information
mediwind committed Feb 28, 2024
1 parent 52a8fc4 commit 1145d43
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 29 deletions.
34 changes: 20 additions & 14 deletions dags/glue/glue_followers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
Expand Down Expand Up @@ -51,18 +52,18 @@ def upload_rendered_script_to_s3(
month = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').month }}"
day = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').day }}"

upload_script = PythonOperator(
task_id="upload_script_to_s3",
python_callable=upload_rendered_script_to_s3,
op_kwargs={
"bucket_name": bucket_name,
"aws_conn_id": "aws_conn_id",
"template_s3_key": "source/script/glue_followers_template.py",
"rendered_s3_key": "source/script/glue_followers_script.py",
"input_path": f"s3://de-2-1-bucket/source/json/table_name=followers/year={year}/month={month}/day={day}/",
"output_path": f"s3://de-2-1-bucket/source/parquet/table_name=followers/year={year}/month={month}/day={day}/",
},
)
# upload_script = PythonOperator(
# task_id="upload_script_to_s3",
# python_callable=upload_rendered_script_to_s3,
# op_kwargs={
# "bucket_name": bucket_name,
# "aws_conn_id": "aws_conn_id",
# "template_s3_key": "source/script/glue_followers_template.py",
# "rendered_s3_key": "source/script/glue_followers_script.py",
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=followers/year={year}/month={month}/day={day}/",
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=followers/year={year}/month={month}/day={day}/",
# },
# )

run_glue_job = GlueJobOperator(
task_id="run_glue_job",
Expand All @@ -72,7 +73,12 @@ def upload_rendered_script_to_s3(
region_name="ap-northeast-2",
iam_role_name="AWSGlueServiceRole-crawler",
dag=dag,
script_args={ # ~script.py에서 사용할 인자들을 정의
"--input_path": f"s3://de-2-1-bucket/source/json/table_name=followers/year={year}/month={month}/day={day}/",
"--output_path": f"s3://de-2-1-bucket/source/parquet/table_name=followers/year={year}/month={month}/day={day}/",
},
)

wait_for_job = GlueJobSensor( # trigger
task_id="wait_for_job_followers_job", # task_id 직관적으로 알 수 있도록 변경 권장
job_name="de-2-1_followers",
Expand All @@ -82,4 +88,4 @@ def upload_rendered_script_to_s3(
)


upload_script >> run_glue_job >> wait_for_job
run_glue_job >> wait_for_job
36 changes: 21 additions & 15 deletions dags/glue/live_viewer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
Expand Down Expand Up @@ -53,19 +54,19 @@ def upload_rendered_script_to_s3(
day = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').day }}"
hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}" # before 1 hour

upload_script = PythonOperator(
task_id="upload_script_to_s3",
python_callable=upload_rendered_script_to_s3,
op_kwargs={
"bucket_name": bucket_name,
"aws_conn_id": "aws_conn_id",
"template_s3_key": "source/script/live_viewer_template.py",
"rendered_s3_key": "source/script/live_viewer_script.py",
# into template
"input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
"output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
},
)
# upload_script = PythonOperator(
# task_id="upload_script_to_s3",
# python_callable=upload_rendered_script_to_s3,
# op_kwargs={
# "bucket_name": bucket_name,
# "aws_conn_id": "aws_conn_id",
# "template_s3_key": "source/script/live_viewer_template.py",
# "rendered_s3_key": "source/script/live_viewer_script.py",
# # into template
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
# },
# )

run_glue_job = GlueJobOperator(
task_id="run_glue_job",
Expand All @@ -75,7 +76,12 @@ def upload_rendered_script_to_s3(
region_name="ap-northeast-2",
iam_role_name="AWSGlueServiceRole-crawler",
dag=dag,
script_args={ # ~script.py에서 사용할 인자들을 정의
"--input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
"--output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
},
)

wait_for_job = GlueJobSensor( # trigger
task_id="wait_for_job_live_viewer_job", # task_id 직관적으로 알 수 있도록 변경 권장
job_name="de-2-1_live_viewer",
Expand All @@ -85,4 +91,4 @@ def upload_rendered_script_to_s3(
)


upload_script >> run_glue_job >> wait_for_job
run_glue_job >> wait_for_job

0 comments on commit 1145d43

Please sign in to comment.