Merge pull request #129 from zizzic/feature/glue_crawler
Feature/glue crawler
srlee056 authored Mar 5, 2024
2 parents dcb8f35 + 23e4179 commit 7b91564
Showing 4 changed files with 90 additions and 43 deletions.
25 changes: 22 additions & 3 deletions dags/glue/game_price.py
@@ -1,9 +1,11 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
@@ -42,7 +44,7 @@ def upload_rendered_script_to_s3(
"retry_delay": timedelta(seconds=15),
},
max_active_runs=1,
schedule_interval="0 16 * * *", # 1 AM KST
schedule_interval="0 16 * * *",  # 1 AM KST
tags=["glue", "Game_Price"],
catchup=True,
) as dag:
@@ -92,5 +94,22 @@ def upload_rendered_script_to_s3(
aws_conn_id="aws_conn_id",
)

# upload_script >> run_glue_job >> wait_for_job
run_glue_job >> wait_for_job
glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
glue_crawler_config = {
"Name": "de-2-1-raw_game_price",
"Role": glue_crawler_arn,
"DatabaseName": "de_2_1_glue",
"Targets": {
"S3Targets": [
{"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_price/"}
]
},
}

crawl_s3 = GlueCrawlerOperator(
task_id="crawl_s3",
config=glue_crawler_config,
aws_conn_id="aws_conn_id",
)

run_glue_job >> wait_for_job >> crawl_s3
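
Note: with this config, the GlueCrawlerOperator roughly amounts to the boto3 calls below, a minimal sketch only. The region and the 15-second polling interval are assumptions, and the role ARN placeholder stands in for the glue_crawler_arn_secret Variable resolved in the DAG.

import time

import boto3

glue = boto3.client("glue", region_name="ap-northeast-2")  # region is an assumption

config = {
    "Name": "de-2-1-raw_game_price",
    "Role": "<glue_crawler_role_arn>",  # placeholder for the glue_crawler_arn_secret Variable
    "DatabaseName": "de_2_1_glue",
    "Targets": {
        "S3Targets": [
            {"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_price/"}
        ]
    },
}

# Create the crawler if it does not exist yet, otherwise update it in place.
try:
    glue.create_crawler(**config)
except glue.exceptions.AlreadyExistsException:
    glue.update_crawler(**config)

# Start the crawl and poll until the crawler returns to the READY state.
glue.start_crawler(Name=config["Name"])
while glue.get_crawler(Name=config["Name"])["Crawler"]["State"] != "READY":
    time.sleep(15)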
42 changes: 24 additions & 18 deletions dags/glue/game_rating.py
@@ -1,9 +1,11 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
@@ -42,7 +44,7 @@ def upload_rendered_script_to_s3(
"retry_delay": timedelta(seconds=15),
},
max_active_runs=1,
schedule_interval="0 16 * * *", # 1 AM KST
schedule_interval="0 16 * * *",  # 1 AM KST
tags=["glue", "Game_Rating"],
catchup=True,
) as dag:
@@ -54,21 +56,6 @@ def upload_rendered_script_to_s3(
day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul') }}"  # one hour earlier

# upload_script = PythonOperator(
# task_id="upload_script_to_s3",
# python_callable=upload_rendered_script_to_s3,
# op_kwargs={
# "bucket_name": bucket_name,
# "aws_conn_id": "aws_conn_id",
# "template_s3_key": "source/script/glue_game_rating_template.py",
# "rendered_s3_key": "source/script/glue_game_rating_script.py",
# # into template
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
# "collect_date": f"{year}-{month}-{day}",
# },
# )

run_glue_job = GlueJobOperator(
task_id="run_glue_job",
job_name="de-2-1_game_rating",
@@ -92,5 +79,24 @@ def upload_rendered_script_to_s3(
aws_conn_id="aws_conn_id",
)

# upload_script >> run_glue_job >> wait_for_job
run_glue_job >> wait_for_job
glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
glue_crawler_config = {
"Name": "de-2-1-raw_game_rating",
"Role": glue_crawler_arn,
"DatabaseName": "de_2_1_glue",
"Targets": {
"S3Targets": [
{
"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_rating/"
}
]
},
}

crawl_s3 = GlueCrawlerOperator(
task_id="crawl_s3",
config=glue_crawler_config,
aws_conn_id="aws_conn_id",
)

run_glue_job >> wait_for_job >> crawl_s3
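
A quick illustration of the templated year/month/day fields above: because the cron fires at 16:00 UTC, converting data_interval_end to Asia/Seoul lands on 01:00 the next day in Korea. A minimal sketch with pendulum; the example timestamp is assumed, not taken from a real run.

import pendulum

# Assume a run whose data_interval_end is 2024-03-05 16:00 UTC.
data_interval_end = pendulum.datetime(2024, 3, 5, 16, 0, tz="UTC")
kst = data_interval_end.in_timezone("Asia/Seoul")
print(kst)  # 2024-03-06T01:00:00+09:00 -> year=2024, month=3, day=6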
36 changes: 21 additions & 15 deletions dags/glue/glue_followers.py
@@ -1,9 +1,11 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

# from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

@@ -43,7 +45,7 @@ def upload_rendered_script_to_s3(
},
max_active_runs=1,
tags=["glue", "streaming"],
schedule_interval="0 16 * * *", # 1 AM KST
schedule_interval="0 16 * * *",  # 1 AM KST
catchup=True,
) as dag:

@@ -53,19 +55,6 @@ def upload_rendered_script_to_s3(
month = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').month }}"
day = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').day }}"

# upload_script = PythonOperator(
# task_id="upload_script_to_s3",
# python_callable=upload_rendered_script_to_s3,
# op_kwargs={
# "bucket_name": bucket_name,
# "aws_conn_id": "aws_conn_id",
# "template_s3_key": "source/script/glue_followers_template.py",
# "rendered_s3_key": "source/script/glue_followers_script.py",
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=followers/year={year}/month={month}/day={day}/",
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=followers/year={year}/month={month}/day={day}/",
# },
# )

run_glue_job = GlueJobOperator(
task_id="run_glue_job",
job_name="de-2-1_followers",
@@ -88,5 +77,22 @@ def upload_rendered_script_to_s3(
aws_conn_id="aws_conn_id",
)

glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
glue_crawler_config = {
"Name": "de-2-1-followers",
"Role": glue_crawler_arn,
"DatabaseName": "de_2_1_glue",
"Targets": {
"S3Targets": [
{"Path": "s3://de-2-1-bucket/source/parquet/table_name=followers/"}
]
},
}

crawl_s3 = GlueCrawlerOperator(
task_id="crawl_s3",
config=glue_crawler_config,
aws_conn_id="aws_conn_id",
)

run_glue_job >> wait_for_job
run_glue_job >> wait_for_job >> crawl_s3
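
Since the same Variable lookup, crawler config, and GlueCrawlerOperator block now repeats in all four DAGs, a shared factory could cut the duplication. A hedged sketch: the helper name and its module are hypothetical, not part of this PR.

from airflow.models import Variable
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator


def make_crawl_task(crawler_name: str, table_name: str) -> GlueCrawlerOperator:
    # Build the crawl_s3 task for one parquet table prefix.
    return GlueCrawlerOperator(
        task_id="crawl_s3",
        aws_conn_id="aws_conn_id",
        config={
            "Name": crawler_name,
            "Role": Variable.get("glue_crawler_arn_secret"),
            "DatabaseName": "de_2_1_glue",
            "Targets": {
                "S3Targets": [
                    {"Path": f"s3://de-2-1-bucket/source/parquet/table_name={table_name}/"}
                ]
            },
        },
    )

# e.g. in dags/glue/glue_followers.py:
# crawl_s3 = make_crawl_task("de-2-1-followers", "followers")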
30 changes: 23 additions & 7 deletions dags/glue/live_viewer.py
@@ -2,12 +2,13 @@
import time

from airflow import DAG
from airflow.models import Variable

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.sensors.time_delta_sensor import TimeDeltaSensor

from jinja2 import Template

@@ -48,7 +49,7 @@ def upload_rendered_script_to_s3(
tags=["glue", "streaming"],
schedule_interval="0 * * * *",
catchup=True,
max_active_runs=1
max_active_runs=1,
) as dag:

bucket_name = "de-2-1-bucket"
@@ -80,10 +81,25 @@ def upload_rendered_script_to_s3(
run_id=run_glue_job.output,
aws_conn_id="aws_conn_id",
)
sleep_task = PythonOperator(
task_id='sleep_for_60_seconds',
python_callable=sleep_for_60_seconds,
dag=dag,

glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
glue_crawler_config = {
"Name": "de-2-1-raw_live_viewer",
"Role": glue_crawler_arn,
"DatabaseName": "de_2_1_glue",
"Targets": {
"S3Targets": [
{
"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/"
}
]
},
}

crawl_s3 = GlueCrawlerOperator(
task_id="crawl_s3",
config=glue_crawler_config,
aws_conn_id="aws_conn_id",
)

run_glue_job >> wait_for_job >> sleep_task
run_glue_job >> wait_for_job >> crawl_s3
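
After a run, one way to sanity-check that the four crawlers registered their parquet tables is to list the de_2_1_glue database in the Glue catalog. A minimal sketch; the region is an assumption.

import boto3

glue = boto3.client("glue", region_name="ap-northeast-2")  # region is an assumption
tables = glue.get_tables(DatabaseName="de_2_1_glue")["TableList"]
print(sorted(t["Name"] for t in tables))
# expect entries for raw_game_price, raw_game_rating, followers, raw_live_viewer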
