-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #99 from zizzic/feature/get_image_url_dag
Feature/get image url dag
- Loading branch information
Showing
2 changed files
with
208 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
from datetime import datetime, timedelta | ||
|
||
import requests | ||
|
||
import slack | ||
import time | ||
|
||
from airflow import DAG | ||
from airflow.providers.mysql.hooks.mysql import MySqlHook | ||
from airflow.models import Variable | ||
from airflow.operators.python import PythonOperator | ||
from requests.exceptions import RequestException | ||
|
||
|
||
def get_game_info_from_api(appid, params): | ||
session = requests.Session() # 세션을 사용하여 연결 재사용 | ||
url = f"http://store.steampowered.com/api/appdetails?appids={appid}&cc=kr" | ||
|
||
for _ in range(3): # 최대 3번 재시도 | ||
try: | ||
response = session.get(url, params=params) | ||
response.raise_for_status() # 상태 코드가 200이 아니면 예외 발생 | ||
data = response.json() | ||
if data[appid]["success"]: | ||
return data[appid]["data"]["header_image"] | ||
except (RequestException, KeyError): | ||
time.sleep(90) # 재시도 전 90초 대기 | ||
return None # 3번 시도 후 성공하지 못하면 None 반환 | ||
|
||
|
||
def update_banner_url_in_db(appid, banner_url): | ||
mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id") | ||
update_stmt = """ | ||
UPDATE GAME_INFO | ||
SET GAME_BANNER_URL = %s | ||
WHERE GAME_ID = %s | ||
""" | ||
mysql_hook.run(update_stmt, parameters=(banner_url, appid)) | ||
|
||
|
||
def get_rds_game_info(): | ||
mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id") | ||
query = """ | ||
SELECT game_id, game_nm | ||
FROM GAME_INFO | ||
WHERE IS_TOP300 IN ('T', 'F'); | ||
""" | ||
records = mysql_hook.get_records(query) or [] | ||
|
||
api_key = Variable.get("steam_api_key") | ||
params = {"key": api_key, "json": "1"} | ||
|
||
for appid, name in records: | ||
banner_url = get_game_info_from_api(str(appid), params) | ||
if banner_url: | ||
update_banner_url_in_db(str(appid), banner_url) | ||
else: | ||
print(f"Failed to get banner for {name} (AppID: {appid})") | ||
|
||
|
||
# dag codes | ||
with DAG( | ||
"steam_game_banner_raw", | ||
default_args={ | ||
"owner": "airflow", | ||
"depends_on_past": False, | ||
"start_date": datetime(2024, 1, 17), | ||
"retries": 1, | ||
"retry_delay": timedelta(minutes=5), | ||
}, | ||
schedule_interval="@once", | ||
tags=["game", "banner"], | ||
catchup=False, | ||
) as dag: | ||
|
||
get_steam_game_banner = PythonOperator( | ||
task_id="get_steam_game_banner_task", | ||
python_callable=get_rds_game_info, | ||
on_failure_callback=slack.on_failure_callback, | ||
) | ||
|
||
|
||
get_steam_game_banner |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
from datetime import datetime, timedelta | ||
|
||
import logging | ||
import requests | ||
|
||
import slack | ||
|
||
from requests.exceptions import RequestException | ||
|
||
from airflow import DAG | ||
from airflow.operators.python import PythonOperator | ||
from airflow.providers.mysql.hooks.mysql import MySqlHook | ||
from airflow.exceptions import AirflowException | ||
|
||
|
||
# get streamer_list in rds | ||
def get_s_list(**kwargs): | ||
# RDS 연결 설정 | ||
mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id") | ||
query = "SELECT STREAMER_ID, CHZ_ID, AFC_ID FROM STREAMER_INFO;" | ||
try: | ||
result = mysql_hook.get_records(query) or [] | ||
chzzk = [(row[0], row[1]) for row in result if row[1]] | ||
afc = [(row[0], row[2]) for row in result if row[2]] | ||
except Exception as e: | ||
logging.error(f"Error occurred while fetching streamer list: {e}") | ||
raise AirflowException("Failed to fetch streamer list from RDS.") | ||
|
||
kwargs["ti"].xcom_push(key="chzzk", value=chzzk) | ||
kwargs["ti"].xcom_push(key="afc", value=afc) | ||
|
||
|
||
def chzzk_banner(**kwargs): | ||
chzzk_ids = kwargs["ti"].xcom_pull(key="chzzk", task_ids="get_s_list_task") | ||
mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id") | ||
|
||
for s_id, chzzk_id in chzzk_ids: | ||
try: | ||
res = requests.get( | ||
f"https://api.chzzk.naver.com/service/v1/channels/{chzzk_id}" | ||
) | ||
res.raise_for_status() # Raises an HTTPError if the HTTP request returned an unsuccessful status code. | ||
banner_url = res.json()["content"]["channelImageUrl"] | ||
|
||
update_stmt = """ | ||
UPDATE STREAMER_INFO | ||
SET CHZ_BANNER_URL = %s | ||
WHERE CHZ_ID = %s | ||
""" | ||
mysql_hook.run(update_stmt, parameters=(banner_url, chzzk_id)) | ||
except RequestException as e: | ||
logging.error(f"Request failed: {e}") | ||
except KeyError as e: | ||
logging.error(f"Key error: {e}") | ||
except Exception as e: | ||
logging.error(f"Unexpected error: {e}") | ||
raise AirflowException( | ||
f"Failed to update CHZ_BANNER_URL for CHZ_ID {chzzk_id}" | ||
) | ||
|
||
|
||
def afreeca_banner(**kwargs): | ||
afreeca_ids = kwargs["ti"].xcom_pull(key="afc", task_ids="get_s_list_task") | ||
mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id") | ||
headers = { | ||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", | ||
} | ||
|
||
for s_id, bjid in afreeca_ids: | ||
try: | ||
res = requests.get( | ||
f"https://bjapi.afreecatv.com/api/{bjid}/station", headers=headers | ||
) | ||
res.raise_for_status() | ||
banner_url = f'https:{res.json().get("profile_image")}' | ||
|
||
update_stmt = """ | ||
UPDATE STREAMER_INFO | ||
SET AFC_BANNER_URL = %s | ||
WHERE AFC_ID = %s | ||
""" | ||
mysql_hook.run(update_stmt, parameters=(banner_url, bjid)) | ||
except RequestException as e: | ||
logging.error(f"Request failed: {e}") | ||
except KeyError as e: | ||
logging.error(f"Key error: {e}") | ||
except Exception as e: | ||
logging.error(f"Unexpected error: {e}") | ||
raise AirflowException(f"Failed to update AFC_BANNER_URL for AFC_ID {bjid}") | ||
|
||
|
||
# dag codes | ||
with DAG( | ||
"streamer_banner_raw", | ||
default_args={ | ||
"owner": "airflow", | ||
"depends_on_past": False, | ||
"start_date": datetime(2024, 1, 17), | ||
"retries": 1, | ||
"retry_delay": timedelta(minutes=5), | ||
}, | ||
schedule_interval="@once", | ||
tags=["Streamer", "banner"], | ||
catchup=False, | ||
) as dag: | ||
|
||
# load | ||
|
||
task_get_s_list = PythonOperator( | ||
task_id="get_s_list_task", | ||
python_callable=get_s_list, | ||
on_failure_callback=slack.on_failure_callback, | ||
) | ||
task_raw_chzzk = PythonOperator( | ||
task_id="chzzk_raw_task", | ||
python_callable=chzzk_banner, | ||
on_failure_callback=slack.on_failure_callback, | ||
) | ||
task_raw_afreeca = PythonOperator( | ||
task_id="afreeca_raw_task", | ||
python_callable=afreeca_banner, | ||
on_failure_callback=slack.on_failure_callback, | ||
) | ||
|
||
task_get_s_list >> [task_raw_chzzk, task_raw_afreeca] |