Commit
Merge pull request #101 from zizzic/feature/glue
Feature/glue
mediwind authored Feb 24, 2024
2 parents d79a917 + 421dcc0 commit 4e640f6
Showing 6 changed files with 166 additions and 4 deletions.
2 changes: 1 addition & 1 deletion dags/glue/game_ccu.py
@@ -82,7 +82,7 @@ def upload_rendered_script_to_s3(
         job_name="de-2-1_game_ccu",
         # Job ID extracted from previous Glue Job Operator task
         run_id=run_glue_job.output,
-        verbose=True,  # prints glue job logs in airflow logs
+        verbose=False,  # prints glue job logs in airflow logs
         # region_name="ap-northeast-2",
         aws_conn_id="aws_conn_id",
     )
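A note on the wiring these sensor hunks rely on: run_glue_job.output is Airflow's XComArg shorthand, which resolves at runtime to the job-run ID that GlueJobOperator pushes to XCom, and verbose=False stops the sensor from mirroring the Glue run's logs into the Airflow task log. A minimal sketch of the pattern (task and job names here are illustrative, not taken from this repo):

from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

run_glue_job = GlueJobOperator(task_id="run_glue_job", job_name="example_job")
wait_for_job = GlueJobSensor(
    task_id="wait_for_job",
    job_name="example_job",
    run_id=run_glue_job.output,  # XComArg: becomes the Glue JobRunId at runtime
    verbose=False,  # True would stream the Glue job's logs into the task log
)
run_glue_job >> wait_for_job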
2 changes: 1 addition & 1 deletion dags/glue/game_price.py
@@ -82,7 +82,7 @@ def upload_rendered_script_to_s3(
         job_name="de-2-1_game_price",
         # Job ID extracted from previous Glue Job Operator task
         run_id=run_glue_job.output,
-        verbose=True,  # prints glue job logs in airflow logs
+        verbose=False,  # prints glue job logs in airflow logs
         # region_name="ap-northeast-2",
         aws_conn_id="aws_conn_id",
     )
90 changes: 90 additions & 0 deletions dags/glue/game_rating.py
@@ -0,0 +1,90 @@
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from jinja2 import Template


def upload_rendered_script_to_s3(
    bucket_name, template_s3_key, rendered_s3_key, aws_conn_id, **kwargs
):
    # Create an S3Hook instance
    s3_hook = S3Hook(aws_conn_id=aws_conn_id)

    # Read the Jinja template file from S3
    template_str = s3_hook.read_key(template_s3_key, bucket_name)

    # Render the Jinja template
    template = Template(template_str)
    rendered_script = template.render(**kwargs)

    # Upload the rendered script to S3
    s3_hook.load_string(
        string_data=rendered_script,
        bucket_name=bucket_name,
        key=rendered_s3_key,
        replace=True,
    )


with DAG(
    "glue_game_rating",
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2024, 2, 22),
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    schedule_interval="0 1 * * *",
    tags=["glue", "Game_Rating"],
    catchup=False,
) as dag:

    bucket_name = "de-2-1-bucket"
    current_time = "{{ data_interval_end.in_timezone('Asia/Seoul').strftime('%Y-%m-%dT%H:%M:%S+09:00') }}"
    year = "{{ data_interval_end.in_timezone('Asia/Seoul').year }}"
    month = "{{ data_interval_end.in_timezone('Asia/Seoul').month }}"
    day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
    hour = "{{ (data_interval_end - macros.timedelta(hours=1)).in_timezone('Asia/Seoul').hour }}"  # one hour earlier

    upload_script = PythonOperator(
        task_id="upload_script_to_s3",
        python_callable=upload_rendered_script_to_s3,
        op_kwargs={
            "bucket_name": bucket_name,
            "aws_conn_id": "aws_conn_id",
            "template_s3_key": "source/script/glue_game_rating_template.py",
            "rendered_s3_key": "source/script/glue_game_rating_script.py",
            # values below are substituted into the template
            "input_path": f"s3://de-2-1-bucket/source/json/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
            "output_path": f"s3://de-2-1-bucket/source/parquet/table_name=raw_game_rating/year={year}/month={month}/day={day}/",
            "collect_date": f"{year}-{month}-{day}",
        },
    )

    run_glue_job = GlueJobOperator(
        task_id="run_glue_job",
        job_name="de-2-1_game_rating",
        script_location="s3://de-2-1-bucket/source/script/glue_game_rating_script.py",
        aws_conn_id="aws_conn_id",
        region_name="ap-northeast-2",
        iam_role_name="AWSGlueServiceRole-crawler",
        dag=dag,
    )

    wait_for_job = GlueJobSensor(  # waits for the Glue job run to finish
        task_id="wait_for_job_game_rating_glue_job",  # renaming the task_id to something more descriptive is recommended
        job_name="de-2-1_game_rating",
        # Job ID extracted from previous Glue Job Operator task
        run_id=run_glue_job.output,
        verbose=False,  # prints glue job logs in airflow logs
        # region_name="ap-northeast-2",
        aws_conn_id="aws_conn_id",
    )

    upload_script >> run_glue_job >> wait_for_job
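The templating here happens in two stages: Airflow first renders the {{ data_interval_end ... }} macros inside op_kwargs (op_kwargs is a templated field of PythonOperator), then upload_rendered_script_to_s3 substitutes the resulting strings into the Glue script via template.render(**kwargs). Note that month and day render unpadded (month=2, not month=02), since .month and .day are plain integers. A minimal sketch of the second stage, using a one-line illustrative template rather than the real glue_game_rating_template.py:

from jinja2 import Template

template_str = 'input_path = "{{ input_path }}"  # placeholder, filled at render time'
rendered = Template(template_str).render(
    input_path="s3://de-2-1-bucket/source/json/table_name=raw_game_rating/year=2024/month=2/day=22/",
    collect_date="2024-2-22",  # keys the template never references are simply ignored
)
print(rendered)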
13 changes: 12 additions & 1 deletion dags/glue/live_viewer.py
@@ -4,6 +4,7 @@
 from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
 
 from jinja2 import Template
 
@@ -74,5 +75,15 @@ def upload_rendered_script_to_s3(
     iam_role_name="AWSGlueServiceRole-crawler",
     dag=dag,
 )
+wait_for_job = GlueJobSensor(  # waits for the Glue job run to finish
+    task_id="wait_for_job_live_viewer_job",  # renaming the task_id to something more descriptive is recommended
+    job_name="de-2-1_live_viewer",
+    # Job ID extracted from previous Glue Job Operator task
+    run_id=run_glue_job.output,
+    verbose=False,  # prints glue job logs in airflow logs
+    # region_name="ap-northeast-2",
+    aws_conn_id="aws_conn_id",
+)
+
 
-upload_script >> run_glue_job
+upload_script >> run_glue_job >> wait_for_job
4 changes: 3 additions & 1 deletion dags/streaming/stream_data_raw.py
@@ -58,7 +58,9 @@ def chzzk_raw(current_time, **kwargs):
         live_data = res.json()
         if not live_data["content"]["liveId"]:
             time.sleep(5)
-            live_data = requests.get(f"https://api.chzzk.naver.com/service/v2/channels/{id}/live-detail").json()
+            live_data = requests.get(
+                f"https://api.chzzk.naver.com/service/v2/channels/{id}/live-detail"
+            ).json()
         try:
             live = live_data["content"]["status"]
             if live == "OPEN":
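The reflowed call above sits inside chzzk_raw's single-retry logic: if liveId is empty, wait five seconds and fetch the live-detail endpoint once more. The same pattern as a bounded-retry helper, an illustrative generalization rather than code from this repo (fetch_live_detail is hypothetical; the endpoint is the one in the diff):

import time

import requests


def fetch_live_detail(channel_id: str, retries: int = 1, delay: int = 5) -> dict:
    # hypothetical helper: the DAG inlines a single retry instead of a loop
    url = f"https://api.chzzk.naver.com/service/v2/channels/{channel_id}/live-detail"
    live_data = requests.get(url).json()
    for _ in range(retries):
        if live_data["content"]["liveId"]:
            break  # a live broadcast was found; stop retrying
        time.sleep(delay)
        live_data = requests.get(url).json()
    return live_data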
59 changes: 59 additions & 0 deletions dataCollector/glue&athena/glue_game_rating_template.py
@@ -0,0 +1,59 @@
import sys

from pyspark.context import SparkContext
from pyspark.sql.functions import col, lit, explode

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# Initialize SparkContext and GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

# Initialize the Job (including Job Bookmark activation)
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job.init(args["JOB_NAME"], args)

# Read the source data from S3
datasource = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["{{ input_path }}"], "recurse": True},
    format="json",
    transformation_ctx="datasource",
)

# Convert to a DataFrame for preprocessing
game_rating_datasource = datasource.toDF()

# Explode on the top-level key
df = game_rating_datasource.select(
    explode(game_rating_datasource.raw_game_rating).alias("raw_game_rating")
)

df = df.select(
    col("raw_game_rating.game_id").alias("GAME_ID"),
    # the variable name must match the "collect_date" key rendered by the DAG
    lit("{{ collect_date }}").alias("COLLECT_DATE"),
    col("raw_game_rating.recent_positive_num").alias("RECENT_POSITIVE_NUM"),
    col("raw_game_rating.recent_positive_percent").alias("RECENT_POSITIVE_PERCENT"),
    col("raw_game_rating.all_positive_num").alias("ALL_POSITIVE_NUM"),
    col("raw_game_rating.all_positive_percent").alias("ALL_POSITIVE_PERCENT"),
)

dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

# Convert to Parquet and write to S3
glueContext.write_dynamic_frame.from_options(
    frame=dynamic_frame,
    connection_type="s3",
    connection_options={"path": "{{ output_path }}"},
    format="parquet",
    transformation_ctx="dynamic_frame",
)


# Finally, commit the Job Bookmark state
job.commit()
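For intuition on the explode/select pair above: the input JSON presumably holds a top-level raw_game_rating array of per-game structs (the field values below are made up; only the shape is inferred from the selects). A self-contained local sketch of the same transform in plain PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit

spark = SparkSession.builder.getOrCreate()

# one input record; raw_game_rating is an array of structs
sample = (
    '{"raw_game_rating": [{"game_id": "730", "recent_positive_num": 1000, '
    '"recent_positive_percent": 95, "all_positive_num": 50000, '
    '"all_positive_percent": 97}]}'
)
df = spark.read.json(spark.sparkContext.parallelize([sample]))

# explode emits one row per array element; the select flattens each struct
out = df.select(explode(col("raw_game_rating")).alias("r")).select(
    col("r.game_id").alias("GAME_ID"),
    lit("2024-2-22").alias("COLLECT_DATE"),  # what {{ collect_date }} renders to
    col("r.recent_positive_num").alias("RECENT_POSITIVE_NUM"),
    col("r.recent_positive_percent").alias("RECENT_POSITIVE_PERCENT"),
)
out.show(truncate=False)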
