Skip to content

Commit 264c3f8

Browse files
Merge pull request #4 from MRoci/master
Add queue label in TASKS & TASKS_RUNTIME metrics
2 parents 0ca13ba + 3a0e380 commit 264c3f8

File tree

12 files changed

+600
-304
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@ __pycache__
1010
.cache/
1111
.python-version
1212
.pytest_cache
13+
htmlcov/
1314

1415
.vscode

README.md

Lines changed: 50 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -83,59 +83,59 @@ If you then look at the exposed metrics, you should see something like this:
8383
celery_workers{namespace="celery"} 1.0
8484
# HELP celery_tasks_total Number of tasks per state
8585
# TYPE celery_tasks_total gauge
86-
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",state="RECEIVED"} 0.0
87-
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",state="PENDING"} 0.0
88-
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",state="STARTED"} 0.0
89-
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",state="RETRY"} 0.0
90-
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",state="FAILURE"} 0.0
91-
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",state="REVOKED"} 0.0
92-
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",state="SUCCESS"} 1.0
93-
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",state="RECEIVED"} 3.0
94-
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",state="PENDING"} 0.0
95-
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",state="STARTED"} 1.0
96-
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",state="RETRY"} 2.0
97-
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",state="FAILURE"} 1.0
98-
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",state="REVOKED"} 0.0
99-
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",state="SUCCESS"} 7.0
86+
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",queue="celery",state="RECEIVED"} 0.0
87+
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",queue="celery",state="PENDING"} 0.0
88+
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",queue="celery",state="STARTED"} 0.0
89+
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",queue="celery",state="RETRY"} 0.0
90+
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",queue="celery",state="FAILURE"} 0.0
91+
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",queue="celery",state="REVOKED"} 0.0
92+
celery_tasks_total{name="my_app.tasks.calculate_something",namespace="celery",queue="celery",state="SUCCESS"} 1.0
93+
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery",state="RECEIVED"} 3.0
94+
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery",state="PENDING"} 0.0
95+
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery",state="STARTED"} 1.0
96+
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery",state="RETRY"} 2.0
97+
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery",state="FAILURE"} 1.0
98+
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery",state="REVOKED"} 0.0
99+
celery_tasks_total{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery",state="SUCCESS"} 7.0
100100
# HELP celery_tasks_runtime_seconds Task runtime (seconds)
101101
# TYPE celery_tasks_runtime_seconds histogram
102-
celery_tasks_runtime_seconds_bucket{le="0.005",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
103-
celery_tasks_runtime_seconds_bucket{le="0.01",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
104-
celery_tasks_runtime_seconds_bucket{le="0.025",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
105-
celery_tasks_runtime_seconds_bucket{le="0.05",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
106-
celery_tasks_runtime_seconds_bucket{le="0.075",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
107-
celery_tasks_runtime_seconds_bucket{le="0.1",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
108-
celery_tasks_runtime_seconds_bucket{le="0.25",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
109-
celery_tasks_runtime_seconds_bucket{le="0.5",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
110-
celery_tasks_runtime_seconds_bucket{le="0.75",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
111-
celery_tasks_runtime_seconds_bucket{le="1.0",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
112-
celery_tasks_runtime_seconds_bucket{le="2.5",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
113-
celery_tasks_runtime_seconds_bucket{le="5.0",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
114-
celery_tasks_runtime_seconds_bucket{le="7.5",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
115-
celery_tasks_runtime_seconds_bucket{le="10.0",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
116-
celery_tasks_runtime_seconds_bucket{le="+Inf",name="my_app.tasks.calculate_something",namespace="celery"} 29.0
117-
celery_tasks_runtime_seconds_count{name="my_app.tasks.calculate_something",namespace="celery"} 29.0
118-
celery_tasks_runtime_seconds_sum{name="my_app.tasks.calculate_something",namespace="celery"} 0.04020289977779612
119-
celery_tasks_runtime_seconds_bucket{le="0.005",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
120-
celery_tasks_runtime_seconds_bucket{le="0.01",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
121-
celery_tasks_runtime_seconds_bucket{le="0.025",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
122-
celery_tasks_runtime_seconds_bucket{le="0.05",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
123-
celery_tasks_runtime_seconds_bucket{le="0.075",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
124-
celery_tasks_runtime_seconds_bucket{le="0.1",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
125-
celery_tasks_runtime_seconds_bucket{le="0.25",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
126-
celery_tasks_runtime_seconds_bucket{le="0.5",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
127-
celery_tasks_runtime_seconds_bucket{le="0.75",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
128-
celery_tasks_runtime_seconds_bucket{le="1.0",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
129-
celery_tasks_runtime_seconds_bucket{le="2.5",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
130-
celery_tasks_runtime_seconds_bucket{le="5.0",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
131-
celery_tasks_runtime_seconds_bucket{le="7.5",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
132-
celery_tasks_runtime_seconds_bucket{le="10.0",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
133-
celery_tasks_runtime_seconds_bucket{le="+Inf",name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
134-
celery_tasks_runtime_seconds_count{name="my_app.tasks.fetch_some_data",namespace="celery"} 2.0
135-
celery_tasks_runtime_seconds_sum{name="my_app.tasks.fetch_some_data",namespace="celery"} 0.00402028997777961
102+
celery_tasks_runtime_seconds_bucket{le="0.005",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
103+
celery_tasks_runtime_seconds_bucket{le="0.01",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
104+
celery_tasks_runtime_seconds_bucket{le="0.025",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
105+
celery_tasks_runtime_seconds_bucket{le="0.05",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
106+
celery_tasks_runtime_seconds_bucket{le="0.075",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
107+
celery_tasks_runtime_seconds_bucket{le="0.1",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
108+
celery_tasks_runtime_seconds_bucket{le="0.25",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
109+
celery_tasks_runtime_seconds_bucket{le="0.5",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
110+
celery_tasks_runtime_seconds_bucket{le="0.75",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
111+
celery_tasks_runtime_seconds_bucket{le="1.0",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
112+
celery_tasks_runtime_seconds_bucket{le="2.5",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
113+
celery_tasks_runtime_seconds_bucket{le="5.0",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
114+
celery_tasks_runtime_seconds_bucket{le="7.5",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
115+
celery_tasks_runtime_seconds_bucket{le="10.0",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
116+
celery_tasks_runtime_seconds_bucket{le="+Inf",name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
117+
celery_tasks_runtime_seconds_count{name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 29.0
118+
celery_tasks_runtime_seconds_sum{name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 0.04020289977779612
119+
celery_tasks_runtime_seconds_bucket{le="0.005",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
120+
celery_tasks_runtime_seconds_bucket{le="0.01",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
121+
celery_tasks_runtime_seconds_bucket{le="0.025",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
122+
celery_tasks_runtime_seconds_bucket{le="0.05",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
123+
celery_tasks_runtime_seconds_bucket{le="0.075",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
124+
celery_tasks_runtime_seconds_bucket{le="0.1",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
125+
celery_tasks_runtime_seconds_bucket{le="0.25",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
126+
celery_tasks_runtime_seconds_bucket{le="0.5",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
127+
celery_tasks_runtime_seconds_bucket{le="0.75",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
128+
celery_tasks_runtime_seconds_bucket{le="1.0",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
129+
celery_tasks_runtime_seconds_bucket{le="2.5",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
130+
celery_tasks_runtime_seconds_bucket{le="5.0",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
131+
celery_tasks_runtime_seconds_bucket{le="7.5",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
132+
celery_tasks_runtime_seconds_bucket{le="10.0",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
133+
celery_tasks_runtime_seconds_bucket{le="+Inf",name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
134+
celery_tasks_runtime_seconds_count{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 2.0
135+
celery_tasks_runtime_seconds_sum{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 0.00402028997777961
136136
# TYPE celery_tasks_runtime_seconds_created gauge
137-
celery_tasks_runtime_seconds_created{name="my_app.tasks.calculate_something",namespace="celery"} 1.548944949810905e+09
138-
celery_tasks_runtime_seconds_created{name="my_app.tasks.fetch_some_data",namespace="celery"} 1.5489449550243628e+09
137+
celery_tasks_runtime_seconds_created{name="my_app.tasks.calculate_something",namespace="celery",queue="celery"} 1.548944949810905e+09
138+
celery_tasks_runtime_seconds_created{name="my_app.tasks.fetch_some_data",namespace="celery",queue="celery"} 1.5489449550243628e+09
139139
# HELP celery_tasks_latency_seconds Seconds between a task is received and started.
140140
# TYPE celery_tasks_latency_seconds histogram
141141
celery_tasks_latency_seconds_bucket{namespace="celery",le="0.005"} 2.0

celery_exporter/__main__.py

Lines changed: 88 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,54 +10,107 @@
1010

1111
__VERSION__ = (2, 0, 0)
1212

13-
LOG_FORMAT = '[%(asctime)s] %(name)s:%(levelname)s: %(message)s'
13+
LOG_FORMAT = "[%(asctime)s] %(name)s:%(levelname)s: %(message)s"
1414

15-
@click.command(context_settings={
16-
'auto_envvar_prefix':'CELERY_EXPORTER'
17-
})
18-
@click.option('--broker-url', '-b', type=str, show_default=True, show_envvar=True,
19-
default='redis://redis:6379/0',
20-
help='URL to the Celery broker.')
21-
@click.option('--listen-address', '-l', type=str, show_default=True, show_envvar=True,
22-
default='0.0.0.0:9540',
23-
help='Address the HTTPD should listen on.')
24-
@click.option('--max-tasks', '-m', type=int, show_default=True, show_envvar=True,
25-
default='10000',
26-
help='Tasks cache size.')
27-
@click.option('--namespace', '-n', type=str, show_default=True, show_envvar=True,
28-
default='celery',
29-
help='Namespace for metrics.')
30-
@click.option('--transport-options', type=str, allow_from_autoenv=False,
31-
help='JSON object with additional options passed to the underlying transport.')
32-
@click.option('--enable-events', is_flag=True, allow_from_autoenv=False,
33-
help='Periodically enable Celery events.')
34-
@click.option('--tz', type=str, allow_from_autoenv=False,
35-
help='Timezone used by the celery app.')
36-
@click.option('--verbose', is_flag=True, allow_from_autoenv=False,
37-
help='Enable verbose logging.')
38-
@click.version_option(version='.'.join([str(x) for x in __VERSION__]))
39-
def main(broker_url, listen_address, max_tasks, namespace, transport_options, enable_events, tz, verbose): # pragma: no cover
15+
16+
@click.command(context_settings={"auto_envvar_prefix": "CELERY_EXPORTER"})
17+
@click.option(
18+
"--broker-url",
19+
"-b",
20+
type=str,
21+
show_default=True,
22+
show_envvar=True,
23+
default="redis://redis:6379/0",
24+
help="URL to the Celery broker.",
25+
)
26+
@click.option(
27+
"--listen-address",
28+
"-l",
29+
type=str,
30+
show_default=True,
31+
show_envvar=True,
32+
default="0.0.0.0:9540",
33+
help="Address the HTTPD should listen on.",
34+
)
35+
@click.option(
36+
"--max-tasks",
37+
"-m",
38+
type=int,
39+
show_default=True,
40+
show_envvar=True,
41+
default="10000",
42+
help="Tasks cache size.",
43+
)
44+
@click.option(
45+
"--namespace",
46+
"-n",
47+
type=str,
48+
show_default=True,
49+
show_envvar=True,
50+
default="celery",
51+
help="Namespace for metrics.",
52+
)
53+
@click.option(
54+
"--transport-options",
55+
type=str,
56+
allow_from_autoenv=False,
57+
help="JSON object with additional options passed to the underlying transport.",
58+
)
59+
@click.option(
60+
"--enable-events",
61+
is_flag=True,
62+
allow_from_autoenv=False,
63+
help="Periodically enable Celery events.",
64+
)
65+
@click.option(
66+
"--tz", type=str, allow_from_autoenv=False, help="Timezone used by the celery app."
67+
)
68+
@click.option(
69+
"--verbose", is_flag=True, allow_from_autoenv=False, help="Enable verbose logging."
70+
)
71+
@click.version_option(version=".".join([str(x) for x in __VERSION__]))
72+
def main(
73+
broker_url,
74+
listen_address,
75+
max_tasks,
76+
namespace,
77+
transport_options,
78+
enable_events,
79+
tz,
80+
verbose,
81+
): # pragma: no cover
4082

4183
if verbose:
4284
logging.basicConfig(level=logging.DEBUG, format=LOG_FORMAT)
4385
else:
4486
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
4587

4688
if tz:
47-
os.environ['TZ'] = tz
89+
os.environ["TZ"] = tz
4890
time.tzset()
4991

5092
if transport_options:
5193
try:
5294
transport_options = json.loads(transport_options)
5395
except ValueError:
54-
print("Error parsing broker transport options from JSON '{}'"
55-
.format(transport_options), file=sys.stderr)
56-
sys.exit(1)
96+
print(
97+
"Error parsing broker transport options from JSON '{}'".format(
98+
transport_options
99+
),
100+
file=sys.stderr,
101+
)
102+
sys.exit(1)
57103

58-
celery_exporter = CeleryExporter(broker_url, listen_address, max_tasks, namespace, transport_options, enable_events)
104+
celery_exporter = CeleryExporter(
105+
broker_url,
106+
listen_address,
107+
max_tasks,
108+
namespace,
109+
transport_options,
110+
enable_events,
111+
)
59112
celery_exporter.start()
60-
113+
61114
def shutdown(signum, frame): # pragma: no cover
62115
"""
63116
Shutdown is called if the process receives a TERM/INT signal.
@@ -71,5 +124,6 @@ def shutdown(signum, frame): # pragma: no cover
71124
while True:
72125
signal.pause()
73126

74-
if __name__ == '__main__': # pragma: no cover
75-
main() # pylint: disable=E1120
127+
128+
if __name__ == "__main__": # pragma: no cover
129+
main() # pylint: disable=E1120

celery_exporter/core.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,26 @@
11
import celery
22
import logging
33
import prometheus_client
4-
from .monitor import TaskThread, WorkerMonitoringThread, EnableEventsThread, setup_metrics
5-
6-
__all__ = ('CeleryExporter',)
7-
8-
class CeleryExporter():
9-
10-
def __init__(self, broker_url, listen_address, max_tasks=10000, namespace='celery', transport_options=None, enable_events=False):
4+
from .monitor import (
5+
TaskThread,
6+
WorkerMonitoringThread,
7+
EnableEventsThread,
8+
setup_metrics,
9+
)
10+
11+
__all__ = ("CeleryExporter",)
12+
13+
14+
class CeleryExporter:
15+
def __init__(
16+
self,
17+
broker_url,
18+
listen_address,
19+
max_tasks=10000,
20+
namespace="celery",
21+
transport_options=None,
22+
enable_events=False,
23+
):
1124
self._listen_address = listen_address
1225
self._max_tasks = max_tasks
1326
self._namespace = namespace
@@ -22,11 +35,15 @@ def start(self):
2235

2336
self._start_httpd()
2437

25-
t = TaskThread(app=self._app, namespace=self._namespace, max_tasks_in_memory=self._max_tasks)
38+
t = TaskThread(
39+
app=self._app,
40+
namespace=self._namespace,
41+
max_tasks_in_memory=self._max_tasks,
42+
)
2643
t.daemon = True
2744
t.start()
2845

29-
w = WorkerMonitoringThread(app=self._app, namespace=self._namespace,)
46+
w = WorkerMonitoringThread(app=self._app, namespace=self._namespace)
3047
w.daemon = True
3148
w.start()
3249

@@ -40,8 +57,6 @@ def _start_httpd(self): # pragma: no cover
4057
Starts the exposing HTTPD using the addr provided in a separate
4158
thread.
4259
"""
43-
host, port = self._listen_address.split(':')
44-
logging.info('Starting HTTPD on {}:{}'.format(host, port))
60+
host, port = self._listen_address.split(":")
61+
logging.info("Starting HTTPD on {}:{}".format(host, port))
4562
prometheus_client.start_http_server(int(port), host)
46-
47-

celery_exporter/metrics.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import prometheus_client
22

33
TASKS = prometheus_client.Counter(
4-
'celery_tasks_total', 'Number of task events.',
5-
['namespace', 'name', 'state'])
4+
"celery_tasks_total",
5+
"Number of task events.",
6+
["namespace", "name", "state", "queue"],
7+
)
68
TASKS_RUNTIME = prometheus_client.Histogram(
7-
'celery_tasks_runtime_seconds', 'Task runtime.',
8-
['namespace', 'name'])
9+
"celery_tasks_runtime_seconds", "Task runtime.", ["namespace", "name", "queue"]
10+
)
911
LATENCY = prometheus_client.Histogram(
10-
'celery_tasks_latency_seconds', 'Time between a task is received and started.',
11-
['namespace'])
12+
"celery_tasks_latency_seconds",
13+
"Time between a task is received and started.",
14+
["namespace"],
15+
)
1216
WORKERS = prometheus_client.Gauge(
13-
'celery_workers', 'Number of alive workers',
14-
['namespace'])
17+
"celery_workers", "Number of alive workers", ["namespace"]
18+
)

0 commit comments

Comments
 (0)