Commit 0188630: clean-up metrics scripts
adwk67 committed Mar 14, 2024
1 parent 9b7336f commit 0188630
Showing 2 changed files with 44 additions and 28 deletions.
36 changes: 22 additions & 14 deletions tests/templates/kuttl/commons/metrics.py
@@ -11,12 +11,12 @@ def exception_handler(exception_type, exception, traceback):
 
 sys.excepthook = exception_handler
 
 
 def assert_metric(role, metric):
-    response = requests.get(f'http://airflow-{role}-default:9102/metrics')
-    assert response.status_code == 200, \
+    metric_response = requests.get(f'http://airflow-{role}-default:9102/metrics')
+    assert metric_response.status_code == 200, \
         f"Metrics could not be retrieved from the {role}."
-    assert metric in response.text, \
-        f"The {role} metrics do not contain the metric {metric}."
+    return metric in metric_response.text
 
 
 # Trigger a DAG run to create metrics
@@ -32,13 +32,21 @@ def assert_metric(role, metric):
 response = requests.patch(f'{rest_url}/dags/{dag_id}', auth=auth, json={'is_paused': False})
 response = requests.post(f'{rest_url}/dags/{dag_id}/dagRuns', auth=auth, json={'conf': dag_conf})
 
-assert response.status_code == 200, "DAG run could not be triggered."
-
-# Wait for the metrics to be consumed by the statsd-exporter
-time.sleep(4)
-
-assert_metric('scheduler', 'airflow_scheduler_heartbeat')
-assert_metric('webserver', 'airflow_task_instance_created_BashOperator')
-
-# Worker is not deployed with the kubernetes executor so retrieve success metric from scheduler
-assert_metric('scheduler', 'airflow_dagrun_duration_success_example_trigger_target_dag_count')
+# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid
+# or minimize this by looping over the check instead.
+iterations = 4
+loop = 0
+while True:
+    assert response.status_code == 200, "DAG run could not be triggered."
+    # Wait for the metrics to be consumed by the statsd-exporter
+    time.sleep(5)
+    # (disable line-break flake checks)
+    if ((assert_metric('scheduler', 'airflow_scheduler_heartbeat'))
+            and (assert_metric('webserver', 'airflow_task_instance_created_BashOperator'))  # noqa: W503, W504
+            and (assert_metric('scheduler', 'airflow_dagrun_duration_success_example_trigger_target_dag_count'))):  # noqa: W503, W504
+        break
+    time.sleep(10)
+    loop += 1
+    if loop == iterations:
+        # force re-try of script
+        sys.exit(1)
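
The switch from asserting to returning in assert_metric is what makes the retry loop above work: a metric that has not arrived yet no longer aborts the script, it simply triggers another poll. A minimal sketch of the same poll-then-retry pattern factored into a reusable helper (the name wait_for_metrics and its parameters are illustrative, not part of this commit; the service URLs match the test scripts):

import sys
import time

import requests


def assert_metric(role, metric):
    # Same contract as in the scripts above: hard-fail if the exporter is
    # unreachable, otherwise report whether the metric has shown up yet.
    metric_response = requests.get(f'http://airflow-{role}-default:9102/metrics')
    assert metric_response.status_code == 200, \
        f"Metrics could not be retrieved from the {role}."
    return metric in metric_response.text


def wait_for_metrics(checks, attempts=4, initial_wait=5, retry_wait=10):
    # Give the statsd-exporter time to ingest the first datapoints.
    time.sleep(initial_wait)
    for _ in range(attempts):
        if all(assert_metric(role, metric) for role, metric in checks):
            return
        time.sleep(retry_wait)
    # Non-zero exit tells the test harness to re-run the whole script.
    sys.exit(1)


wait_for_metrics([
    ('scheduler', 'airflow_scheduler_heartbeat'),
    ('webserver', 'airflow_task_instance_created_BashOperator'),
])

Exiting non-zero rather than raising mirrors the scripts above: a missing metric is treated as retryable, and the harness simply invokes the script again.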
36 changes: 22 additions & 14 deletions tests/templates/kuttl/mount-dags-gitsync/dag_metrics.py
@@ -13,15 +13,14 @@ def exception_handler(exception_type, exception, traceback):
 
 
 def assert_metric(role, metric):
-    response = requests.get(f'http://airflow-{role}-default:9102/metrics')
-    assert response.status_code == 200, \
+    metric_response = requests.get(f'http://airflow-{role}-default:9102/metrics')
+    assert metric_response.status_code == 200, \
         f"Metrics could not be retrieved from the {role}."
-    assert metric in response.text, \
-        f"The {role} metrics do not contain the metric {metric}."
+    return metric in metric_response.text
 
 
 # Trigger a DAG run to create metrics
-dag_id = 'date_demo'
+dag_id = 'sparkapp_dag'
 
 rest_url = 'http://airflow-webserver-default:8080/api/v1'
 auth = ('airflow', 'airflow')
@@ -32,12 +31,21 @@ def assert_metric(role, metric):
 response = requests.patch(f'{rest_url}/dags/{dag_id}', auth=auth, json={'is_paused': False})
 response = requests.post(f'{rest_url}/dags/{dag_id}/dagRuns', auth=auth, json={})
 
-assert response.status_code == 200, "DAG run could not be triggered."
-
-# Wait for the metrics to be consumed by the statsd-exporter
-time.sleep(4)
-
-assert_metric('scheduler', 'airflow_scheduler_heartbeat')
-
-# Worker is not deployed with the kubernetes executor so retrieve success metric from scheduler
-assert_metric('scheduler', 'airflow_dagrun_duration_success_date_demo_count')
+# Test the DAG in a loop. Each time we call the script a new job will be started: we can avoid
+# or minimize this by looping over the check instead.
+iterations = 4
+loop = 0
+while True:
+    assert response.status_code == 200, "DAG run could not be triggered."
+    # Wait for the metrics to be consumed by the statsd-exporter
+    time.sleep(5)
+    # Worker is not deployed with the kubernetes executor so retrieve success metric from scheduler
+    # (disable line-break flake checks)
+    if ((assert_metric('scheduler', 'airflow_scheduler_heartbeat'))
+            and (assert_metric('scheduler', 'airflow_dagrun_duration_success_sparkapp_dag_count'))):  # noqa: W503, W504
+        break
+    time.sleep(10)
+    loop += 1
+    if loop == iterations:
+        # force re-try of script
+        sys.exit(1)
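
If one of the polled metrics never turns up within the retry budget, it helps to see what the statsd-exporter is actually serving. A small hypothetical debugging snippet (not part of this commit) that lists the airflow_* metric names exposed on the same endpoint the scripts query:

import re

import requests

# Same service name and port as the test scripts above.
text = requests.get('http://airflow-scheduler-default:9102/metrics', timeout=10).text

# Prometheus exposition format: one sample per line, comment lines start with '#'.
names = sorted({match.group(1) for match in re.finditer(r'^(airflow_\w+)', text, re.MULTILINE)})
print('\n'.join(names))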
