Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions hindsight-api-slim/hindsight_api/api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -3070,7 +3070,31 @@ async def lifespan(app: FastAPI):
if config.worker_enabled and memory._backend.supports_worker_poller:
from ..config import DEFAULT_DATABASE_SCHEMA

if not config.worker_id:
from ..utils import detect_container_runtime

runtime = detect_container_runtime()
if runtime:
logging.warning(
"\n"
"============================================================\n"
" WARNING: HINDSIGHT_API_WORKER_ID is not set and Hindsight\n"
f" appears to be running inside {runtime}.\n"
"\n"
" The worker id is defaulting to the container hostname,\n"
" which CHANGES every time the container is recreated.\n"
" When that happens, tasks left in 'processing' under the\n"
" old hostname are never recovered — consolidation and other\n"
" async operations can get stuck indefinitely.\n"
"\n"
" Set HINDSIGHT_API_WORKER_ID to a STABLE value (e.g. the\n"
" compose service name or StatefulSet pod name) to avoid this.\n"
"============================================================"
)

worker_id = config.worker_id or socket.gethostname()
worker_id_source = "HINDSIGHT_API_WORKER_ID" if config.worker_id else "hostname (default)"
logging.info(f"Worker id: {worker_id} (source: {worker_id_source})")
# Convert default schema to None for SQL compatibility (no schema prefix)
schema = None if config.database_schema == DEFAULT_DATABASE_SCHEMA else config.database_schema
poller = WorkerPoller(
Expand Down
24 changes: 24 additions & 0 deletions hindsight-api-slim/hindsight_api/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
import os
from urllib.parse import urlparse, urlunparse


def detect_container_runtime() -> str | None:
"""Detect whether the process is running inside a container.

Returns "kubernetes", "docker", or None. Used to warn operators that the
default ``socket.gethostname()`` worker id is unstable across container
recreation (the random container id changes on restart, so tasks stuck in
'processing' under the old id are never recovered).
"""
if os.getenv("KUBERNETES_SERVICE_HOST"):
return "kubernetes"
# Docker (and most OCI runtimes) create this marker file in every container.
if os.path.exists("/.dockerenv"):
return "docker"
# cgroup v1 fallback for runtimes that don't write /.dockerenv.
try:
with open("/proc/1/cgroup", encoding="utf-8") as f:
if any(token in f.read() for token in ("docker", "containerd", "kubepods")):
return "docker"
except OSError:
pass
return None


def mask_network_location(url):
if not url:
return url
Expand Down
44 changes: 44 additions & 0 deletions hindsight-api-slim/tests/test_container_detection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
"""Tests for container-runtime detection used to warn about unstable worker ids."""

import builtins

from hindsight_api.utils import detect_container_runtime


def test_detects_kubernetes_via_env(monkeypatch):
monkeypatch.setenv("KUBERNETES_SERVICE_HOST", "10.0.0.1")
assert detect_container_runtime() == "kubernetes"


def test_detects_docker_via_dockerenv(monkeypatch):
monkeypatch.delenv("KUBERNETES_SERVICE_HOST", raising=False)
monkeypatch.setattr("os.path.exists", lambda p: p == "/.dockerenv")
assert detect_container_runtime() == "docker"


def test_detects_docker_via_cgroup(monkeypatch):
monkeypatch.delenv("KUBERNETES_SERVICE_HOST", raising=False)
monkeypatch.setattr("os.path.exists", lambda p: False)

real_open = builtins.open

def fake_open(path, *args, **kwargs):
if path == "/proc/1/cgroup":
import io

return io.StringIO("12:devices:/docker/abcdef123456\n")
return real_open(path, *args, **kwargs)

monkeypatch.setattr("builtins.open", fake_open)
assert detect_container_runtime() == "docker"


def test_returns_none_when_not_containerized(monkeypatch):
monkeypatch.delenv("KUBERNETES_SERVICE_HOST", raising=False)
monkeypatch.setattr("os.path.exists", lambda p: False)

def fake_open(path, *args, **kwargs):
raise OSError("no such file")

monkeypatch.setattr("builtins.open", fake_open)
assert detect_container_runtime() is None
Loading