Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions celerymon/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ def run():
redis_client, args.queue, args.redis_watch_interval_sec
)
worker_watcher = WorkerWatcher.create_started(app, args.worker_inspect_interval_sec)
event_watcher = EventWatcher.create_started(
app,
# This has a wrong type annotation.
app.events.State(), # type: ignore[attr-defined]
buckets,
)
event_watcher = EventWatcher.create_started(app, buckets)
collector = Collector(redis_watcher, worker_watcher, event_watcher)

registry = prometheus_client.CollectorRegistry()
Expand Down
27 changes: 15 additions & 12 deletions celerymon/event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@
import logging
import threading
import time
from collections import defaultdict
from collections import OrderedDict, defaultdict
from typing import Any, Sequence

import celery.events # type: ignore[import]
import celery.events.state # type: ignore[import]
import celery # type: ignore[import]

from .timer import RepeatTimer

logger = logging.getLogger(__name__)


class EventWatcher:
_TASK_NAMES_CACHE_LIMIT = 100_000

last_received_timestamp: datetime.datetime | None
last_received_timestamp_per_task_event: dict[tuple[str, str], datetime.datetime]
num_events_per_task_count: dict[tuple[str, str], int]
Expand All @@ -29,10 +30,9 @@ class EventWatcher:
def create_started(
cls,
app: celery.Celery,
state: celery.events.state.State,
buckets: Sequence[float | str],
):
store = cls(state, buckets)
store = cls(buckets)

def run() -> None:
backoff = 1.0
Expand Down Expand Up @@ -67,10 +67,8 @@ def update_enable_event() -> None:

return store

def __init__(
self, state: celery.events.state.State, buckets: Sequence[float | str]
):
self._state = state
def __init__(self, buckets: Sequence[float | str]):
self._task_names_by_uuid: OrderedDict[str, str] = OrderedDict()

self.upper_bounds = [float(b) for b in buckets]
if self.upper_bounds and self.upper_bounds[-1] != float("inf"):
Expand All @@ -90,13 +88,18 @@ def on_event(self, event: dict[str, Any]):
now = datetime.datetime.now(tz=datetime.UTC)
self.last_received_timestamp = now

self._state.event(event)
event_name: str = event["type"]
if not event_name.startswith("task-"):
return

task: celery.events.Task = self._state.get_or_create_task(event["uuid"])[0]
task_name = task.name or "(UNKNOWN)"
uuid: str = event["uuid"]
if "name" in event:
self._task_names_by_uuid.pop(uuid, None)
Comment thread
tulioz marked this conversation as resolved.
self._task_names_by_uuid[uuid] = event["name"]
Comment thread
tulioz marked this conversation as resolved.
while len(self._task_names_by_uuid) > self._TASK_NAMES_CACHE_LIMIT:
Comment thread
tulioz marked this conversation as resolved.
self._task_names_by_uuid.popitem(last=False)

task_name = self._task_names_by_uuid.get(uuid, "(UNKNOWN)")
self.task_names.add(task_name)

self.last_received_timestamp_per_task_event[(task_name, event_name)] = now
Expand Down