Skip to content

Commit c6771c1

Browse files
committed
Merge branch 'develop' of https://github.com/zizzic/airflow_repo into develop
2 parents 0fc08ec + 7b91564 commit c6771c1

File tree

4 files changed: 90 additions and 43 deletions.

dags/glue/game_price.py

Lines changed: 22 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,11 @@
11
from datetime import datetime, timedelta
22

33
from airflow import DAG
4+
from airflow.models import Variable
45

56
# from airflow.operators.python import PythonOperator
67
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
8+
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
79
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
810

911
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
@@ -42,7 +44,7 @@ def upload_rendered_script_to_s3(
4244
"retry_delay": timedelta(seconds=15),
4345
},
4446
max_active_runs=1,
45-
schedule_interval="0 16 * * *", # 한국시간 새벽 1시
47+
schedule_interval="0 16 * * *", # 한국시간 새벽 1시
4648
tags=["glue", "Game_Price"],
4749
catchup=True,
4850
) as dag:
@@ -92,5 +94,22 @@ def upload_rendered_script_to_s3(
9294
aws_conn_id="aws_conn_id",
9395
)
9496

95-
# upload_script >> run_glue_job >> wait_for_job
96-
run_glue_job >> wait_for_job
97+
glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
98+
glue_crawler_config = {
99+
"Name": "de-2-1-raw_game_price",
100+
"Role": glue_crawler_arn,
101+
"DatabaseName": "de_2_1_glue",
102+
"Targets": {
103+
"S3Targets": [
104+
{"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_price/"}
105+
]
106+
},
107+
}
108+
109+
crawl_s3 = GlueCrawlerOperator(
110+
task_id="crawl_s3",
111+
config=glue_crawler_config,
112+
aws_conn_id="aws_conn_id",
113+
)
114+
115+
run_glue_job >> wait_for_job >> crawl_s3

dags/glue/game_rating.py

Lines changed: 24 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,11 @@
11
from datetime import datetime, timedelta
22

33
from airflow import DAG
4+
from airflow.models import Variable
45

56
# from airflow.operators.python import PythonOperator
67
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
8+
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
79
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
810

911
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
@@ -42,7 +44,7 @@ def upload_rendered_script_to_s3(
4244
"retry_delay": timedelta(seconds=15),
4345
},
4446
max_active_runs=1,
45-
schedule_interval="0 16 * * *", # 한국시간 새벽 1시
47+
schedule_interval="0 16 * * *", # 한국시간 새벽 1시
4648
tags=["glue", "Game_Rating"],
4749
catchup=True,
4850
) as dag:
@@ -54,21 +56,6 @@ def upload_rendered_script_to_s3(
5456
day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
5557
hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul') }}" # before 1 hour
5658

57-
# upload_script = PythonOperator(
58-
# task_id="upload_script_to_s3",
59-
# python_callable=upload_rendered_script_to_s3,
60-
# op_kwargs={
61-
# "bucket_name": bucket_name,
62-
# "aws_conn_id": "aws_conn_id",
63-
# "template_s3_key": "source/script/glue_game_rating_template.py",
64-
# "rendered_s3_key": "source/script/glue_game_rating_script.py",
65-
# # into template
66-
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
67-
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
68-
# "collect_date": f"{year}-{month}-{day}",
69-
# },
70-
# )
71-
7259
run_glue_job = GlueJobOperator(
7360
task_id="run_glue_job",
7461
job_name="de-2-1_game_rating",
@@ -92,5 +79,24 @@ def upload_rendered_script_to_s3(
9279
aws_conn_id="aws_conn_id",
9380
)
9481

95-
# upload_script >> run_glue_job >> wait_for_job
96-
run_glue_job >> wait_for_job
82+
glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
83+
glue_crawler_config = {
84+
"Name": "de-2-1-raw_game_rating",
85+
"Role": glue_crawler_arn,
86+
"DatabaseName": "de_2_1_glue",
87+
"Targets": {
88+
"S3Targets": [
89+
{
90+
"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_game_rating/"
91+
}
92+
]
93+
},
94+
}
95+
96+
crawl_s3 = GlueCrawlerOperator(
97+
task_id="crawl_s3",
98+
config=glue_crawler_config,
99+
aws_conn_id="aws_conn_id",
100+
)
101+
102+
run_glue_job >> wait_for_job >> crawl_s3

dags/glue/glue_followers.py

Lines changed: 21 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -1,9 +1,11 @@
11
from datetime import datetime, timedelta
22

33
from airflow import DAG
4+
from airflow.models import Variable
45

56
# from airflow.operators.python import PythonOperator
67
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
8+
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
79
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
810
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
911

@@ -43,7 +45,7 @@ def upload_rendered_script_to_s3(
4345
},
4446
max_active_runs=1,
4547
tags=["glue", "streaming"],
46-
schedule_interval="0 16 * * *", # 한국시간 새벽 1시
48+
schedule_interval="0 16 * * *", # 한국시간 새벽 1시
4749
catchup=True,
4850
) as dag:
4951

@@ -53,19 +55,6 @@ def upload_rendered_script_to_s3(
5355
month = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').month }}"
5456
day = "{{ (data_interval_end - macros.timedelta(days=1)).in_timezone('Asia/Seoul').day }}"
5557

56-
# upload_script = PythonOperator(
57-
# task_id="upload_script_to_s3",
58-
# python_callable=upload_rendered_script_to_s3,
59-
# op_kwargs={
60-
# "bucket_name": bucket_name,
61-
# "aws_conn_id": "aws_conn_id",
62-
# "template_s3_key": "source/script/glue_followers_template.py",
63-
# "rendered_s3_key": "source/script/glue_followers_script.py",
64-
# "input_path": f"s3://de-2-1-bucket/source/json/table_name=followers/year={year}/month={month}/day={day}/",
65-
# "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=followers/year={year}/month={month}/day={day}/",
66-
# },
67-
# )
68-
6958
run_glue_job = GlueJobOperator(
7059
task_id="run_glue_job",
7160
job_name="de-2-1_followers",
@@ -88,5 +77,22 @@ def upload_rendered_script_to_s3(
8877
aws_conn_id="aws_conn_id",
8978
)
9079

80+
glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
81+
glue_crawler_config = {
82+
"Name": "de-2-1-followers",
83+
"Role": glue_crawler_arn,
84+
"DatabaseName": "de_2_1_glue",
85+
"Targets": {
86+
"S3Targets": [
87+
{"Path": "s3://de-2-1-bucket/source/parquet/table_name=followers/"}
88+
]
89+
},
90+
}
91+
92+
crawl_s3 = GlueCrawlerOperator(
93+
task_id="crawl_s3",
94+
config=glue_crawler_config,
95+
aws_conn_id="aws_conn_id",
96+
)
9197

92-
run_glue_job >> wait_for_job
98+
run_glue_job >> wait_for_job >> crawl_s3

dags/glue/live_viewer.py

Lines changed: 23 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -2,12 +2,13 @@
22
import time
33

44
from airflow import DAG
5+
from airflow.models import Variable
56

67
from airflow.operators.python import PythonOperator
78
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
9+
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
810
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
911
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
10-
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
1112

1213
from jinja2 import Template
1314

@@ -48,7 +49,7 @@ def upload_rendered_script_to_s3(
4849
tags=["glue", "streaming"],
4950
schedule_interval="0 * * * *",
5051
catchup=True,
51-
max_active_runs=1
52+
max_active_runs=1,
5253
) as dag:
5354

5455
bucket_name = "de-2-1-bucket"
@@ -80,10 +81,25 @@ def upload_rendered_script_to_s3(
8081
run_id=run_glue_job.output,
8182
aws_conn_id="aws_conn_id",
8283
)
83-
sleep_task = PythonOperator(
84-
task_id='sleep_for_60_seconds',
85-
python_callable=sleep_for_60_seconds,
86-
dag=dag,
84+
85+
glue_crawler_arn = Variable.get("glue_crawler_arn_secret")
86+
glue_crawler_config = {
87+
"Name": "de-2-1-raw_live_viewer",
88+
"Role": glue_crawler_arn,
89+
"DatabaseName": "de_2_1_glue",
90+
"Targets": {
91+
"S3Targets": [
92+
{
93+
"Path": "s3://de-2-1-bucket/source/parquet/table_name=raw_live_viewer/"
94+
}
95+
]
96+
},
97+
}
98+
99+
crawl_s3 = GlueCrawlerOperator(
100+
task_id="crawl_s3",
101+
config=glue_crawler_config,
102+
aws_conn_id="aws_conn_id",
87103
)
88104

89-
run_glue_job >> wait_for_job >> sleep_task
105+
run_glue_job >> wait_for_job >> crawl_s3

0 commit comments

Comments
 (0)