45 changes: 45 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,45 @@
name: ci

on: pull_request

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref }}
cancel-in-progress: true

permissions:
contents: read

jobs:
pre-commit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
with:
persist-credentials: false
- uses: actions/setup-python@v6
with:
python-version-file: ".python-version"
- uses: pre-commit/action@v3.0.1

typecheck:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
with:
persist-credentials: false
- uses: actions/setup-python@v6
with:
python-version-file: ".python-version"
- uses: astral-sh/setup-uv@v7
- run: uv sync --locked
- run: uv run mypy celerymon/

zizmor:
runs-on: ubuntu-latest
permissions:
security-events: write
steps:
- uses: actions/checkout@v6
with:
persist-credentials: false
- uses: zizmorcore/zizmor-action@v0.5.2
10 changes: 6 additions & 4 deletions .github/workflows/image-build.yaml
@@ -16,17 +16,19 @@ jobs:
contents: read
packages: write
steps:
- uses: actions/checkout@v4
- uses: docker/login-action@v3
- uses: actions/checkout@v6
with:
persist-credentials: false
- uses: docker/login-action@v4
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@v6
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- uses: docker/build-push-action@v5
- uses: docker/build-push-action@v7
with:
context: .
push: true
9 changes: 9 additions & 0 deletions .github/zizmor.yml
@@ -0,0 +1,9 @@
rules:
unpinned-uses:
config:
policies:
actions/*: ref-pin
docker/*: ref-pin
pre-commit/action: ref-pin
astral-sh/setup-uv: ref-pin
zizmorcore/zizmor-action: ref-pin
34 changes: 23 additions & 11 deletions README.md
@@ -26,9 +26,10 @@ There are three ways to observe Celery:
* Using the Celery events.

Worker and task senders are optionally configured to send events. This
requires an opt-in, but is real-time. Since this is an event stream, if an
event consumer (like this monitoring tool) restarts, it won't be able to see
the previous or current stats.
requires an opt-in but is real-time. celerymon maintains a lightweight
UUID-to-task-name cache (rather than using Celery's built-in State object)
to keep memory usage bounded. If the connection drops, celerymon
automatically reconnects with exponential backoff.
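
The reconnect behavior described above can be pictured as a capped exponential backoff loop. The following is an illustrative sketch, not celerymon's actual code: `connect` stands in for the blocking event-capture call, and the injectable `sleep` parameter exists only to make the sketch easy to exercise.

```python
import time


def run_with_backoff(connect, max_backoff: float = 60.0, sleep=time.sleep) -> None:
    """Retry `connect` forever, doubling the delay after each failure.

    `connect` is a hypothetical callable that blocks while the event
    stream is healthy and raises when the connection drops.
    """
    backoff = 1.0
    while True:
        try:
            connect()
            backoff = 1.0  # a successful connection resets the delay
        except Exception:
            sleep(backoff)
            backoff = min(backoff * 2, max_backoff)  # cap the delay at 60s
```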

These three can cover different areas. By querying the broker directly, we can
get the queued items stats. By using the worker inspection API, we can get the
@@ -47,6 +48,10 @@ on the finished tasks. celerymon uses all these three to get the data.
During this gap, the worker won't send events, which makes a hole in
monitoring. If possible, configure workers to send events from the beginning.

* If the event stream connection drops, celerymon will automatically reconnect
with exponential backoff (up to 60s). During the reconnection window, events
will be missed.

* This is not specific to celerymon, but in general, this type of monitoring
tool is not suited for short-lived ephemeral containers, such as Cloud Run.
The background threads need to run continuously instead of just at the time a
@@ -79,23 +84,30 @@ See the source code for the metric descriptions as well.

```
celerymon --broker-url=BROKER_URL
--queue=QUEUE_NAME
--queue=QUEUE_NAME [--queue=QUEUE_NAME ...]
[--worker-inspect-interval-sec=10]
[--redis-watch-interval-sec=10]
[--healthz-unhealthy-threshold-sec=300]
[--success-task-runtime-buckets=BUCKETS]
[--port=8000]
```

To monitor multiple queues, repeat the `--queue` flag:

```
celerymon --broker-url=redis://localhost:6379/0 --queue=celery --queue=agent
```

`--success-task-runtime-buckets` accepts a comma-separated list of bucket
boundaries in seconds (e.g. `0.1,0.5,1,5,10`). If not specified, the default
Prometheus histogram buckets are used.
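
As a sketch, parsing such a flag value might look like the following. The helper name is illustrative (not celerymon's API); the trailing `+Inf` bound mirrors the final bucket that Prometheus histograms require.

```python
def parse_buckets(raw: str) -> list[float]:
    """Parse a comma-separated list of bucket boundaries in seconds."""
    bounds = [float(b) for b in raw.split(",")]
    # Prometheus histograms always end with a +Inf bucket.
    if not bounds or bounds[-1] != float("inf"):
        bounds.append(float("inf"))
    return bounds


print(parse_buckets("0.1,0.5,1,5,10"))
```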

### Note on /healthz

We recognize that Celery event stops working at some point for reasons unknown.
To address this, celerymon serves /healthz that you can use as a health check
endpoint that reacts to the inactivity. As described above, celerymon uses three
celerymon serves /healthz as a health check endpoint that reacts to data source
inactivity. While the event watcher now auto-reconnects, other data sources
(Redis, worker inspection) may still stall. celerymon uses three
Comment on lines +107 to +109 (severity: medium)

The current phrasing implies that the event watcher is no longer a concern for stalling and that /healthz primarily exists for the other data sources. However, the health check still monitors the event watcher, which can still stall if it fails to reconnect or if the broker stops sending events. It would be clearer to state that all sources are monitored, regardless of the new auto-reconnect feature.

Suggested change:
- celerymon serves /healthz as a health check endpoint that reacts to data source
- inactivity. While the event watcher now auto-reconnects, other data sources
- (Redis, worker inspection) may still stall. celerymon uses three
+ celerymon serves /healthz as a health check endpoint that monitors data source
+ activity. Even though the event watcher now auto-reconnects, any of the three
+ data sources (events, Redis, or worker inspection) may still stall. celerymon uses three
data sources for providing the data. If one of them cannot be updated for
`--healthz-unhealthy-threshold-sec`, this endpoint returns 500. You can
configure your container management tool to restart the container based on this
endpoint.
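
A liveness probe against /healthz can be as simple as checking for a 200 response. This is a hedged sketch: the host and port are assumptions (celerymon defaults to port 8000 per the usage above), and an HTTP 500 or a connection failure is treated as unhealthy.

```python
import urllib.error
import urllib.request


def is_healthy(url: str = "http://localhost:8000/healthz") -> bool:
    """Return True if the endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # HTTPError (e.g. the 500 celerymon returns when a data source
        # stalls) is a URLError subclass, so it lands here too.
        return False
```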

## Stability

This was created in Sep 2023. It's a new effort.
7 changes: 1 addition & 6 deletions celerymon/cli/__init__.py
@@ -37,12 +37,7 @@ def run():
redis_client, args.queue, args.redis_watch_interval_sec
)
worker_watcher = WorkerWatcher.create_started(app, args.worker_inspect_interval_sec)
event_watcher = EventWatcher.create_started(
app,
# This has a wrong type annotation.
app.events.State(), # type: ignore[attr-defined]
buckets,
)
event_watcher = EventWatcher.create_started(app, buckets)
collector = Collector(redis_watcher, worker_watcher, event_watcher)

registry = prometheus_client.CollectorRegistry()
61 changes: 43 additions & 18 deletions celerymon/event_watcher.py
@@ -2,17 +2,22 @@
# SPDX-License-Identifier: MIT

import datetime
import logging
import threading
from collections import defaultdict
import time
from collections import OrderedDict, defaultdict
from typing import Any, Sequence

import celery.events # type: ignore[import]
import celery.events.state # type: ignore[import]
import celery

from .timer import RepeatTimer

logger = logging.getLogger(__name__)


class EventWatcher:
_TASK_NAMES_CACHE_LIMIT = 100_000

last_received_timestamp: datetime.datetime | None
last_received_timestamp_per_task_event: dict[tuple[str, str], datetime.datetime]
num_events_per_task_count: dict[tuple[str, str], int]
@@ -25,32 +30,47 @@ class EventWatcher:
def create_started(
cls,
app: celery.Celery,
state: celery.events.state.State,
buckets: Sequence[float | str],
):
store = cls(state, buckets)
store = cls(buckets)

def run() -> None:
with app.connection() as conn:
# This has a wrong type annotation.
recv = app.events.Receiver(conn, handlers={"*": store.on_event}) # type: ignore[attr-defined]
recv.capture(limit=None)
backoff = 1.0
max_backoff = 60.0
while True:
try:
with app.connection() as conn:
recv = app.events.Receiver(conn, handlers={"*": store.on_event})
logger.info("EventWatcher connected, capturing events")
backoff = 1.0
recv.capture(limit=None)
except Exception:
logger.exception(
"EventWatcher connection lost, reconnecting in %.1fs",
backoff,
)
time.sleep(backoff)
backoff = min(backoff * 2, max_backoff)

def update_enable_event() -> None:
app.control.enable_events()
try:
app.control.enable_events()
except Exception:
logger.exception("Failed to enable events")

timer = RepeatTimer(10, update_enable_event)
timer.daemon = True
timer.start()

thread = threading.Thread(target=run)
thread = threading.Thread(
target=run, daemon=True, name="celerymon-event-watcher"
)
thread.start()

return store

def __init__(
self, state: celery.events.state.State, buckets: Sequence[float | str]
):
self._state = state
def __init__(self, buckets: Sequence[float | str]):
self._task_names_by_uuid: OrderedDict[str, str] = OrderedDict()

self.upper_bounds = [float(b) for b in buckets]
if self.upper_bounds and self.upper_bounds[-1] != float("inf"):
@@ -70,13 +90,18 @@ def on_event(self, event: dict[str, Any]):
now = datetime.datetime.now(tz=datetime.UTC)
self.last_received_timestamp = now

self._state.event(event)
event_name: str = event["type"]
if not event_name.startswith("task-"):
return

task: celery.events.Task = self._state.get_or_create_task(event["uuid"])[0]
task_name = task.name or "(UNKNOWN)"
uuid: str = event["uuid"]
if "name" in event:
self._task_names_by_uuid.pop(uuid, None)
self._task_names_by_uuid[uuid] = event["name"]
while len(self._task_names_by_uuid) > self._TASK_NAMES_CACHE_LIMIT:
self._task_names_by_uuid.popitem(last=False)

task_name = self._task_names_by_uuid.get(uuid, "(UNKNOWN)")
self.task_names.add(task_name)

self.last_received_timestamp_per_task_event[(task_name, event_name)] = now
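
The bounded UUID-to-task-name cache in the diff above uses an `OrderedDict` as a FIFO with refresh-on-write: re-inserting a key moves it to the end, and eviction pops from the front. A standalone sketch of the same pattern, with an illustrative limit of 3 instead of celerymon's 100_000:

```python
from collections import OrderedDict

LIMIT = 3  # illustrative; celerymon uses 100_000

cache: OrderedDict[str, str] = OrderedDict()


def remember(uuid: str, name: str) -> None:
    # Popping before re-inserting moves the entry to the end,
    # marking it as the most recently written.
    cache.pop(uuid, None)
    cache[uuid] = name
    # Evict from the front (the oldest entries) once over the limit.
    while len(cache) > LIMIT:
        cache.popitem(last=False)


for i, uid in enumerate(["a", "b", "c", "d"]):
    remember(uid, f"task-{i}")
print(list(cache))  # the oldest entry "a" has been evicted
```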
2 changes: 1 addition & 1 deletion celerymon/redis_watcher.py
@@ -60,4 +60,4 @@ def _format_key(self, queue_name: str, priority: int) -> str:
"""
if not priority:
return queue_name
return "{0}{1}{2}".format(queue_name, PRIORITY_SEP, priority)
return f"{queue_name}{PRIORITY_SEP}{priority}"
7 changes: 3 additions & 4 deletions celerymon/worker_watcher.py
@@ -39,8 +39,7 @@ def _update(self) -> None:
oldest_timestamp: dict[str, datetime.datetime] = dict()
task_count: dict[tuple[str, str], int] = defaultdict(int)

# active(), reserved() and scheduled() have wrong type annotations.
for tasks in (self._inspect.active() or {}).values(): # type: ignore[union-attr]
for tasks in (self._inspect.active() or {}).values():
for task in tasks:
if isinstance(task["time_start"], str):
start_time = datetime.datetime.fromisoformat(task["time_start"])
@@ -54,10 +53,10 @@
oldest_timestamp[task_name] = min(
oldest_timestamp[task_name], start_time
)
for tasks in (self._inspect.reserved() or {}).values(): # type: ignore[union-attr]
for tasks in (self._inspect.reserved() or {}).values():
for task in tasks:
task_count[("reserved", task["type"])] += 1
for scheduled_tasks in (self._inspect.scheduled() or {}).values(): # type: ignore[union-attr]
for scheduled_tasks in (self._inspect.scheduled() or {}).values():
for scheduled_task in scheduled_tasks:
task_count[("scheduled", scheduled_task["request"]["type"])] += 1

17 changes: 14 additions & 3 deletions pyproject.toml
@@ -2,8 +2,14 @@
name = "celerymon"
version = "0.1.3"
description = ""
authors = [{ name = "Masaya Suzuki", email = "masaya@aviator.co" }]
dependencies = ["celery[redis]>=5.4.0", "prometheus-client>=0.21.0"]
authors = [
{ name = "Masaya Suzuki", email = "masaya@aviator.co" },
{ name = "Ofer Goldstein", email = "ofer@aviator.co" },
]
dependencies = [
"celery[redis]>=5.6.3",
"prometheus-client>=0.24.1",
]
readme = "README.md"
license = "MIT"
requires-python = ">= 3.13"
@@ -12,7 +18,12 @@ requires-python = ">= 3.13"
celerymon = 'celerymon.cli:run'

[dependency-groups]
dev = ["celery-types>=0.22.0", "pyright", "types-redis>=4.6.0.6"]
dev = [
"celery-types>=0.26.0",
"mypy>=1.20.0",
"pyright>=1.1.408",
"types-redis>=4.6.0.6",
]

[build-system]
requires = ["hatchling"]