Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/consumption #34

Merged
merged 4 commits into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dags/energy/consumption/electricity_consumption_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator


def process_data():
Expand Down Expand Up @@ -128,4 +129,11 @@ def process_data():
dag=dag,
)

# Fire the downstream ELT DAG once the BigQuery load task has completed.
trigger_target_dag = TriggerDagRunOperator(
    task_id="trigger_target_dag",
    dag=dag,
    # ID of the target DAG to trigger (was a Korean comment in the original).
    trigger_dag_id="electricity_consumption_elt",
)

# Pipeline ordering: transform -> stage to GCS -> ensure table exists ->
# load into BigQuery -> trigger the ELT DAG.
process_data >> upload_to_gcs >> create_table_if_not_exist >> gcs_to_bigquery >> trigger_target_dag
2 changes: 1 addition & 1 deletion dags/energy/consumption/energy_consumption_bill_elt.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def process_data(**kwargs):

default_args = {
"owner": "airflow",
"start_date": datetime(2024, 1, 2),
"start_date": datetime(2024, 1, 10),
"retries": 1,
}

Expand Down
66 changes: 44 additions & 22 deletions dags/energy/consumption/energy_consumption_bill_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator


def read_csv_from_gcs():
Expand Down Expand Up @@ -44,26 +45,41 @@ def process_data(**kwargs):

for code in region_codes.법정동코드:
params = {'serviceKey': 'IfMicP9ax2V2RmsEiy8nE8UW0OuO4zyv/DINJE/x6H5FVPTFKAFjM5scKDPGlgu9m05/ygawZ9h3egOzpH7usw==', 'sigunguCode': code, 'searchDate': f'{year}{month:02d}'}
response = requests.get(url, params=params)

# XML 응답 파싱
root = ET.fromstring(response.content)
for item in root.findall('.//item'):
electricity = item.find('elect').text if item.find('elect') is not None else None
heat = item.find('heat').text if item.find('heat') is not None else None
water_cool = item.find('waterCool').text if item.find('waterCool') is not None else None
water_hot = item.find('waterHot').text if item.find('waterHot') is not None else None

# 임시 데이터프레임 생성 및 병합
temp_df = pd.DataFrame([{
'year_month': f'{year}{month:02d}',
'district_code': code,
'electricity': electricity,
'heat': heat,
'water_cool': water_cool,
'water_hot': water_hot
}])
energy_consumption_bill = pd.concat([energy_consumption_bill, temp_df], ignore_index=True)

try:
response = requests.get(url, params=params)
response.raise_for_status() # 응답 상태 확인

if response.headers['Content-Type'] != 'application/xml':
print(year, month, code, '응답이 XML 형식이 아닙니다.')
continue # XML이 아니면 건너뛰기

# XML 응답 파싱
root = ET.fromstring(response.content)
for item in root.findall('.//item'):
electricity = item.find('elect').text if item.find('elect') is not None else None
heat = item.find('heat').text if item.find('heat') is not None else None
water_cool = item.find('waterCool').text if item.find('waterCool') is not None else None
water_hot = item.find('waterHot').text if item.find('waterHot') is not None else None

# 임시 데이터프레임 생성 및 병합
temp_df = pd.DataFrame([{
'year_month': f'{year}{month:02d}',
'district_code': code,
'electricity': electricity,
'heat': heat,
'water_cool': water_cool,
'water_hot': water_hot
}])
energy_consumption_bill = pd.concat([energy_consumption_bill, temp_df], ignore_index=True)

except requests.exceptions.HTTPError as http_err:
print(year, month, code, f'HTTP 에러 발생: {http_err}') # HTTP 에러 출력
except ET.ParseError as parse_err:
print(year, month, code, f'XML 파싱 에러 발생: {parse_err}') # 파싱 에러 출력
except Exception as err:
print(year, month, code, f'기타 에러 발생: {err}') # 기타 예외 처리

print("checkpoint", year, month)
print(energy_consumption_bill)
energy_consumption_bill.to_csv("dags/data/energy_consumption_bill.csv", index=False)
Expand All @@ -73,7 +89,7 @@ def process_data(**kwargs):
default_args = {
"owner": "airflow",
"start_date": datetime(2024, 1, 1),
"retries": 1,
"retries": 2,
}

dag = DAG(
Expand Down Expand Up @@ -137,5 +153,11 @@ def process_data(**kwargs):
dag=dag,
)

# Fire the downstream ELT DAG once the BigQuery load task has completed.
trigger_target_dag = TriggerDagRunOperator(
    task_id="trigger_target_dag",
    dag=dag,
    # ID of the target DAG to trigger (was a Korean comment in the original).
    trigger_dag_id="energy_consumption_bill_elt",
)

read_csv >> process_data >> upload_to_gcs
upload_to_gcs >> create_table_if_not_exist >> gcs_to_bigquery
upload_to_gcs >> create_table_if_not_exist >> gcs_to_bigquery >> trigger_target_dag