Commit 23a77bf

Merge pull request #42 from CoronaWhy/airflow
DAG file for coronawhy vm
2 parents 06371ed + c14fb5c commit 23a77bf

File tree

1 file changed (+105 -0 lines)

airflow/load_data_forecast.py

Lines changed: 105 additions & 0 deletions
"""DAG to populate mobility data for the forecast team."""
import datetime
from datetime import timedelta
import json
import os

import pandas as pd

# The DAG object; we'll need this to instantiate a DAG,
# plus settings for the metadata-DB session used below
from airflow import DAG, settings
# Operators; we need this to operate!
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import Connection
# Connects to Google Cloud Storage
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
# Needed for load_data
from corona_ts.data_utils.data_crawler import load_data

# Setting environment key for the GCP service-account key path
os.environ["GCP_KEY_PATH"] = '/home/efawe/airflow/dags/task-ts-53924e1e3506.json'


def add_gcp_connection(**kwargs):
    """Create or update the google_cloud_default connection."""
    new_conn = Connection(
        conn_id="google_cloud_default",
        conn_type='google_cloud_platform',
    )
    extra_field = {
        "extra__google_cloud_platform__scope": "https://www.googleapis.com/auth/cloud-platform",
        "extra__google_cloud_platform__project": "task-ts",
        "extra__google_cloud_platform__key_path": os.environ["GCP_KEY_PATH"],
    }

    session = settings.Session()

    # Check whether the connection already exists; if so, just update its extras
    if session.query(Connection).filter(Connection.conn_id == new_conn.conn_id).first():
        my_connection = session.query(Connection).filter(Connection.conn_id == new_conn.conn_id).one()
        my_connection.set_extra(json.dumps(extra_field))
        session.add(my_connection)
        session.commit()
    else:  # if it doesn't exist, create one
        new_conn.set_extra(json.dumps(extra_field))
        session.add(new_conn)
        session.commit()


def data_to_GCS(csv_name: str, folder_name: str,
                bucket_name="task_ts_data", **kwargs):
    """Crawl the latest data and upload it to GCS as <folder_name>/<csv_name>.csv."""
    hook = GoogleCloudStorageHook()
    data = load_data()
    df = pd.DataFrame(data=data)
    df.to_csv('corona_data.csv', index=False)

    hook.upload(bucket_name,
                object='{}/{}.csv'.format(folder_name, csv_name),
                filename='corona_data.csv',
                mime_type='text/csv')


# These args will get passed on to each operator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
    # 'wait_for_downstream': False,
    # 'dag': dag,
    # 'sla': timedelta(hours=2),
    # 'execution_timeout': timedelta(seconds=300),
    # 'on_failure_callback': some_function,
    # 'on_success_callback': some_other_function,
    # 'on_retry_callback': another_function,
    # 'sla_miss_callback': yet_another_function,
    # 'trigger_rule': 'all_success'
}

dag = DAG(
    'load_data_forecast',
    default_args=default_args,
    description='DAG to populate mobility data for forecast team',
    schedule_interval='@daily',
)

activate_GCP = PythonOperator(
    task_id='add_gcp_connection_python',
    python_callable=add_gcp_connection,
    provide_context=True,
    dag=dag,
)

data_to_GCS_task = PythonOperator(
    task_id='data_to_GCS_python',
    python_callable=data_to_GCS,
    provide_context=True,
    # upload into a folder named after today's date, e.g. 2020-05-01
    op_kwargs={'csv_name': 'corona_data',
               'folder_name': str(datetime.datetime.today().date())},
    dag=dag,
)

dag.doc_md = __doc__

# Register the GCP connection first, then run the upload
activate_GCP >> data_to_GCS_task
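Before deploying this to the VM, it is worth confirming the file parses and that the two tasks are wired in the right order. Below is a minimal sketch of such a check, assuming an Airflow 1.x install with load_data_forecast.py already in the configured dags_folder; the script name check_dag.py is only an example, not part of this commit:

# check_dag.py -- parse-time sanity check (hypothetical helper, Airflow 1.x)
from airflow.models import DagBag

dag_bag = DagBag()  # loads every file in the configured dags_folder

# Import or syntax errors in a DAG file are collected here rather than raised
assert not dag_bag.import_errors, dag_bag.import_errors

dag = dag_bag.get_dag('load_data_forecast')
assert dag is not None

# activate_GCP must run before the upload task
assert 'data_to_GCS_python' in dag.task_dict['add_gcp_connection_python'].downstream_task_ids

From there, a single task can be exercised without the scheduler via the Airflow 1.x CLI, e.g. airflow test load_data_forecast data_to_GCS_python 2020-05-01 (the date is only an example execution date).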
