diff --git a/dags/glue/game_price.py b/dags/glue/game_price.py
index a7d1836..d27201b 100644
--- a/dags/glue/game_price.py
+++ b/dags/glue/game_price.py
@@ -1,9 +1,11 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
+from airflow.models import Variable
 
 # from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
 
@@ -42,7 +44,7 @@ def upload_rendered_script_to_s3(
         "retry_delay": timedelta(seconds=15),
     },
     max_active_runs=1,
-    schedule_interval="0 16 * * *", # 1 AM KST
+    schedule_interval="0 16 * * *",  # 1 AM KST
     tags=["glue", "Game_Price"],
     catchup=True,
 ) as dag:
@@ -92,5 +94,22 @@ def upload_rendered_script_to_s3(
         aws_conn_id="aws_conn_id",
     )
 
-# upload_script >> run_glue_job >> wait_for_job
-run_glue_job >> wait_for_job
+    glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
+    glue_crawler_config = {
+        "Name": "de-2-1-raw_game_price",
+        "Role": glue_crawler_arn,
+        "DatabaseName": "de_2_1_glue",
+        "Targets": {
+            "S3Targets": [
+                {"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_price/"}
+            ]
+        },
+    }
+
+    crawl_s3 = GlueCrawlerOperator(
+        task_id="crawl_s3",
+        config=glue_crawler_config,
+        aws_conn_id="aws_conn_id",
+    )
+
+run_glue_job >> wait_for_job >> crawl_s3
diff --git a/dags/glue/game_rating.py b/dags/glue/game_rating.py
index e76b49d..78781a0 100644
--- a/dags/glue/game_rating.py
+++ b/dags/glue/game_rating.py
@@ -1,9 +1,11 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
+from airflow.models import Variable
 
 # from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
 
@@ -42,7 +44,7 @@ def upload_rendered_script_to_s3(
         "retry_delay": timedelta(seconds=15),
     },
     max_active_runs=1,
-    schedule_interval="0 16 * * *", # 1 AM KST
+    schedule_interval="0 16 * * *",  # 1 AM KST
     tags=["glue", "Game_Rating"],
     catchup=True,
 ) as dag:
@@ -54,21 +56,6 @@ def upload_rendered_script_to_s3(
     day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
     hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul') }}"  # before 1 hour
 
-    # upload_script = PythonOperator(
-    #     task_id="upload_script_to_s3",
-    #     python_callable=upload_rendered_script_to_s3,
-    #     op_kwargs={
-    #         "bucket_name": bucket_name,
-    #         "aws_conn_id": "aws_conn_id",
-    #         "template_s3_key": "source/script/glue_game_rating_template.py",
-    #         "rendered_s3_key": "source/script/glue_game_rating_script.py",
-    #         # into template
-    #         "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
-    #         "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
-    #         "collect_date": f"{year}-{month}-{day}",
-    #     },
-    # )
-
     run_glue_job = GlueJobOperator(
         task_id="run_glue_job",
         job_name="de-2-1_game_rating",
@@ -92,5 +79,24 @@ def upload_rendered_script_to_s3(
         aws_conn_id="aws_conn_id",
     )
 
-# upload_script >> run_glue_job >> wait_for_job
-run_glue_job >> wait_for_job
+    glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
+    glue_crawler_config = {
+        "Name": "de-2-1-raw_game_rating",
+        "Role": glue_crawler_arn,
+        "DatabaseName": "de_2_1_glue",
+        "Targets": {
+            "S3Targets": [
+                {
+                    "Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_rating/"
+                }
+            ]
+        },
+    }
+
+    crawl_s3 = GlueCrawlerOperator(
+        task_id="crawl_s3",
+        config=glue_crawler_config,
+        aws_conn_id="aws_conn_id",
+    )
+
+run_glue_job >> wait_for_job >> crawl_s3
diff --git a/dags/glue/glue_followers.py b/dags/glue/glue_followers.py
index f4ec52f..75c7b06 100644
--- a/dags/glue/glue_followers.py
+++ b/dags/glue/glue_followers.py
@@ -1,9 +1,11 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
+from airflow.models import Variable
 
 # from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
 
@@ -43,7 +45,7 @@ def upload_rendered_script_to_s3(
     },
     max_active_runs=1,
     tags=["glue", "streaming"],
-    schedule_interval="0 16 * * *", # 1 AM KST
+    schedule_interval="0 16 * * *",  # 1 AM KST
     catchup=True,
 ) as dag:
 
@@ -53,19 +55,6 @@ def upload_rendered_script_to_s3(
     month = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').month }}"
     day = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').day }}"
 
-    # upload_script = PythonOperator(
-    #     task_id="upload_script_to_s3",
-    #     python_callable=upload_rendered_script_to_s3,
-    #     op_kwargs={
-    #         "bucket_name": bucket_name,
-    #         "aws_conn_id": "aws_conn_id",
-    #         "template_s3_key": "source/script/glue_followers_template.py",
-    #         "rendered_s3_key": "source/script/glue_followers_script.py",
-    #         "input_path": f"s3://de-2-1-bucket/source/json/table_name=followers/year={year}/month={month}/day={day}/",
-    #         "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=followers/year={year}/month={month}/day={day}/",
-    #     },
-    # )
-
     run_glue_job = GlueJobOperator(
         task_id="run_glue_job",
         job_name="de-2-1_followers",
@@ -88,5 +77,22 @@ def upload_rendered_script_to_s3(
         aws_conn_id="aws_conn_id",
     )
 
+    glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
+    glue_crawler_config = {
+        "Name": "de-2-1-followers",
+        "Role": glue_crawler_arn,
+        "DatabaseName": "de_2_1_glue",
+        "Targets": {
+            "S3Targets": [
+                {"Path": "s3://de-2-1-bucket/source/parquet/table_name=followers/"}
+            ]
+        },
+    }
+
+    crawl_s3 = GlueCrawlerOperator(
+        task_id="crawl_s3",
+        config=glue_crawler_config,
+        aws_conn_id="aws_conn_id",
+    )
 
-run_glue_job >> wait_for_job
+run_glue_job >> wait_for_job >> crawl_s3
diff --git a/dags/glue/live_viewer.py b/dags/glue/live_viewer.py
index 56524de..a966cb9 100644
--- a/dags/glue/live_viewer.py
+++ b/dags/glue/live_viewer.py
@@ -2,12 +2,13 @@
 import time
 
 from airflow import DAG
+from airflow.models import Variable
 from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
-from airflow.sensors.time_delta_sensor import TimeDeltaSensor
 
 from jinja2 import Template
 
@@ -48,7 +49,7 @@ def upload_rendered_script_to_s3(
     tags=["glue", "streaming"],
     schedule_interval="0 * * * *",
     catchup=True,
-    max_active_runs=1
+    max_active_runs=1,
 ) as dag:
 
     bucket_name = "de-2-1-bucket"
@@ -80,10 +81,25 @@ def upload_rendered_script_to_s3(
         run_id=run_glue_job.output,
         aws_conn_id="aws_conn_id",
     )
-    sleep_task = PythonOperator(
-        task_id='sleep_for_60_seconds',
-        python_callable=sleep_for_60_seconds,
-        dag=dag,
+
+    glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
+    glue_crawler_config = {
+        "Name": "de-2-1-raw_live_viewer",
+        "Role": glue_crawler_arn,
+        "DatabaseName": "de_2_1_glue",
+        "Targets": {
+            "S3Targets": [
+                {
+                    "Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/"
+                }
+            ]
+        },
+    }
+
+    crawl_s3 = GlueCrawlerOperator(
+        task_id="crawl_s3",
+        config=glue_crawler_config,
+        aws_conn_id="aws_conn_id",
     )
 
-run_glue_job >> wait_for_job >> sleep_task
+run_glue_job >> wait_for_job >> crawl_s3
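
All four DAGs now end with the same crawler block, differing only in the crawler name and the S3 path. A minimal sketch of how that repetition could be factored into a shared helper, assuming the same Variable, connection id, database, and S3 layout as in the diff above (the helper name make_crawl_task and its table_name parameter are hypothetical, not part of this change):

    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator


    def make_crawl_task(table_name: str) -> GlueCrawlerOperator:
        # Build the crawl_s3 task for one table under the shared parquet prefix,
        # e.g. make_crawl_task("raw_game_price") or make_crawl_task("followers").
        return GlueCrawlerOperator(
            task_id="crawl_s3",
            config={
                "Name": f"de-2-1-{table_name}",
                "Role": Variable.get("glue_crawler_arn_secret"),  # IAM role for the crawler
                "DatabaseName": "de_2_1_glue",
                "Targets": {
                    "S3Targets": [
                        {"Path": f"s3://de-2-1-bucket/source/parquet/table_name={table_name}/"}
                    ]
                },
            },
            aws_conn_id="aws_conn_id",
        )

Running the crawler after wait_for_job keeps the Glue Data Catalog in step with the partitions the Glue job just wrote, so downstream queries against de_2_1_glue see the new data without a manual crawl.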