Skip to content

Commit

Permalink
Merge branch 'master' of github.com:epoch8/airflow-exporter
Browse files Browse the repository at this point in the history
  • Loading branch information
elephantum committed Nov 27, 2020
2 parents 2305ba3 + 4400dea commit 75191f5
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions airflow_exporter/prometheus_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ def get_task_state_info():
'''
task_status_query = Session.query(
TaskInstance.dag_id, TaskInstance.task_id,
TaskInstance.state, TaskInstance.hostname, func.count(TaskInstance.dag_id).label('value')
).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state, TaskInstance.hostname).subquery()
TaskInstance.state, func.count(TaskInstance.dag_id).label('value')
).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()

return Session.query(
task_status_query.c.dag_id, task_status_query.c.task_id, task_status_query.c.hostname,
task_status_query.c.dag_id, task_status_query.c.task_id,
task_status_query.c.state, task_status_query.c.value, DagModel.owners
).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).order_by(task_status_query.c.dag_id).all()

Expand Down Expand Up @@ -82,12 +82,12 @@ def get_dag_labels(dag_id):

if dag is None:
return [], []

labels = dag.params.get('labels')

if labels is None:
return [], []

return list(labels.keys()), list(labels.values())


Expand All @@ -101,7 +101,7 @@ def collect(self):
'''collect metrics'''

# Task metrics
# Each *MetricFamily generates two lines of comments in /metrics, try to minimize noise
# Each *MetricFamily generates two lines of comments in /metrics, try to minimize noise
# by creating new group for each dag
task_info = get_task_state_info()
for dag_id, tasks in itertools.groupby(task_info, lambda x: x.dag_id):
Expand All @@ -110,11 +110,11 @@ def collect(self):
t_state = GaugeMetricFamily(
'airflow_task_status',
'Shows the number of task starts with this status',
labels=['dag_id', 'task_id', 'owner', 'status', 'hostname'] + k
labels=['dag_id', 'task_id', 'owner', 'status'] + k
)
for task in tasks:
t_state.add_metric([task.dag_id, task.task_id, task.owners, task.state or 'none', task.hostname or 'none'] + v, task.value)
t_state.add_metric([task.dag_id, task.task_id, task.owners, task.state or 'none'] + v, task.value)

yield t_state

# Dag Metrics
Expand Down

0 comments on commit 75191f5

Please sign in to comment.