Skip to content

Commit

Permalink
Merge pull request #110 from zizzic/feature/glue_error_check
Browse files Browse the repository at this point in the history
[hotfix] follow.py change local_file change, schedule_interval change…
  • Loading branch information
poriz authored Feb 27, 2024
2 parents 2aa1706 + ccdecd9 commit dde6455
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions dags/streaming/follower.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def chzzk_raw(current_time, **kwargs):

raise AirflowException(error_msg)

with open(f"./chzzk_{current_time}.json", "w") as f:
with open(f"./chzzk_f_{current_time}.json", "w") as f:
json.dump(live_stream_data, f, indent=4)


Expand Down Expand Up @@ -113,20 +113,20 @@ def get_broad_info(bjid, headers):
logging.error((error_msg))
raise AirflowException(error_msg)

with open(f"./afc_{current_time}.json", "w") as f:
with open(f"./afc_f_{current_time}.json", "w") as f:
json.dump(live_stream_data, f, indent=4)


def merge_json(current_time, local_path, **kwargs):
# 파일 읽고 기존 데이터 로드
try:
with open(f"./chzzk_{current_time}.json", "r") as f:
with open(f"./chzzk_f_{current_time}.json", "r") as f:
chzzk_data = json.load(f)
except FileNotFoundError:
chzzk_data = []

try:
with open(f"./afc_{current_time}.json", "r") as f:
with open(f"./afc_f_{current_time}.json", "r") as f:
afreeca_data = json.load(f)
except FileNotFoundError:
afreeca_data = []
Expand Down Expand Up @@ -190,7 +190,7 @@ def delete_files(**kwargs):
"retries": 1,
"retry_delay": timedelta(minutes=5),
},
schedule_interval="0 * * * *",
schedule_interval="2 * * * *",
tags=["Streaming", "follower"],
catchup=False,
) as dag:
Expand All @@ -202,7 +202,7 @@ def delete_files(**kwargs):
day = "{{ data_interval_end.in_timezone('Asia/Seoul').day }}"
hour = "{{ data_interval_end.in_timezone('Asia/Seoul').hour }}"
table_name = "followers"
local_name = "local_raw_live"
local_name = "local_followers"
local_path = f"./{local_name}_{current_time}.json"

task_get_s_list = PythonOperator(
Expand Down

0 comments on commit dde6455

Please sign in to comment.