diff --git a/dags/glue/game_ccu.py b/dags/glue/game_ccu.py
index 3588726..c26e8e0 100644
--- a/dags/glue/game_ccu.py
+++ b/dags/glue/game_ccu.py
@@ -1,9 +1,11 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
+from airflow.models import Variable
 # from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
 
 
@@ -54,20 +56,6 @@ def upload_rendered_script_to_s3(
     day = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').day }}"
     hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}"  # 1 hour earlier
 
-    # upload_script = PythonOperator(
-    #     task_id="upload_script_to_s3",
-    #     python_callable=upload_rendered_script_to_s3,
-    #     op_kwargs={
-    #         "bucket_name": bucket_name,
-    #         "aws_conn_id": "aws_conn_id",
-    #         "template_s3_key": "source/script/glue_game_ccu_template.py",
-    #         "rendered_s3_key": "source/script/glue_game_ccu_script.py",
-    #         # into template
-    #         "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_ccu/year={year}/month={month}/day={day}/hour={hour}/",
-    #         "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_ccu/year={year}/month={month}/day={day}/hour={hour}/",
-    #     },
-    # )
-
     run_glue_job = GlueJobOperator(
         task_id="run_glue_job",
         job_name="de-2-1_game_ccu",
@@ -89,6 +77,25 @@ def upload_rendered_script_to_s3(
         run_id=run_glue_job.output,
         aws_conn_id="aws_conn_id",
     )
+    # Crawl the job's Parquet output so the Glue Data Catalog picks up new partitions
+    glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
+    glue_crawler_config = {
+        "Name": "de-2-1-raw_game_ccu",
+        "Role": glue_crawler_arn,
+        "DatabaseName": "de_2_1_glue",
+        "Targets": {
+            "S3Targets": [
+                {
+                    "Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_ccu/"
+                }
+            ]
+        },
+    }
+    crawl_s3 = GlueCrawlerOperator(
+        task_id="crawl_s3",
+        config=glue_crawler_config,
+        aws_conn_id="aws_conn_id",
+    )
 
 # upload_script >> run_glue_job >> wait_for_job
-run_glue_job >> wait_for_job
+run_glue_job >> wait_for_job >> crawl_s3
diff --git a/tests/dags/test_dag_integrity.py b/tests/dags/test_dag_integrity.py
index 70e9163..b4b9b61 100644
--- a/tests/dags/test_dag_integrity.py
+++ b/tests/dags/test_dag_integrity.py
@@ -3,6 +3,7 @@ import glob
 import os
 
 import pytest
+from unittest.mock import patch
 
 # from airflow.models import DAG
 from airflow.models.dagbag import DagBag
@@ -21,11 +22,11 @@ def test_dag_integrity(dag_file):
     Args:
         dag_file (str): The path to a DAG file to be tested.
     """
-    dag_bag = DagBag(dag_folder=os.path.dirname(dag_file), include_examples=False)
+    # Stub Variable.get so DAG files that read Airflow Variables at parse time import cleanly
+    with patch("airflow.models.Variable.get", return_value="dummy_value"):
+        dag_bag = DagBag(dag_folder=os.path.dirname(dag_file), include_examples=False)
+        dag_bag.process_file(dag_file, only_if_updated=True)
 
-    dag_bag.process_file(dag_file, only_if_updated=True)
-
-    # dag_id, dag in dag_bag.dags.items()
-    for dag_id, _ in dag_bag.dags.items():
-        assert dag_id in dag_bag.dags, "DAG ID not found in dag_bag.dags"
-    assert not dag_bag.import_errors, "Import errors found in DagBag"
+    for dag_id, _ in dag_bag.dags.items():
+        assert dag_id in dag_bag.dags, "DAG ID not found in dag_bag.dags"
+    assert not dag_bag.import_errors, "Import errors found in DagBag"