Merge pull request #127 from zizzic/feature/glue_crawler
[feat]: add crawler-operator to dag
srlee056 authored Mar 4, 2024
2 parents cf2e68d + 6964eac commit 04fcb6d
Showing 2 changed files with 28 additions and 22 deletions.
36 changes: 21 additions & 15 deletions dags/glue/game_ccu.py
@@ -1,9 +1,11 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
@@ -54,20 +56,6 @@ def upload_rendered_script_to_s3(
day = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').day }}"
hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}" # before 1 hour

# upload_script = PythonOperator(
# task_id="upload_script_to_s3",
# python_callable=upload_rendered_script_to_s3,
# op_kwargs={
# "bucket_name": bucket_name,
# "aws_conn_id": "aws_conn_id",
# "template_s3_key": "source/script/glue_game_ccu_template.py",
# "rendered_s3_key": "source/script/glue_game_ccu_script.py",
# # into template
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_ccu/year={year}/month={month}/day={day}/hour={hour}/",
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_ccu/year={year}/month={month}/day={day}/hour={hour}/",
# },
# )

run_glue_job = GlueJobOperator(
task_id="run_glue_job",
job_name="de-2-1_game_ccu",
@@ -89,6 +77,24 @@ def upload_rendered_script_to_s3(
run_id=run_glue_job.output,
aws_conn_id="aws_conn_id",
)
glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
glue_crawler_config = {
"Name": "de-2-1-raw_game_ccu",
"Role": glue_crawler_arn,
"DatabaseName": "de_2_1_glue",
"Targets": {
"S3Targets": [
{
"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/"
}
]
},
}

crawl_s3 = GlueCrawlerOperator(
task_id="crawl_s3",
config=glue_crawler_config,
aws_conn_id="aws_conn_id",
)
# upload_script >> run_glue_job >> wait_for_job
run_glue_job >> wait_for_job
run_glue_job >> wait_for_job >> crawl_s3
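
For reference, a minimal, self-contained sketch of the pattern this commit introduces in game_ccu.py: run a Glue job, wait on it with a sensor, then trigger a Glue crawler with GlueCrawlerOperator so the freshly written Parquet partitions get registered in the Glue catalog. All names, paths, and the Variable key below are illustrative placeholders (assuming Airflow 2.4+ with apache-airflow-providers-amazon installed), not the project's actual values.

```python
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

with DAG(
    dag_id="glue_job_then_crawler_example",  # placeholder DAG id
    start_date=datetime(2024, 3, 1),
    schedule="@hourly",
    catchup=False,
) as dag:
    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="example_glue_job",  # placeholder job name
        aws_conn_id="aws_conn_id",
        wait_for_completion=False,  # the sensor below does the waiting
    )

    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name="example_glue_job",
        run_id=run_glue_job.output,  # job run id pulled from XCom
        aws_conn_id="aws_conn_id",
    )

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        aws_conn_id="aws_conn_id",
        config={
            "Name": "example-crawler",  # placeholder crawler name
            # Variable.get() runs at DAG parse time, which is why the test below stubs it.
            "Role": Variable.get("glue_crawler_role_arn"),  # placeholder Variable key
            "DatabaseName": "example_glue_db",  # placeholder catalog database
            "Targets": {"S3Targets": [{"Path": "s3://example-bucket/parquet/"}]},
        },
    )

    run_glue_job >> wait_for_job >> crawl_s3
```

The `config` dict is handed to Glue's create/update crawler call, so its keys (Name, Role, DatabaseName, Targets) follow the boto3 field names rather than Airflow-style arguments.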
14 changes: 7 additions & 7 deletions tests/dags/test_dag_integrity.py
@@ -3,6 +3,7 @@
import glob
import os
import pytest
from unittest.mock import patch

# from airflow.models import DAG
from airflow.models.dagbag import DagBag
@@ -21,11 +22,10 @@ def test_dag_integrity(dag_file):
Args:
dag_file (str): The path to a DAG file to be tested.
"""
dag_bag = DagBag(dag_folder=os.path.dirname(dag_file), include_examples=False)
with patch("airflow.models.Variable.get", return_value="dummy_value"):
dag_bag = DagBag(dag_folder=os.path.dirname(dag_file), include_examples=False)
dag_bag.process_file(dag_file, only_if_updated=True)

dag_bag.process_file(dag_file, only_if_updated=True)

# dag_id, dag in dag_bag.dags.items()
for dag_id, _ in dag_bag.dags.items():
assert dag_id in dag_bag.dags, "DAG ID not found in dag_bag.dags"
assert not dag_bag.import_errors, "Import errors found in DagBag"
for dag_id, _ in dag_bag.dags.items():
assert dag_id in dag_bag.dags, "DAG ID not found in dag_bag.dags"
assert not dag_bag.import_errors, "Import errors found in DagBag"
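
The test change follows from the DAG change above: game_ccu.py now calls Variable.get() at import time, so parsing it without a metadata database or secrets backend would raise during collection. A minimal sketch of the stubbing pattern, with a placeholder file list and a dummy return value:

```python
import os
from unittest.mock import patch

import pytest
from airflow.models.dagbag import DagBag

DAG_FILES = ["dags/glue/game_ccu.py"]  # placeholder; the real test discovers DAG files via glob


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    # Stub Airflow Variables so DAG parsing never touches a real backend.
    with patch("airflow.models.Variable.get", return_value="dummy_value"):
        dag_bag = DagBag(dag_folder=os.path.dirname(dag_file), include_examples=False)
        dag_bag.process_file(dag_file, only_if_updated=True)

    assert not dag_bag.import_errors, f"Import errors found: {dag_bag.import_errors}"
    assert dag_bag.dags, "No DAGs were loaded from the file"
```

Patching airflow.models.Variable.get keeps the integrity test focused on import errors instead of on whatever Variables happen to exist in the environment.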
