diff --git a/dags/game/steam_games_banner.py b/dags/game/steam_games_banner.py
new file mode 100644
index 0000000..960e335
--- /dev/null
+++ b/dags/game/steam_games_banner.py
@@ -0,0 +1,83 @@
+from datetime import datetime, timedelta
+
+import requests
+
+import slack
+import time
+
+from airflow import DAG
+from airflow.providers.mysql.hooks.mysql import MySqlHook
+from airflow.models import Variable
+from airflow.operators.python import PythonOperator
+from requests.exceptions import RequestException
+
+
+def get_game_info_from_api(appid, params):
+    session = requests.Session()  # use a session so the connection is reused
+    url = f"http://store.steampowered.com/api/appdetails?appids={appid}&cc=kr"
+
+    for _ in range(3):  # retry up to 3 times
+        try:
+            response = session.get(url, params=params)
+            response.raise_for_status()  # raise an exception on a non-success status code
+            data = response.json()
+            if data[appid]["success"]:
+                return data[appid]["data"]["header_image"]
+        except (RequestException, KeyError):
+            time.sleep(90)  # wait 90 seconds before retrying
+    return None  # return None if all 3 attempts fail
+
+
+def update_banner_url_in_db(appid, banner_url):
+    mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id")
+    update_stmt = """
+        UPDATE GAME_INFO
+        SET GAME_BANNER_URL = %s
+        WHERE GAME_ID = %s
+    """
+    mysql_hook.run(update_stmt, parameters=(banner_url, appid))
+
+
+def get_rds_game_info():
+    mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id")
+    query = """
+        SELECT game_id, game_nm
+        FROM GAME_INFO
+        WHERE IS_TOP300 IN ('T', 'F');
+    """
+    records = mysql_hook.get_records(query) or []
+
+    api_key = Variable.get("steam_api_key")
+    params = {"key": api_key, "json": "1"}
+
+    for appid, name in records:
+        banner_url = get_game_info_from_api(str(appid), params)
+        if banner_url:
+            update_banner_url_in_db(str(appid), banner_url)
+        else:
+            print(f"Failed to get banner for {name} (AppID: {appid})")
+
+
+# dag codes
+with DAG(
+    "steam_game_banner_raw",
+    default_args={
+        "owner": "airflow",
+        "depends_on_past": False,
+        "start_date": datetime(2024, 1, 17),
+        "retries": 1,
+        "retry_delay": timedelta(minutes=5),
+    },
+    schedule_interval="@once",
+    tags=["game", "banner"],
+    catchup=False,
+) as dag:
+
+    get_steam_game_banner = PythonOperator(
+        task_id="get_steam_game_banner_task",
+        python_callable=get_rds_game_info,
+        on_failure_callback=slack.on_failure_callback,
+    )
+
+
+get_steam_game_banner
diff --git a/dags/streaming/streamers_banner.py b/dags/streaming/streamers_banner.py
new file mode 100644
index 0000000..5beba48
--- /dev/null
+++ b/dags/streaming/streamers_banner.py
@@ -0,0 +1,125 @@
+from datetime import datetime, timedelta
+
+import logging
+import requests
+
+import slack
+
+from requests.exceptions import RequestException
+
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.providers.mysql.hooks.mysql import MySqlHook
+from airflow.exceptions import AirflowException
+
+
+# get the streamer list from RDS
+def get_s_list(**kwargs):
+    # set up the RDS connection
+    mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id")
+    query = "SELECT STREAMER_ID, CHZ_ID, AFC_ID FROM STREAMER_INFO;"
+    try:
+        result = mysql_hook.get_records(query) or []
+        chzzk = [(row[0], row[1]) for row in result if row[1]]
+        afc = [(row[0], row[2]) for row in result if row[2]]
+    except Exception as e:
+        logging.error(f"Error occurred while fetching streamer list: {e}")
+        raise AirflowException("Failed to fetch streamer list from RDS.")
+
+    kwargs["ti"].xcom_push(key="chzzk", value=chzzk)
+    kwargs["ti"].xcom_push(key="afc", value=afc)
+
+
+def chzzk_banner(**kwargs):
+    chzzk_ids = kwargs["ti"].xcom_pull(key="chzzk", task_ids="get_s_list_task")
+    mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id")
+
+    for s_id, chzzk_id in chzzk_ids:
+        try:
+            res = requests.get(
+                f"https://api.chzzk.naver.com/service/v1/channels/{chzzk_id}"
+            )
+            res.raise_for_status()  # Raises an HTTPError if the HTTP request returned an unsuccessful status code.
+            banner_url = res.json()["content"]["channelImageUrl"]
+
+            update_stmt = """
+                UPDATE STREAMER_INFO
+                SET CHZ_BANNER_URL = %s
+                WHERE CHZ_ID = %s
+            """
+            mysql_hook.run(update_stmt, parameters=(banner_url, chzzk_id))
+        except RequestException as e:
+            logging.error(f"Request failed: {e}")
+        except KeyError as e:
+            logging.error(f"Key error: {e}")
+        except Exception as e:
+            logging.error(f"Unexpected error: {e}")
+            raise AirflowException(
+                f"Failed to update CHZ_BANNER_URL for CHZ_ID {chzzk_id}"
+            )
+
+
+def afreeca_banner(**kwargs):
+    afreeca_ids = kwargs["ti"].xcom_pull(key="afc", task_ids="get_s_list_task")
+    mysql_hook = MySqlHook(mysql_conn_id="aws_rds_conn_id")
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
+    }
+
+    for s_id, bjid in afreeca_ids:
+        try:
+            res = requests.get(
+                f"https://bjapi.afreecatv.com/api/{bjid}/station", headers=headers
+            )
+            res.raise_for_status()
+            banner_url = f'https:{res.json().get("profile_image")}'
+
+            update_stmt = """
+                UPDATE STREAMER_INFO
+                SET AFC_BANNER_URL = %s
+                WHERE AFC_ID = %s
+            """
+            mysql_hook.run(update_stmt, parameters=(banner_url, bjid))
+        except RequestException as e:
+            logging.error(f"Request failed: {e}")
+        except KeyError as e:
+            logging.error(f"Key error: {e}")
+        except Exception as e:
+            logging.error(f"Unexpected error: {e}")
+            raise AirflowException(f"Failed to update AFC_BANNER_URL for AFC_ID {bjid}")
+
+
+# dag codes
+with DAG(
+    "streamer_banner_raw",
+    default_args={
+        "owner": "airflow",
+        "depends_on_past": False,
+        "start_date": datetime(2024, 1, 17),
+        "retries": 1,
+        "retry_delay": timedelta(minutes=5),
+    },
+    schedule_interval="@once",
+    tags=["Streamer", "banner"],
+    catchup=False,
+) as dag:
+
+    # load
+
+    task_get_s_list = PythonOperator(
+        task_id="get_s_list_task",
+        python_callable=get_s_list,
+        on_failure_callback=slack.on_failure_callback,
+    )
+    task_raw_chzzk = PythonOperator(
+        task_id="chzzk_raw_task",
+        python_callable=chzzk_banner,
+        on_failure_callback=slack.on_failure_callback,
+    )
+    task_raw_afreeca = PythonOperator(
+        task_id="afreeca_raw_task",
+        python_callable=afreeca_banner,
+        on_failure_callback=slack.on_failure_callback,
+    )
+
+task_get_s_list >> [task_raw_chzzk, task_raw_afreeca]
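
Note: both DAGs import a project-local slack module and hand slack.on_failure_callback to their PythonOperators, but that module is not part of this diff. A minimal sketch of what such a callback could look like, assuming the Slack incoming-webhook URL is stored in an Airflow Variable named slack_webhook_url (a hypothetical name; the message layout is illustrative as well):

# slack.py -- illustrative sketch only, not part of this diff
import requests

from airflow.models import Variable


def on_failure_callback(context):
    """Post a short failure notice to Slack when a task fails."""
    ti = context["task_instance"]
    message = (
        ":red_circle: Airflow task failed.\n"
        f"DAG: {ti.dag_id}\n"
        f"Task: {ti.task_id}\n"
        f"Execution date: {context['ds']}\n"
        f"Log URL: {ti.log_url}"
    )
    # Assumption: the webhook URL lives in the Variable "slack_webhook_url".
    webhook_url = Variable.get("slack_webhook_url")
    requests.post(webhook_url, json={"text": message}, timeout=10)

Attaching the callback per task, as the DAGs above do, keeps the notification logic in one module; it could equally be set once via default_args so every task in the DAG inherits it.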