Skip to content

Commit 1e30667

Browse files
committed
replace celery State with lightweight uuid to name cache
1 parent 1bee059 commit 1e30667

File tree

2 files changed

+16
-18
lines changed

2 files changed

+16
-18
lines changed

celerymon/cli/__init__.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,7 @@ def run():
3737
redis_client, args.queue, args.redis_watch_interval_sec
3838
)
3939
worker_watcher = WorkerWatcher.create_started(app, args.worker_inspect_interval_sec)
40-
event_watcher = EventWatcher.create_started(
41-
app,
42-
# This has a wrong type annotation.
43-
app.events.State(), # type: ignore[attr-defined]
44-
buckets,
45-
)
40+
event_watcher = EventWatcher.create_started(app, buckets)
4641
collector = Collector(redis_watcher, worker_watcher, event_watcher)
4742

4843
registry = prometheus_client.CollectorRegistry()

celerymon/event_watcher.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55
import logging
66
import threading
77
import time
8-
from collections import defaultdict
8+
from collections import OrderedDict, defaultdict
99
from typing import Any, Sequence
1010

11-
import celery.events # type: ignore[import]
12-
import celery.events.state # type: ignore[import]
11+
import celery # type: ignore[import]
1312

1413
from .timer import RepeatTimer
1514

1615
logger = logging.getLogger(__name__)
1716

1817

1918
class EventWatcher:
19+
_TASK_NAMES_CACHE_LIMIT = 100_000
20+
2021
last_received_timestamp: datetime.datetime | None
2122
last_received_timestamp_per_task_event: dict[tuple[str, str], datetime.datetime]
2223
num_events_per_task_count: dict[tuple[str, str], int]
@@ -29,10 +30,9 @@ class EventWatcher:
2930
def create_started(
3031
cls,
3132
app: celery.Celery,
32-
state: celery.events.state.State,
3333
buckets: Sequence[float | str],
3434
):
35-
store = cls(state, buckets)
35+
store = cls(buckets)
3636

3737
def run() -> None:
3838
backoff = 1.0
@@ -64,10 +64,8 @@ def update_enable_event() -> None:
6464

6565
return store
6666

67-
def __init__(
68-
self, state: celery.events.state.State, buckets: Sequence[float | str]
69-
):
70-
self._state = state
67+
def __init__(self, buckets: Sequence[float | str]):
68+
self._task_names_by_uuid: OrderedDict[str, str] = OrderedDict()
7169

7270
self.upper_bounds = [float(b) for b in buckets]
7371
if self.upper_bounds and self.upper_bounds[-1] != float("inf"):
@@ -87,13 +85,18 @@ def on_event(self, event: dict[str, Any]):
8785
now = datetime.datetime.now(tz=datetime.UTC)
8886
self.last_received_timestamp = now
8987

90-
self._state.event(event)
9188
event_name: str = event["type"]
9289
if not event_name.startswith("task-"):
9390
return
9491

95-
task: celery.events.Task = self._state.get_or_create_task(event["uuid"])[0]
96-
task_name = task.name or "(UNKNOWN)"
92+
uuid: str = event["uuid"]
93+
if "name" in event:
94+
self._task_names_by_uuid.pop(uuid, None)
95+
self._task_names_by_uuid[uuid] = event["name"]
96+
while len(self._task_names_by_uuid) > self._TASK_NAMES_CACHE_LIMIT:
97+
self._task_names_by_uuid.popitem(last=False)
98+
99+
task_name = self._task_names_by_uuid.get(uuid, "(UNKNOWN)")
97100
self.task_names.add(task_name)
98101

99102
self.last_received_timestamp_per_task_event[(task_name, event_name)] = now

0 commit comments

Comments
 (0)