From 913b81e58f99893a90891c461f67f45ba4431fb8 Mon Sep 17 00:00:00 2001 From: poriz Date: Mon, 4 Mar 2024 22:36:28 +0900 Subject: [PATCH] [hotfix] live_viewer.py retries change, add timedelta sensor(wait30s) --- dags/glue/live_viewer.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/dags/glue/live_viewer.py b/dags/glue/live_viewer.py index 5d1657b..476562f 100644 --- a/dags/glue/live_viewer.py +++ b/dags/glue/live_viewer.py @@ -6,6 +6,7 @@ from airflow.providers.amazon.aws.operators.glue import GlueJobOperator from airflow.providers.amazon.aws.hooks.s3 import S3Hook from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor +from airflow.sensors.time_delta_sensor import TimeDeltaSensor from jinja2 import Template @@ -38,7 +39,7 @@ def upload_rendered_script_to_s3( "owner": "airflow", "depends_on_past": False, "start_date": datetime(2024, 2, 26), - "retries": 0, + "retries": 1, "retry_delay": timedelta(minutes=5), }, tags=["glue", "streaming"], @@ -55,20 +56,6 @@ def upload_rendered_script_to_s3( day = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').day }}" hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}" # before 1 hour - # upload_script = PythonOperator( - # task_id="upload_script_to_s3", - # python_callable=upload_rendered_script_to_s3, - # op_kwargs={ - # "bucket_name": bucket_name, - # "aws_conn_id": "aws_conn_id", - # "template_s3_key": "source/script/live_viewer_template.py", - # "rendered_s3_key": "source/script/live_viewer_script.py", - # # into template - # "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/", - # "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/year={year}/month={month}/day={day}/hour={hour}/", - # }, - # ) - run_glue_job = GlueJobOperator( task_id="run_glue_job", job_name="de-2-1_live_viewer", # when launch, plz clean&change glue jobs @@ -90,6 +77,9 @@ def upload_rendered_script_to_s3( run_id=run_glue_job.output, aws_conn_id="aws_conn_id", ) + wait_30_seconds = TimeDeltaSensor( + task_id= 'wait_30_seconds', + delta=timedelta(seconds=30), + ) - -run_glue_job >> wait_for_job +run_glue_job >> wait_for_job >> wait_30_seconds