Skip to content

Commit eaaee7a

Browse files
Merge pull request #2 from MRoci/master
Add CustomState instead of celery.event.State & refac task collection logic
2 parents 964abad + a418288 commit eaaee7a

File tree

3 files changed

+83
-32
lines changed

3 files changed

+83
-32
lines changed

celery_exporter/monitor.py

Lines changed: 13 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import celery.events
1010

1111
from .metrics import TASKS, TASKS_RUNTIME, LATENCY, WORKERS
12+
from .state import CeleryState
1213

1314
class TaskThread(threading.Thread):
1415
"""
@@ -20,7 +21,7 @@ def __init__(self, app, namespace, max_tasks_in_memory, *args, **kwargs):
2021
self._app = app
2122
self._namespace = namespace
2223
self.log = logging.getLogger('task-thread')
23-
self._state = self._app.events.State(max_tasks_in_memory=max_tasks_in_memory)
24+
self._state = CeleryState(max_tasks_in_memory=max_tasks_in_memory)
2425
self._known_states = set()
2526
self._known_states_names = set()
2627
self._tasks_started = dict()
@@ -32,13 +33,12 @@ def run(self): # pragma: no cover
3233
def _process_event(self, evt):
3334
# Events might come in in parallel. Celery already has a lock
3435
# that deals with this exact situation so we'll use that for now.
35-
with self._state._mutex:
36-
if celery.events.group_from(evt['type']) == 'task':
37-
evt_state = evt['type'][5:]
38-
state = celery.events.state.TASK_EVENT_TO_STATE[evt_state]
39-
if state == celery.states.STARTED:
40-
self._observe_latency(evt)
41-
self._collect_tasks(evt, state)
36+
if celery.events.group_from(evt['type']) == 'task':
37+
evt_state = evt['type'][5:]
38+
state = celery.events.state.TASK_EVENT_TO_STATE[evt_state]
39+
if state == celery.states.STARTED:
40+
self._observe_latency(evt)
41+
self._collect_tasks(evt, state)
4242

4343
def _observe_latency(self, evt):
4444
try:
@@ -54,31 +54,12 @@ def _observe_latency(self, evt):
5454
evt['local_received'] - prev_evt.local_received)
5555

5656
def _collect_tasks(self, evt, state):
57-
if state in celery.states.READY_STATES:
58-
self._incr_ready_task(evt, state)
59-
else:
60-
self._incr_unready_task(evt, state)
57+
(name, state, runtime) = self._state.collect(evt, state)
58+
if runtime is not None:
59+
TASKS_RUNTIME.labels(namespace=self._namespace, name=name).observe(runtime)
60+
61+
TASKS.labels(namespace=self._namespace, name=name, state=state).inc()
6162

62-
def _incr_ready_task(self, evt, state):
63-
try:
64-
# remove event from list of in-progress tasks
65-
name = self._state.tasks.pop(evt['uuid']).name or ''
66-
except (KeyError, AttributeError): # pragma: no cover
67-
name = ''
68-
finally:
69-
TASKS.labels(namespace=self._namespace, name=name, state=state).inc()
70-
if 'runtime' in evt:
71-
TASKS_RUNTIME.labels(namespace=self._namespace, name=name) \
72-
.observe(evt['runtime'])
73-
74-
def _incr_unready_task(self, evt, state):
75-
self._state._event(evt)
76-
try:
77-
name = self._state.tasks[evt['uuid']].name or ''
78-
except (KeyError, AttributeError): # pragma: no cover
79-
name = ''
80-
finally:
81-
TASKS.labels(namespace=self._namespace, name=name, state=state).inc()
8263

8364
def _monitor(self): # pragma: no cover
8465
while True:

celery_exporter/state.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import threading
2+
from operator import itemgetter
3+
4+
from celery.states import READY_STATES
5+
from celery.utils.functional import LRUCache
6+
from celery.events.state import Task
7+
8+
9+
class CeleryState:
10+
event_count = 0
11+
task_count = 0
12+
13+
def __init__(self, max_tasks_in_memory=10000):
14+
self.tasks = LRUCache(max_tasks_in_memory)
15+
self._mutex = threading.Lock()
16+
17+
def _event(self, evt):
18+
tfields = itemgetter("uuid", "hostname", "timestamp", "local_received", "clock")
19+
get_task = self.tasks.__getitem__
20+
self.event_count += 1
21+
group, _, subject = evt["type"].partition("-")
22+
if group == "task":
23+
(uuid, hostname, timestamp, local_received, clock) = tfields(evt)
24+
is_client_event = subject == "sent"
25+
try:
26+
task, task_created = get_task(uuid), False
27+
except KeyError:
28+
task = self.tasks[uuid] = Task(uuid, cluster_state=None)
29+
task_created = True
30+
if is_client_event:
31+
task.client = hostname
32+
33+
if subject == "received":
34+
self.task_count += 1
35+
36+
task.event(subject, timestamp, local_received, evt)
37+
38+
return (task, task_created), subject
39+
40+
def event(self, evt):
41+
with self._mutex:
42+
return self._event(evt)
43+
44+
def collect(self, evt, state):
45+
runtime = None
46+
if state in READY_STATES:
47+
try:
48+
with self._mutex:
49+
name = self.tasks.pop(evt["uuid"]).name or ""
50+
except (KeyError, AttributeError): # pragma: no cover
51+
name = ""
52+
finally:
53+
if "runtime" in evt:
54+
runtime = evt["runtime"]
55+
return (name, state, runtime)
56+
else:
57+
self.event(evt)
58+
try:
59+
name = self.tasks[evt["uuid"]].name or ""
60+
except (KeyError, AttributeError): # pragma: no cover
61+
name = ""
62+
finally:
63+
return (name, state, runtime)

test/test_unit.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ def test_tasks_events(self):
9292
runtime=runtime, hostname=hostname, clock=2,
9393
local_received=local_received + latency_before_started + runtime))
9494
self._assert_all_states({celery.states.RECEIVED, celery.states.STARTED, celery.states.SUCCESS})
95+
96+
m._process_event(Event(
97+
'task-sent', uuid=task_uuid, name=self.task,
98+
args='()', kwargs='{}', retries=0, eta=None, hostname=hostname,
99+
clock=0,
100+
local_received=local_received))
101+
self._assert_task_states({celery.states.PENDING}, 1)
95102

96103
assert REGISTRY.get_sample_value('celery_tasks_latency_seconds_count',
97104
labels=dict(namespace=self.namespace)) == 1

0 commit comments

Comments
 (0)