Skip to content

Commit

Permalink
[hotfix] live_viewer.py retries change, add timedelta sensor(wait30s)
Browse files Browse the repository at this point in the history
  • Loading branch information
poriz committed Mar 4, 2024
1 parent da4e881 commit 913b81e
Showing 1 changed file with 7 additions and 17 deletions.
24 changes: 7 additions & 17 deletions dags/glue/live_viewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.sensors.time_delta_sensor import TimeDeltaSensor

from jinja2 import Template

Expand Down Expand Up @@ -38,7 +39,7 @@ def upload_rendered_script_to_s3(
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2024, 2, 26),
"retries": 0,
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
tags=["glue", "streaming"],
Expand All @@ -55,20 +56,6 @@ def upload_rendered_script_to_s3(
day = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').day }}"
hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}" # before 1 hour

# upload_script = PythonOperator(
# task_id="upload_script_to_s3",
# python_callable=upload_rendered_script_to_s3,
# op_kwargs={
# "bucket_name": bucket_name,
# "aws_conn_id": "aws_conn_id",
# "template_s3_key": "source/script/live_viewer_template.py",
# "rendered_s3_key": "source/script/live_viewer_script.py",
# # into template
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/",
# },
# )

run_glue_job = GlueJobOperator(
task_id="run_glue_job",
job_name="de-2-1_live_viewer", # when launch, plz clean&change glue jobs
Expand All @@ -90,6 +77,9 @@ def upload_rendered_script_to_s3(
run_id=run_glue_job.output,
aws_conn_id="aws_conn_id",
)
wait_30_seconds = TimeDeltaSensor(
task_id= 'wait_30_seconds',
delta=timedelta(seconds=30),
)


run_glue_job >> wait_for_job
run_glue_job >> wait_for_job >> wait_30_seconds

0 comments on commit 913b81e

Please sign in to comment.