
Commit

test25
lsh0107 committed Jan 6, 2024
1 parent 374658c commit b98f3e5
Showing 73 changed files with 2,469 additions and 222 deletions.
4 changes: 4 additions & 0 deletions .flake8
@@ -0,0 +1,4 @@
[flake8]
ignore = E501, F401
exclude=
    __pycache__,
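
For reference, a small sketch of how this configuration could be exercised locally with flake8's legacy Python API, assuming flake8 is installed and the check is run from the repository root so the .flake8 file is picked up (not part of this commit):

from flake8.api import legacy as flake8

style_guide = flake8.get_style_guide()      # should pick up .flake8 from the working directory
report = style_guide.check_files(["dags"])  # lint everything under dags/
print(report.total_errors)                  # E501 (line too long) and F401 (unused import) are suppressed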
20 changes: 10 additions & 10 deletions dags/Learn_LatestOnlyOperator.py
@@ -6,14 +6,14 @@
from datetime import timedelta

with DAG(
dag_id='Learn_LatestOnlyOperator',
    schedule=timedelta(hours=48), # DAG set to run every 48 hours
start_date=datetime(2023, 6, 14),
catchup=True) as dag:
dag_id="Learn_LatestOnlyOperator",
    schedule=timedelta(hours=48),  # DAG set to run every 48 hours
start_date=datetime(2023, 6, 14),
catchup=True,
) as dag:
t1 = EmptyOperator(task_id="task1")
t2 = LatestOnlyOperator(task_id="latest_only")
t3 = EmptyOperator(task_id="task3")
t4 = EmptyOperator(task_id="task4")

t1 = EmptyOperator(task_id='task1')
t2 = LatestOnlyOperator(task_id='latest_only')
t3 = EmptyOperator(task_id='task3')
t4 = EmptyOperator(task_id='task4')

t1 >> t2 >> [t3, t4]
t1 >> t2 >> [t3, t4]
16 changes: 11 additions & 5 deletions dags/Learn_TaskGroups.py
@@ -4,13 +4,17 @@
from airflow.utils.task_group import TaskGroup
import pendulum

with DAG(dag_id="Learn_Task_Group", start_date=pendulum.today('UTC').add(days=-2), tags=["example"]) as dag:
with DAG(
dag_id="Learn_Task_Group",
start_date=pendulum.today("UTC").add(days=-2),
tags=["example"],
) as dag:
start = EmptyOperator(task_id="start")

# Task Group #1
with TaskGroup("Download", tooltip="Tasks for downloading data") as section_1:
task_1 = EmptyOperator(task_id="task_1")
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
task_3 = EmptyOperator(task_id="task_3")

task_1 >> [task_2, task_3]
@@ -19,13 +23,15 @@
with TaskGroup("Process", tooltip="Tasks for processing data") as section_2:
task_1 = EmptyOperator(task_id="task_1")

with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
with TaskGroup(
"inner_section_2", tooltip="Tasks for inner_section2"
) as inner_section_2:
task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
task_3 = EmptyOperator(task_id="task_3")
task_4 = EmptyOperator(task_id="task_4")

[task_2, task_3] >> task_4

end = EmptyOperator(task_id='end')
end = EmptyOperator(task_id="end")

start >> section_1 >> section_2 >> end
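
Side note: the same Download group can also be expressed with the @task_group decorator. A minimal sketch, assuming Airflow 2.x where airflow.decorators is available; the dag_id below is hypothetical and only for illustration (not part of this commit):

from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
import pendulum

@task_group(group_id="Download", tooltip="Tasks for downloading data")
def download():
    task_1 = EmptyOperator(task_id="task_1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo 1")
    task_3 = EmptyOperator(task_id="task_3")
    task_1 >> [task_2, task_3]

with DAG(
    dag_id="Learn_Task_Group_decorator_sketch",  # hypothetical DAG id, for illustration only
    start_date=pendulum.today("UTC").add(days=-2),
) as dag:
    download()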
22 changes: 10 additions & 12 deletions dags/Learn_TriggerRule.py
@@ -3,17 +3,15 @@
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta

default_args = {
'start_date': datetime(2023, 6, 15)
}
default_args = {"start_date": datetime(2023, 6, 15)}

with DAG("Learn_TriggerRule", default_args=default_args, schedule=timedelta(1)) as dag:
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id='final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1, t2, t3] >> t4
t1 = BashOperator(task_id="print_date", bash_command="date")
t2 = BashOperator(task_id="sleep", bash_command="sleep 5")
t3 = BashOperator(task_id="exit", bash_command="exit 1")
t4 = BashOperator(
task_id="final_task",
bash_command="echo DONE!",
trigger_rule=TriggerRule.ALL_DONE,
)
[t1, t2, t3] >> t4
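
Worth noting for reviewers: trigger_rule=TriggerRule.ALL_DONE is what lets final_task run even though the "exit" task fails (bash exit 1). For contrast, a minimal sketch with a hypothetical extra task that keeps the default rule (not part of this commit):

    # Hypothetical comparison task: the default trigger_rule is ALL_SUCCESS,
    # so this one would be skipped because the "exit" task fails upstream.
    t5 = BashOperator(task_id="final_task_default_rule", bash_command="echo DONE!")
    [t1, t2, t3] >> t5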
58 changes: 29 additions & 29 deletions dags/MySQL_to_Redshift.py
@@ -14,15 +14,15 @@


dag = DAG(
dag_id = 'MySQL_to_Redshift',
    start_date = datetime(2022,8,24), # won't run if the date is in the future
    schedule = '0 9 * * *', # adjust as needed
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
dag_id="MySQL_to_Redshift",
    start_date=datetime(2022, 8, 24),  # won't run if the date is in the future
    schedule="0 9 * * *",  # adjust as needed
max_active_runs=1,
catchup=False,
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=3),
},
)

schema = "zippoo94"
@@ -31,29 +31,29 @@
s3_key = schema + "-" + table

mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = "SELECT * FROM prod.nps",
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
task_id="mysql_to_s3_nps",
query="SELECT * FROM prod.nps",
s3_bucket=s3_bucket,
s3_key=s3_key,
sql_conn_id="mysql_conn_id",
aws_conn_id="aws_conn_id",
verify=False,
replace=True,
pd_kwargs={"index": False, "header": False},
dag=dag,
)

s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
method = 'REPLACE',
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
dag = dag
task_id="s3_to_redshift_nps",
s3_bucket=s3_bucket,
s3_key=s3_key,
schema=schema,
table=table,
copy_options=["csv"],
method="REPLACE",
redshift_conn_id="redshift_dev_db",
aws_conn_id="aws_conn_id",
dag=dag,
)

mysql_to_s3_nps >> s3_to_redshift_nps
62 changes: 31 additions & 31 deletions dags/MySQL_to_Redshift_v2.py
@@ -13,49 +13,49 @@
import json

dag = DAG(
dag_id = 'MySQL_to_Redshift_v2',
    start_date = datetime(2023,1,1), # won't run if the date is in the future
    schedule = '0 9 * * *', # adjust as needed
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
dag_id="MySQL_to_Redshift_v2",
    start_date=datetime(2023, 1, 1),  # won't run if the date is in the future
    schedule="0 9 * * *",  # adjust as needed
max_active_runs=1,
catchup=False,
default_args={
"retries": 1,
"retry_delay": timedelta(minutes=3),
},
)

schema = "keeyong"
table = "nps"
s3_bucket = "grepp-data-engineering"
s3_key = schema + "-" + table # s3_key = schema + "/" + table
s3_key = schema + "-" + table # s3_key = schema + "/" + table

sql = "SELECT * FROM prod.nps WHERE DATE(created_at) = DATE('{{ execution_date }}')"
print(sql)
mysql_to_s3_nps = SqlToS3Operator(
task_id = 'mysql_to_s3_nps',
query = sql,
s3_bucket = s3_bucket,
s3_key = s3_key,
sql_conn_id = "mysql_conn_id",
aws_conn_id = "aws_conn_id",
verify = False,
replace = True,
pd_kwargs={"index": False, "header": False},
dag = dag
task_id="mysql_to_s3_nps",
query=sql,
s3_bucket=s3_bucket,
s3_key=s3_key,
sql_conn_id="mysql_conn_id",
aws_conn_id="aws_conn_id",
verify=False,
replace=True,
pd_kwargs={"index": False, "header": False},
dag=dag,
)

s3_to_redshift_nps = S3ToRedshiftOperator(
task_id = 's3_to_redshift_nps',
s3_bucket = s3_bucket,
s3_key = s3_key,
schema = schema,
table = table,
copy_options=['csv'],
redshift_conn_id = "redshift_dev_db",
aws_conn_id = "aws_conn_id",
method = "UPSERT",
upsert_keys = ["id"],
dag = dag
task_id="s3_to_redshift_nps",
s3_bucket=s3_bucket,
s3_key=s3_key,
schema=schema,
table=table,
copy_options=["csv"],
redshift_conn_id="redshift_dev_db",
aws_conn_id="aws_conn_id",
method="UPSERT",
upsert_keys=["id"],
dag=dag,
)

mysql_to_s3_nps >> s3_to_redshift_nps
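
Compared with MySQL_to_Redshift.py above, which uses method="REPLACE" (full refresh), this v2 DAG uses method="UPSERT" with upsert_keys=["id"]: the operator stages the S3 data and merges it into the target table on id. A rough, simplified sketch of what that amounts to on the Redshift side (my reading of the provider's behavior, not the exact SQL it issues; COPY credentials omitted):

upsert_sketch = f"""
CREATE TEMP TABLE nps_staging (LIKE {schema}.{table});
COPY nps_staging FROM 's3://{s3_bucket}/{s3_key}' CSV;  -- credentials omitted
BEGIN;
DELETE FROM {schema}.{table} USING nps_staging WHERE {table}.id = nps_staging.id;
INSERT INTO {schema}.{table} SELECT * FROM nps_staging;
COMMIT;
"""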
35 changes: 19 additions & 16 deletions dags/NameGenderCSVtoRedshift.py
@@ -5,13 +5,16 @@
import logging
import psycopg2


def get_Redshift_connection():
host = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"
user = "keeyong" # 본인 ID 사용
password = "..." # 본인 Password 사용
port = 5439
dbname = "dev"
conn = psycopg2.connect(f"dbname={dbname} user={user} host={host} password={password} port={port}")
conn = psycopg2.connect(
f"dbname={dbname} user={user} host={host} password={password} port={port}"
)
conn.set_session(autocommit=True)
return conn.cursor()

@@ -20,16 +23,16 @@ def extract(url):
logging.info("Extract started")
f = requests.get(url)
logging.info("Extract done")
return (f.text)
return f.text


def transform(text):
logging.info("Transform started")
    lines = text.strip().split("\n")[1:] # process everything except the first (header) line
logging.info("Transform started")
    lines = text.strip().split("\n")[1:]  # process everything except the first (header) line
records = []
for l in lines:
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
(name, gender) = l.split(",") # l = "Keeyong,M" -> [ 'keeyong', 'M' ]
records.append([name, gender])
logging.info("Transform ended")
return records

@@ -48,18 +51,18 @@ def load(records):
cur = get_Redshift_connection()
try:
cur.execute("BEGIN;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
cur.execute(f"DELETE FROM {schema}.name_gender;")
        # run DELETE FROM first -> effectively a FULL REFRESH
for r in records:
name = r[0]
gender = r[1]
print(name, "-", gender)
sql = f"INSERT INTO {schema}.name_gender VALUES ('{name}', '{gender}')"
cur.execute(sql)
cur.execute("COMMIT;") # cur.execute("END;")
cur.execute("COMMIT;") # cur.execute("END;")
except (Exception, psycopg2.DatabaseError) as error:
print(error)
cur.execute("ROLLBACK;")
cur.execute("ROLLBACK;")
logging.info("load done")


@@ -71,12 +74,12 @@ def etl():


dag_second_assignment = DAG(
dag_id = 'name_gender',
catchup = False,
    start_date = datetime(2023,4,6), # won't run if the date is in the future
    schedule = '0 2 * * *') # adjust as needed
dag_id="name_gender",
catchup=False,
    start_date=datetime(2023, 4, 6),  # won't run if the date is in the future
schedule="0 2 * * *",
)  # adjust as needed

task = PythonOperator(
task_id = 'perform_etl',
python_callable = etl,
dag = dag_second_assignment)
task_id="perform_etl", python_callable=etl, dag=dag_second_assignment
)
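
One caveat on the load() step in this file: building the INSERT with an f-string breaks on names containing a single quote and leaves the statement open to SQL injection. A minimal hedged alternative for the loop body, using psycopg2 parameter binding (not part of this commit):

            # psycopg2 binds the values itself, so quotes in a name cannot break the statement
            sql = f"INSERT INTO {schema}.name_gender VALUES (%s, %s)"
            cur.execute(sql, (name, gender))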