Skip to content

Commit

Permalink
Merge pull request #34 from EcoDataFlow/feature/consumption
Browse files Browse the repository at this point in the history
Feature/consumption
  • Loading branch information
eclipse25 authored Jan 11, 2024
2 parents 8e64336 + 2627f81 commit 1c39157
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 23 deletions.
8 changes: 8 additions & 0 deletions dags/energy/consumption/electricity_consumption_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator


def process_data():
Expand Down Expand Up @@ -128,4 +129,11 @@ def process_data():
dag=dag,
)

# Chain the downstream ELT DAG to run once this ETL pipeline has loaded
# the data into BigQuery.
# NOTE(review): TriggerDagRunOperator is imported from
# airflow.operators.dagrun_operator (see the import hunk above), which is a
# deprecated module path in Airflow 2.x — prefer
# airflow.operators.trigger_dagrun; confirm the Airflow version in use.
trigger_target_dag = TriggerDagRunOperator(
    task_id="trigger_target_dag",
    trigger_dag_id="electricity_consumption_elt",  # ID of the target DAG to trigger
    dag=dag,
)

# Task ordering: transform, stage to GCS, ensure the BigQuery table exists,
# load into BigQuery, then fire the downstream ELT DAG.
process_data >> upload_to_gcs >> create_table_if_not_exist >> gcs_to_bigquery
gcs_to_bigquery >> trigger_target_dag
2 changes: 1 addition & 1 deletion dags/energy/consumption/energy_consumption_bill_elt.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def process_data(**kwargs):

default_args = {
"owner": "airflow",
"start_date": datetime(2024, 1, 2),
"start_date": datetime(2024, 1, 10),
"retries": 1,
}

Expand Down
66 changes: 44 additions & 22 deletions dags/energy/consumption/energy_consumption_bill_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator


def read_csv_from_gcs():
Expand Down Expand Up @@ -44,26 +45,41 @@ def process_data(**kwargs):

for code in region_codes.법정동코드:
params = {'serviceKey': 'IfMicP9ax2V2RmsEiy8nE8UW0OuO4zyv/DINJE/x6H5FVPTFKAFjM5scKDPGlgu9m05/ygawZ9h3egOzpH7usw==', 'sigunguCode': code, 'searchDate': f'{year}{month:02d}'}
response = requests.get(url, params=params)

# XML 응답 파싱
root = ET.fromstring(response.content)
for item in root.findall('.//item'):
electricity = item.find('elect').text if item.find('elect') is not None else None
heat = item.find('heat').text if item.find('heat') is not None else None
water_cool = item.find('waterCool').text if item.find('waterCool') is not None else None
water_hot = item.find('waterHot').text if item.find('waterHot') is not None else None

# 임시 데이터프레임 생성 및 병합
temp_df = pd.DataFrame([{
'year_month': f'{year}{month:02d}',
'district_code': code,
'electricity': electricity,
'heat': heat,
'water_cool': water_cool,
'water_hot': water_hot
}])
energy_consumption_bill = pd.concat([energy_consumption_bill, temp_df], ignore_index=True)

try:
response = requests.get(url, params=params)
response.raise_for_status() # 응답 상태 확인

if response.headers['Content-Type'] != 'application/xml':
print(year, month, code, '응답이 XML 형식이 아닙니다.')
continue # XML이 아니면 건너뛰기

# XML 응답 파싱
root = ET.fromstring(response.content)
for item in root.findall('.//item'):
electricity = item.find('elect').text if item.find('elect') is not None else None
heat = item.find('heat').text if item.find('heat') is not None else None
water_cool = item.find('waterCool').text if item.find('waterCool') is not None else None
water_hot = item.find('waterHot').text if item.find('waterHot') is not None else None

# 임시 데이터프레임 생성 및 병합
temp_df = pd.DataFrame([{
'year_month': f'{year}{month:02d}',
'district_code': code,
'electricity': electricity,
'heat': heat,
'water_cool': water_cool,
'water_hot': water_hot
}])
energy_consumption_bill = pd.concat([energy_consumption_bill, temp_df], ignore_index=True)

except requests.exceptions.HTTPError as http_err:
print(year, month, code, f'HTTP 에러 발생: {http_err}') # HTTP 에러 출력
except ET.ParseError as parse_err:
print(year, month, code, f'XML 파싱 에러 발생: {parse_err}') # 파싱 에러 출력
except Exception as err:
print(year, month, code, f'기타 에러 발생: {err}') # 기타 예외 처리

print("checkpoint", year, month)
print(energy_consumption_bill)
energy_consumption_bill.to_csv("dags/data/energy_consumption_bill.csv", index=False)
Expand All @@ -73,7 +89,7 @@ def process_data(**kwargs):
default_args = {
"owner": "airflow",
"start_date": datetime(2024, 1, 1),
"retries": 1,
"retries": 2,
}

dag = DAG(
Expand Down Expand Up @@ -137,5 +153,11 @@ def process_data(**kwargs):
dag=dag,
)

# Kick off the downstream ELT DAG after the consumption-bill data has been
# loaded into BigQuery by this ETL pipeline.
# NOTE(review): imported from airflow.operators.dagrun_operator (see the
# import hunk above) — a deprecated module path in Airflow 2.x; prefer
# airflow.operators.trigger_dagrun. Verify against the deployed Airflow
# version.
trigger_target_dag = TriggerDagRunOperator(
    task_id="trigger_target_dag",
    trigger_dag_id="energy_consumption_bill_elt",  # ID of the target DAG to trigger
    dag=dag,
)

read_csv >> process_data >> upload_to_gcs
upload_to_gcs >> create_table_if_not_exist >> gcs_to_bigquery
upload_to_gcs >> create_table_if_not_exist >> gcs_to_bigquery >> trigger_target_dag

0 comments on commit 1c39157

Please sign in to comment.