airflow 설치부터 운영까지 이모저모
sudo apt-get install postgresql postgresql-contrib -y
mysql, sqlite의 경우 multiprocess불가능 -> postgresql 이동필요
https://jungwoon.github.io/airflow/2019/02/26/Airflow.html
sudo -u postgres psql
https://dorumugs.tistory.com/entry/AirFlow-Manual-on-Docker-stage-install
- docker-compose 2버전 이상설치
- airflow 2.3.2업데이트
airflow users create
--username admin
--firstname FIRST_NAME
--lastname LAST_NAME
--role Admin
--password admin
--email [email protected]
$ export PATH=$PATH:~/.local/bin
airflow webserver -p port_num &! airflow schduler &!
- 3.5달러 (RAM 500MB) // 5달러 (RAM 1GB) -> 모두 webserver, scheduler 동시실행시 서버터짐 ㅋ (최소 4gb의 메모리가 필요)
- user에게 실행 권한 주기 ("can create on DAG Runs")
- slack등의 outcome의 기능은 존재
- 자체 income 기능은 없다
- client api -> 같은 서버 공간에서의 dags 실행을 위한 api.
- 외부 서버 REST api https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html.
api 상에서
c = Client(None, None)
c.trigger_dag(dag_id='dag_id', conf={'target':target}) 으로 api 호출
dag 상에서
def print_arguments(**kwargs):
table_name = kwargs['dag_run'].conf.get('table') -> 포인트
task = PythonOperator(
task_id="sample_task",
python_callable=print_arguments,
provide_context=True, ## 반드시 해당 옵션을 지정해야 함
dag=dag
)
password,secret,passwd,authorization,api_key,apikey,access_token 의 단어들이 key값으로 들어가면 value값이 익명으로 나타난다.
- hook을 이용 하여 sql return value를 핸들링 할 수 있다.
from airflow.providers.postgres.hooks.postgres import PostgresHook
def reorderCheck(**xcompusher):
hook = PostgresHook(postgres_conn_id='db_conn')
#x_com으로 특정 value 추출
userpk = xcompusher['t'].xcom_pull(key='v')['v1']
if userpk =='NotExist':
reorder = 'response Error'
else:
#쿼리 날리기
resultSql = hook.get_records("select exists(select 1 from completedorderlst where userpk='%s');"%userpk)
reorder =str(resultSql[0][0])
#x_com 으로 데이터 업로드
xcompusher['t'].xcom_push(key='vv', value={'vv':reorder})
def fun1(**params):
mergeDF = pd.DataFrame()
params['ti'].xcom_push(key='subdata', value={'data': mergeDF.to_dict()})
return
def fun2(**params):
import pandas as pd
df = pd.DataFrame(params['ti'].xcom_pull(key='subdata')['data'])
retrun
from airflow.providers.postgres.hooks.postgres import PostgresHook
hook = PostgresHook(postgres_conn_id='dbconn')
resultSql = hook.get_records(
"select exists(select 1 from TABLENAME where COLUMNSNALE='%s');" % VALUE)
request = "insert into TABLENAME (p1,p2,p3) values ('%s',%d,'%s');" % (
p1value, p2value, p3value)
pg_hook = PostgresHook(postgres_conn_id='dbconn')
connection = pg_hook.get_conn()
cursor = connection.cursor()
cursor.execute(request)
connection.commit()
cursor.close()
connection.close()
def function(**parm):
## on webserber {"name":"target_paramter"} -> 큰 따옴표로 찍어야함
parm['dag_run'].conf.get('name')
return
- docker로 운영시 Python pakages -> webserver 도커에 설치
- Failed to fetch log file from worker. Unsupported URL protocol '' 에러의 경우 logs 폴더에 777권한