From e37fb305188d8bc3b0b01e732081db020d40b8cf Mon Sep 17 00:00:00 2001 From: serversidehannes Date: Mon, 8 Jun 2026 15:24:54 +0200 Subject: [PATCH] feat(admin): Redis-backed cluster-wide stats, chart/log/encryption polish (#47) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Admin dashboard overhaul (issue #47), all six areas plus fixes surfaced while validating against a live ScyllaDB backup e2e. Stats / metrics - New StatsStore abstraction (s3proxy/admin/stats_store.py): MemoryStatsStore (per-pod, single-instance default) and RedisStatsStore (cluster-wide). - Cluster-wide counters are written per-request on every pod, so the dashboard reports the same totals regardless of which replica serves the view (fixes per-pod numbers jumping on every hard refresh under replicaCount > 1). - Request log is a Redis capped list (LPUSH + LTRIM to request_log_cap, default 10k) with a 24h sliding TTL; bounded so it never threatens multipart-upload state in the shared Redis. Paginated /api/logs (offset/limit). Chart / UI - Selectable time ranges (1h/3h/7h/24h/7d) with server-side bucketing (/api/series); throughput split into Encrypted (PUT) / Decrypted (GET) lines via direction sub-tabs (/api/throughput). - Chart polish: larger plot, aspect-ratio wrap (no axis squish), bigger axis text, p98 y-scaling so a single spike doesn't flatten the series. - Absolute timestamps with millisecond precision (relative form as tooltip). - Wide tables wrapped to scroll instead of clipping; long object keys wrap. Encryption status (multipart correctness) - Object detail and the bucket-listing rows now detect encryption with the same logic as the GET path: on-object isec tag, else the multipart sidecar (load_multipart_metadata). Per-object badge in the explorer. - Explorer paginates objects (20/page) so the per-object HEAD fan-out is bounded. - Removed the misleading hard-coded "Encrypted" column from the buckets card. 
Other fixes - Admin-path requests no longer fall through the S3 catch-all and pollute stats as a phantom "admin" bucket. - chart: HAProxy front proxy binds an unprivileged container port (8080) instead of 80 — the container runs as non-root with all caps dropped, so binding 80 fails (EACCES) and CrashLoops. Service still exposes port 80. - e2e: assert the admin API reports multipart backup objects as encrypted. Config: request_log_cap, request_log_ttl_hours, stats_ttl_hours, stats_series_ttl_hours (+ derived *_seconds). --- chart/templates/frontproxy-configmap.yaml | 2 +- chart/templates/frontproxy-deployment.yaml | 2 +- chart/values.yaml | 4 + e2e/scripts/verify-encryption-k8s.sh | 55 +++ e2e/scylla/test.sh | 4 + s3proxy/admin/collectors.py | 539 ++++++++++---------- s3proxy/admin/router.py | 54 +- s3proxy/admin/stats_store.py | 546 +++++++++++++++++++++ s3proxy/admin/templates.py | 311 +++++++++--- s3proxy/app.py | 8 + s3proxy/config.py | 30 ++ s3proxy/request_handler.py | 24 +- tests/unit/test_admin.py | 298 +++++++++-- tests/unit/test_admin_encryption.py | 103 ++++ tests/unit/test_admin_path_filter.py | 58 +++ tests/unit/test_chart_frontproxy.py | 22 + 16 files changed, 1706 insertions(+), 354 deletions(-) create mode 100644 s3proxy/admin/stats_store.py create mode 100644 tests/unit/test_admin_encryption.py create mode 100644 tests/unit/test_admin_path_filter.py diff --git a/chart/templates/frontproxy-configmap.yaml b/chart/templates/frontproxy-configmap.yaml index 568536e..315d125 100644 --- a/chart/templates/frontproxy-configmap.yaml +++ b/chart/templates/frontproxy-configmap.yaml @@ -22,7 +22,7 @@ data: timeout server {{ .Values.frontproxy.timeouts.server }} frontend http_in - bind *:{{ .Values.frontproxy.service.port }} + bind *:{{ .Values.frontproxy.containerPort }} default_backend s3proxy_pods backend s3proxy_pods diff --git a/chart/templates/frontproxy-deployment.yaml b/chart/templates/frontproxy-deployment.yaml index 615b605..be620d0 100644 --- 
a/chart/templates/frontproxy-deployment.yaml +++ b/chart/templates/frontproxy-deployment.yaml @@ -26,7 +26,7 @@ spec: imagePullPolicy: {{ .Values.frontproxy.image.pullPolicy }} ports: - name: http - containerPort: {{ .Values.frontproxy.service.port }} + containerPort: {{ .Values.frontproxy.containerPort }} protocol: TCP volumeMounts: - name: config diff --git a/chart/values.yaml b/chart/values.yaml index e1c9726..5144e88 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -133,6 +133,10 @@ frontproxy: service: type: ClusterIP port: 80 + # Port HAProxy binds inside the container. Must be >= 1024: the container runs + # as non-root with all capabilities dropped, so it cannot bind a privileged + # port (e.g. 80). The Service still exposes `service.port` and targets this. + containerPort: 8080 # Per-request timeouts must tolerate large S3 transfers. timeouts: client: "1h" diff --git a/e2e/scripts/verify-encryption-k8s.sh b/e2e/scripts/verify-encryption-k8s.sh index 20edecd..4501785 100755 --- a/e2e/scripts/verify-encryption-k8s.sh +++ b/e2e/scripts/verify-encryption-k8s.sh @@ -2,6 +2,61 @@ # Kubernetes wrapper for encryption verification # Source this script and call: verify_encryption +# Assert the admin dashboard reports a MULTIPART object as encrypted (issue #47 +# #6). Multipart objects keep their wrapped DEK in a sidecar, not an on-object +# tag, so the admin API must consult the sidecar. The byte-level verify_encryption +# below can't catch a mislabel in the dashboard — this closes that gap. +# +# verify_admin_encryption <bucket> [namespace] +# Picks a real multipart backup object (ETag ending in "-<N>") and asserts the +# admin object-detail API returns "encrypted": true for it. +verify_admin_encryption() { + local BUCKET="$1" + local NAMESPACE="${2:-default}" + + echo "=== Admin Encryption-Status Check (multipart) ===" + + # Find a multipart object via MinIO: a *-big-Data.db SSTable, which Scylla + # uploads as a multipart object (ETag ends in -<N>). Skip sidecars. 
+ local KEY + KEY=$(kubectl run admin-enc-find --namespace minio --rm -i --restart=Never \ + --image=mc:latest --image-pull-policy=Never --command -- /bin/sh -c " + mc alias set m http://minio.minio.svc.cluster.local:9000 minioadmin minioadmin >/dev/null 2>&1 + mc ls -r m/$BUCKET 2>/dev/null | awk '{print \$NF}' \ + | grep -v '^\.s3proxy-internal/' | grep -- '-big-Data.db' | head -1 + " 2>/dev/null | tr -d '\r' | tail -1) + + if [ -z "$KEY" ]; then + echo "✗ No multipart object found to check" + return 1 + fi + echo "Multipart object: $KEY" + + # Query the admin object-detail API from inside an s3proxy pod (which has + # Python + reaches its own admin API on localhost). The {key:path} route + # takes the slashed key verbatim. + local POD + POD=$(kubectl get pod -n "$NAMESPACE" -l app.kubernetes.io/name=s3proxy-python \ + -o jsonpath='{.items[0].metadata.name}' 2>/dev/null) + local RESP + RESP=$(kubectl exec -n "$NAMESPACE" "$POD" -- python -c " +import base64, json, urllib.request +key = '''$KEY''' +url = 'http://localhost:4433/admin/api/objects/$BUCKET/' + key +req = urllib.request.Request(url) +req.add_header('Authorization', 'Basic ' + base64.b64encode(b'admin:admin').decode()) +print(urllib.request.urlopen(req, timeout=15).read().decode()) +" 2>&1) + echo "Admin response: $RESP" + + if echo "$RESP" | grep -q '"encrypted": *true'; then + echo "✓ Admin API reports multipart object encrypted" + return 0 + fi + echo "✗ Admin API reported multipart object as NOT encrypted" + return 1 +} + verify_encryption() { local BUCKET="$1" local PATH_PREFIX="${2:-}" diff --git a/e2e/scylla/test.sh b/e2e/scylla/test.sh index 63aa232..1ffb001 100755 --- a/e2e/scylla/test.sh +++ b/e2e/scylla/test.sh @@ -200,6 +200,10 @@ log_info "Backup snapshot tag: $SNAPSHOT_TAG" # Verify encryption verify_encryption "scylla-backups" "" "$NAMESPACE" || log_warn "Encryption check skipped" +# Verify the admin dashboard correctly reports multipart backup objects as +# encrypted (issue #47 #6 — 
sidecar-aware detection). +verify_admin_encryption "scylla-backups" "$NAMESPACE" || log_warn "Admin encryption check skipped" + # ============================================================================ # STEP 4: Delete cluster # ============================================================================ diff --git a/s3proxy/admin/collectors.py b/s3proxy/admin/collectors.py index 513b9eb..ed4bddd 100644 --- a/s3proxy/admin/collectors.py +++ b/s3proxy/admin/collectors.py @@ -5,130 +5,22 @@ import hashlib import os import time -from collections import defaultdict, deque -from dataclasses import asdict, dataclass +from collections import defaultdict from typing import TYPE_CHECKING from .. import metrics +from .stats_store import ( + LocalCounters, + RequestSample, + StatsStore, + get_store, +) if TYPE_CHECKING: from ..config import Settings -# --------------------------------------------------------------------------- -# Sliding-window rate tracker over Prometheus counters -# --------------------------------------------------------------------------- - - -class RateTracker: - """Sample counter values on a schedule, then compute deltas over the window.""" - - def __init__(self, window_seconds: int = 3600, max_samples: int = 180): - self._window = window_seconds - self._max_samples = max_samples - self._snapshots: deque[tuple[float, dict[str, float]]] = deque(maxlen=max_samples) - - def record(self, counters: dict[str, float]) -> None: - now = time.monotonic() - self._snapshots.append((now, dict(counters))) - cutoff = now - self._window - while len(self._snapshots) > 2 and self._snapshots[0][0] < cutoff: - self._snapshots.popleft() - - def rate_per_second(self, key: str) -> float: - if len(self._snapshots) < 2: - return 0.0 - t0, v0 = self._snapshots[0] - t1, v1 = self._snapshots[-1] - elapsed = t1 - t0 - if elapsed < 0.5: - return 0.0 - delta = v1.get(key, 0.0) - v0.get(key, 0.0) - return max(0.0, delta / elapsed) - - def total(self, key: str) -> float: - if not 
self._snapshots: - return 0.0 - _, v0 = self._snapshots[0] - _, v1 = self._snapshots[-1] - return max(0.0, v1.get(key, 0.0) - v0.get(key, 0.0)) - - def sparkline(self, key: str, points: int = 30) -> list[float]: - """Return per-bucket deltas suitable for a sparkline.""" - _, values = self.sparkline_series(key, points) - return values - - def sparkline_series(self, key: str, points: int = 30) -> tuple[list[float], list[float]]: - """Return (wall_clock_timestamps, per-bucket deltas) in parallel lists.""" - if len(self._snapshots) < 2: - return [], [] - snaps = list(self._snapshots) - # Map monotonic samples → wall-clock by using the current offset. - mono_now = time.monotonic() - wall_now = time.time() - offset = wall_now - mono_now - pairs: list[tuple[float, float]] = [] - for prev, curr in zip(snaps, snaps[1:], strict=False): - elapsed = curr[0] - prev[0] - if elapsed <= 0: - continue - pairs.append( - (curr[0] + offset, max(0.0, curr[1].get(key, 0.0) - prev[1].get(key, 0.0))) - ) - if len(pairs) > points: - step = len(pairs) / points - pairs = [pairs[int(i * step)] for i in range(points)] - times = [round(p[0], 3) for p in pairs] - values = [round(p[1], 2) for p in pairs] - return times, values - - def earliest_value(self, key: str) -> float | None: - if not self._snapshots: - return None - return self._snapshots[0][1].get(key, 0.0) - - -_rate_tracker = RateTracker() - - -# --------------------------------------------------------------------------- -# Request log — ring buffer for the activity feed -# --------------------------------------------------------------------------- - - -@dataclass(slots=True, frozen=True) -class RequestEntry: - timestamp: float - method: str - operation: str - bucket: str - key: str - status: int - duration_ms: float - size: int - client_ip: str - - -class RequestLog: - def __init__(self, maxlen: int = 200): - self._entries: deque[RequestEntry] = deque(maxlen=maxlen) - - def record(self, entry: RequestEntry) -> None: - 
self._entries.append(entry) - - def recent(self, limit: int = 10) -> list[dict]: - entries = list(self._entries) - entries.reverse() - return [asdict(e) for e in entries[:limit]] - - def all(self) -> list[RequestEntry]: - return list(self._entries) - - -_request_log = RequestLog(maxlen=200) - - -def record_request( +async def record_request( method: str, path: str, operation: str, @@ -137,10 +29,13 @@ def record_request( size: int, client_ip: str = "", ) -> None: - """Append a completed request to the ring buffer.""" + """Append a completed request to the stats store (no-op if unset).""" + store = get_store() + if store is None: + return bucket, key = _split_bucket_key(path) - _request_log.record( - RequestEntry( + await store.record( + RequestSample( timestamp=time.time(), method=method, operation=operation, @@ -219,20 +114,42 @@ def _read_method_breakdown() -> dict[str, float]: return out -def _latency_percentiles() -> dict[str, float]: - """Approximate p50/p95/p99 by walking the histogram cumulative buckets.""" - buckets: list[tuple[float, float]] = [] - total = 0.0 +def _read_latency_buckets() -> dict[str, float]: + """Read the Prometheus histogram as a {le-string: cumulative count} map. + + Includes the synthetic "+Inf" bucket (== total observations). Matches the + shape the Redis latency hash stores so percentiles can be computed from + either source. + """ + out: dict[str, float] = {} for sample in metrics.REQUEST_DURATION.collect()[0].samples: - if sample.name.endswith("_bucket"): - le = sample.labels.get("le", "") - if le == "+Inf": - total = sample.value - else: - try: - buckets.append((float(le), sample.value)) - except ValueError: - continue + if not sample.name.endswith("_bucket"): + continue + le = sample.labels.get("le", "") + out[le] = out.get(le, 0.0) + sample.value + return out + + +def _latency_percentiles(cumulative: dict[str, float] | None = None) -> dict[str, float]: + """Approximate p50/p95/p99 by walking the histogram cumulative buckets. 
+ + ``cumulative`` is a {le-string: cumulative count} map. When None, reads the + per-pod Prometheus histogram. + """ + if cumulative is None: + cumulative = _read_latency_buckets() + + buckets: list[tuple[float, float]] = [] + total = float(cumulative.get("+Inf", 0.0)) + for le, count in cumulative.items(): + if le == "+Inf": + continue + try: + buckets.append((float(le), float(count))) + except ValueError, TypeError: + continue + if total < 1 and buckets: + total = max(c for _, c in buckets) if total < 1 or not buckets: return {"p50_ms": 0.0, "p95_ms": 0.0, "p99_ms": 0.0, "count": 0} buckets.sort(key=lambda b: b[0]) @@ -252,6 +169,19 @@ def _pct(p: float) -> float: } +def _local_counters() -> LocalCounters: + """Snapshot this pod's Prometheus-derived counters for delta-syncing.""" + return LocalCounters( + requests=_read_labeled_counter_sum(metrics.REQUEST_COUNT), + errors=_read_errors_total(), + bytes_encrypted=_read_counter(metrics.BYTES_ENCRYPTED), + bytes_decrypted=_read_counter(metrics.BYTES_DECRYPTED), + methods=_read_method_breakdown(), + errors_by_class=_read_error_breakdown(), + latency_buckets=_read_latency_buckets(), + ) + + # --------------------------------------------------------------------------- # Formatters # --------------------------------------------------------------------------- @@ -294,6 +224,12 @@ def _format_relative(ts: float, now: float | None = None) -> str: return f"{int(delta // 86400)}d ago" +def _format_absolute(ts: float) -> str: + """Render a Unix epoch as a local 'YYYY-MM-DD HH:MM:SS.mmm' timestamp.""" + ms = int((ts - int(ts)) * 1000) + return f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(ts))}.{ms:03d}" + + def _format_size(n: int) -> str: if n <= 0: return "—" @@ -306,20 +242,21 @@ def _format_size(n: int) -> str: # --------------------------------------------------------------------------- -def _derive_buckets(entries: list[RequestEntry]) -> list[dict]: +def _derive_buckets(entries: list[dict]) -> list[dict]: 
by_bucket: dict[str, dict] = defaultdict( lambda: {"objects": set(), "bytes": 0, "last_seen": 0.0} ) for e in entries: - if not e.bucket: + bucket = e.get("bucket") + if not bucket: continue - info = by_bucket[e.bucket] - if e.key: - info["objects"].add(e.key) - if e.size > 0: - info["bytes"] += e.size - if e.timestamp > info["last_seen"]: - info["last_seen"] = e.timestamp + info = by_bucket[bucket] + if e.get("key"): + info["objects"].add(e["key"]) + if e.get("size", 0) > 0: + info["bytes"] += e["size"] + if e.get("timestamp", 0.0) > info["last_seen"]: + info["last_seen"] = e["timestamp"] out: list[dict] = [] for name, info in by_bucket.items(): @@ -327,7 +264,6 @@ def _derive_buckets(entries: list[RequestEntry]) -> list[dict]: out.append( { "name": name, - "encrypted": True, "objects": len(info["objects"]), "size": f"{num} {unit}" if info["bytes"] > 0 else "—", "last_seen": info["last_seen"], @@ -359,36 +295,58 @@ def _derive_keys(settings: Settings) -> list[dict]: # --------------------------------------------------------------------------- -def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") -> dict: - """Gather everything the dashboard renders in a single JSON blob.""" +async def collect_all( + store: StatsStore, + settings: Settings, + start_time: float, + version: str = "1.0.0", + range_key: str = "live", +) -> dict: + """Gather everything the dashboard renders in a single JSON blob. + + Reads cluster-wide aggregates from the Redis store when available, else + falls back to this pod's local Prometheus counters. 
+ """ now = time.time() uptime_s = max(0.0, time.monotonic() - start_time) - total_requests = _read_labeled_counter_sum(metrics.REQUEST_COUNT) - bytes_encrypted = _read_counter(metrics.BYTES_ENCRYPTED) - bytes_decrypted = _read_counter(metrics.BYTES_DECRYPTED) - errors_total = _read_errors_total() - - counters = { - "requests": total_requests, - "bytes_crypto": bytes_encrypted + bytes_decrypted, - "errors": errors_total, - } - _rate_tracker.record(counters) - - req_rate = _rate_tracker.rate_per_second("requests") - crypto_rate = _rate_tracker.rate_per_second("bytes_crypto") + # Fold this pod's local Prometheus deltas into the shared store, then read + # the cluster-wide aggregate. MemoryStatsStore.aggregate() returns None, + # so single-instance mode keeps reading Prometheus directly below. + local = _local_counters() + await store.sync_local(local) + agg = await store.aggregate() + + if agg is not None: + total_requests = agg.requests + bytes_encrypted = agg.bytes_encrypted + bytes_decrypted = agg.bytes_decrypted + errors_total = agg.errors + method_breakdown = agg.methods + error_breakdown = agg.errors_by_class or {"4xx": 0.0, "5xx": 0.0, "503": 0.0} + latency = _latency_percentiles(agg.latency_buckets) + else: + total_requests = local.requests + bytes_encrypted = local.bytes_encrypted + bytes_decrypted = local.bytes_decrypted + errors_total = local.errors + method_breakdown = local.methods + error_breakdown = local.errors_by_class + latency = _latency_percentiles(local.latency_buckets) + + req_times, req_values = await store.series("requests", range_key) + crypto_times, crypto_values = await store.series("crypto", range_key) + err_times, err_values = await store.series("errors", range_key) + + req_rate = (req_values[-1] / 60.0) if req_values else 0.0 + crypto_rate = (crypto_values[-1] / 60.0) if crypto_values else 0.0 num_enc, unit_enc = _format_bytes(bytes_encrypted) num_thr, unit_thr = _format_bytes(crypto_rate) - req_times, req_values = 
_rate_tracker.sparkline_series("requests") - crypto_times, crypto_values = _rate_tracker.sparkline_series("bytes_crypto") - err_times, err_values = _rate_tracker.sparkline_series("errors") - - entries = _request_log.all() - buckets = _derive_buckets(entries) - last_error_ts = next((e.timestamp for e in reversed(entries) if e.status >= 400), None) + activity = await store.recent(10) + buckets = _derive_buckets(activity) + last_error_ts = next((e["timestamp"] for e in activity if e["status"] >= 400), None) return { "header": { @@ -397,6 +355,8 @@ def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") - "uptime": _format_uptime(uptime_s), "pod": os.environ.get("HOSTNAME", "local"), "version": version, + "cluster_wide": store.cluster_wide, + "range": range_key, }, "cards": { "requests": { @@ -405,10 +365,10 @@ def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") - "unit": "", "spark": req_values, "spark_times": req_times, - "y_label": "req / sample", + "y_label": "req / bucket", "breakdown": [ {"label": m, "value": f"{int(v):,}", "weight": float(v)} - for m, v in sorted(_read_method_breakdown().items(), key=lambda kv: -kv[1]) + for m, v in sorted(method_breakdown.items(), key=lambda kv: -kv[1]) if v > 0 ], }, @@ -418,7 +378,7 @@ def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") - "unit": unit_enc, "spark": crypto_values, "spark_times": crypto_times, - "y_label": "bytes / sample", + "y_label": "bytes / bucket", "breakdown": [ { "label": "Encrypted (PUT)", @@ -438,17 +398,17 @@ def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") - "unit": "", "spark": err_values, "spark_times": err_times, - "y_label": "errors / sample", + "y_label": "errors / bucket", "breakdown": [ {"label": k, "value": f"{int(v):,}", "weight": float(v)} - for k, v in _read_error_breakdown().items() + for k, v in error_breakdown.items() ], }, "active_buckets": { "label": "Active Buckets", "value": 
str(len(buckets)), "unit": "", - "detail": f"seen in last {len(entries)} reqs", + "detail": f"seen in last {len(activity)} reqs", "breakdown": [ { "label": b["name"], @@ -459,10 +419,12 @@ def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") - ], }, }, - "latency": _latency_percentiles(), + "latency": latency, "activity": [ { - "time": _format_relative(e["timestamp"], now), + "time": _format_absolute(e["timestamp"]), + "time_relative": _format_relative(e["timestamp"], now), + "timestamp": e["timestamp"], "operation": _operation_display(e["method"], e["operation"]), "bucket": e["bucket"] or "—", "object": e["key"] or "—", @@ -472,12 +434,11 @@ def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") - "client_ip": e["client_ip"] or "—", "latency": f"{e['duration_ms']:.0f} ms", } - for e in _request_log.recent(10) + for e in activity ], "buckets": [ { "name": b["name"], - "encrypted": b["encrypted"], "objects": f"{b['objects']:,}", "size": b["size"], } @@ -488,11 +449,30 @@ def collect_all(settings: Settings, start_time: float, version: str = "1.0.0") - "version": version, "req_per_s": f"{req_rate:.0f}", "throughput": f"{num_thr} {unit_thr}/s" if crypto_rate > 0 else f"0 {unit_thr}/s", - "last_error": _format_relative(last_error_ts, now) if last_error_ts else "never", + "last_error": _format_absolute(last_error_ts) if last_error_ts else "never", }, } +async def collect_series(store: StatsStore, metric: str, range_key: str) -> dict: + """Return a single metric's time-series for the chart range selector.""" + times, values = await store.series(metric, range_key) + return {"metric": metric, "range": range_key, "spark_times": times, "spark": values} + + +async def collect_throughput(store: StatsStore, range_key: str) -> dict: + """Return PUT (encrypted) and GET (decrypted) byte series as two lines.""" + put_times, put_values = await store.series("bytes_put", range_key) + get_times, get_values = await 
store.series("bytes_get", range_key) + return { + "range": range_key, + "series": [ + {"label": "Encrypted (PUT)", "spark_times": put_times, "spark": put_values}, + {"label": "Decrypted (GET)", "spark_times": get_times, "spark": get_values}, + ], + } + + def _operation_display(method: str, operation: str) -> str: """Shorten operation names for the feed (GET, PUT, DELETE, etc.).""" return method or operation @@ -509,13 +489,19 @@ async def list_bucket_objects( bucket: str, prefix: str = "", delimiter: str | None = "/", - max_keys: int = 500, + max_keys: int = 1000, + offset: int = 0, + page_size: int = 20, ) -> dict: """List a "directory" in a bucket using ListObjectsV2 with a delimiter. When delimiter is "/" (default), the response is split into sub-prefixes (folders) and objects at the current level — the standard S3-console file-explorer shape. + + Objects are paginated client-side (offset/page_size). The expensive + per-object encryption HEAD fan-out runs only on the current page slice, so a + huge folder stays cheap (≈page_size HEADs, not one per object in the bucket). 
""" from ..client import S3Client, S3Credentials @@ -532,95 +518,113 @@ async def list_bucket_objects( max_keys=max_keys, ) - prefix_len = len(prefix) - folders: list[dict] = [] - for cp in result.get("CommonPrefixes", []) or []: - full = cp.get("Prefix", "") - name = full[prefix_len:].rstrip("/") - folders.append({"prefix": full, "name": name}) - - objects: list[dict] = [] - for o in result.get("Contents", []) or []: - full = o.get("Key", "") - # Skip the "directory marker" object that some tools create at the prefix itself - if full == prefix: - continue - size = int(o.get("Size", 0)) - lm = o.get("LastModified") - objects.append( - { - "key": full, - "name": full[prefix_len:], - "size": size, - "size_h": _format_size(size), - "last_modified": lm.isoformat() if lm else "", - "etag": (o.get("ETag") or "").strip('"'), - } - ) + prefix_len = len(prefix) + folders: list[dict] = [] + for cp in result.get("CommonPrefixes", []) or []: + full = cp.get("Prefix", "") + name = full[prefix_len:].rstrip("/") + folders.append({"prefix": full, "name": name}) + + all_objects: list[dict] = [] + for o in result.get("Contents", []) or []: + full = o.get("Key", "") + # Skip the "directory marker" object that some tools create at the prefix itself + if full == prefix: + continue + size = int(o.get("Size", 0)) + lm = o.get("LastModified") + all_objects.append( + { + "key": full, + "name": full[prefix_len:], + "size": size, + "size_h": _format_size(size), + "last_modified": lm.isoformat() if lm else "", + "etag": (o.get("ETag") or "").strip('"'), + } + ) + + total_objects = len(all_objects) + objects = all_objects[offset : offset + page_size] + + # Per-object encryption status, the same check the object-detail view uses + # (on-object isec tag, else multipart sidecar). Run only on the current + # page (≈page_size HEADs). These admin HEADs use this S3Client directly + # (not the proxy), so they don't pollute the dashboard stats. 
+ await _annotate_encryption(settings, client, bucket, objects) + return { "bucket": bucket, "prefix": prefix, "delimiter": delimiter or "", "folders": folders, "objects": objects, - "count": len(folders) + len(objects), + "offset": offset, + "page_size": page_size, + "total_objects": total_objects, + "has_more": offset + len(objects) < total_objects, + "count": len(folders) + total_objects, "is_truncated": bool(result.get("IsTruncated", False)), } -def list_logs( - limit: int = 200, +async def _annotate_encryption(settings, client, bucket: str, objects: list[dict]) -> None: + """Set obj['encrypted'] for each listed object using the GET-path detection.""" + import asyncio + + sem = asyncio.Semaphore(32) + + async def check(obj: dict) -> None: + async with sem: + try: + md = await client.head_object(bucket, obj["key"]) + except Exception: + obj["encrypted"] = None # couldn't determine + return + user_md = dict(md.get("Metadata") or {}) + has_tag = user_md.get(settings.dektag_name) is not None + # `or` short-circuits: skip the sidecar lookup when an on-object tag is present. 
+ obj["encrypted"] = has_tag or await _has_multipart_sidecar(client, bucket, obj["key"]) + + if objects: + await asyncio.gather(*(check(o) for o in objects)) + + +async def list_logs( + store: StatsStore, + limit: int = 50, + offset: int = 0, query: str = "", operation: str = "", status: str = "", ) -> dict: - """Return filtered request-log entries for the /logs view.""" + """Return a filtered, paginated page of request-log entries for /logs.""" now = time.time() - q = (query or "").strip().lower() - op_filter = (operation or "").strip().upper() - status_filter = (status or "").strip().lower() - - entries = list(_request_log.all()) - entries.reverse() - out: list[dict] = [] - for e in entries: - if op_filter and (e.method or e.operation).upper() != op_filter: - continue - if status_filter: - is_err = e.status >= 400 - if status_filter == "success" and is_err: - continue - if status_filter == "error" and not is_err: - continue - if q: - blob = f"{e.bucket} {e.key} {e.client_ip} {e.method} {e.operation} {e.status}".lower() - if q not in blob: - continue - out.append( - { - "time": _format_relative(e.timestamp, now), - "timestamp": e.timestamp, - "operation": _operation_display(e.method, e.operation), - "bucket": e.bucket or "", - "object": e.key or "", - "status": "Success" if e.status < 400 else "Error", - "status_code": e.status, - "size": _format_size(e.size), - "client_ip": e.client_ip or "", - "latency": f"{e.duration_ms:.0f} ms", - } - ) - if len(out) >= limit: - break - - all_ops = sorted( - {(e.method or e.operation) for e in _request_log.all() if e.method or e.operation} - ) + page = await store.page(offset, limit, query, operation, status) + entries = [ + { + "time": _format_absolute(e["timestamp"]), + "time_relative": _format_relative(e["timestamp"], now), + "timestamp": e["timestamp"], + "operation": _operation_display(e["method"], e["operation"]), + "bucket": e["bucket"] or "", + "object": e["key"] or "", + "status": "Success" if e["status"] < 400 else 
"Error", + "status_code": e["status"], + "size": _format_size(e["size"]), + "client_ip": e["client_ip"] or "", + "latency": f"{e['duration_ms']:.0f} ms", + } + for e in page["entries"] + ] return { - "count": len(out), - "total": len(_request_log.all()), - "entries": out, - "operations": all_ops, + "count": page["count"], + "offset": page["offset"], + "limit": page["limit"], + "total": page["total"], + "has_more": page["has_more"], + "entries": entries, + "operations": page["operations"], } @@ -641,11 +645,20 @@ async def head_object_detail( async with S3Client(settings, creds) as client: md = await client.head_object(bucket, key) - user_metadata = dict(md.get("Metadata") or {}) - # Redact the binary envelope (encrypted DEK) — it's opaque to humans. - isec = user_metadata.pop(settings.dektag_name, None) - if isec is not None: - user_metadata["_encrypted_dek"] = f"<{len(isec)} bytes>" + user_metadata = dict(md.get("Metadata") or {}) + # Redact the binary envelope (encrypted DEK) — it's opaque to humans. + isec = user_metadata.pop(settings.dektag_name, None) + if isec is not None: + user_metadata["_encrypted_dek"] = f"<{len(isec)} bytes>" + enc_via = "metadata" + elif await _has_multipart_sidecar(client, bucket, key): + # Multipart objects store the wrapped DEK in a sidecar object, not an + # on-object tag — the create-time metadata doesn't survive + # CompleteMultipartUpload. Consult the sidecar before concluding + # "not encrypted" (mirrors the GET read path). + enc_via = "sidecar" + else: + enc_via = "" lm = md.get("LastModified") return { @@ -657,5 +670,19 @@ async def head_object_detail( "etag": (md.get("ETag") or "").strip('"'), "last_modified": lm.isoformat() if lm else "", "metadata": user_metadata, - "encrypted": isec is not None, + "encrypted": bool(enc_via), + "encryption_source": enc_via, } + + +async def _has_multipart_sidecar(client, bucket: str, key: str) -> bool: + """True if the object has an s3proxy multipart-metadata sidecar (= encrypted). 
+ + Reuses the GET path's unified lookup so detection matches decryption exactly. + """ + from ..state.metadata import load_multipart_metadata + + try: + return await load_multipart_metadata(client, bucket, key) is not None + except Exception: + return False diff --git a/s3proxy/admin/router.py b/s3proxy/admin/router.py index f8f00e0..8c3ad8b 100644 --- a/s3proxy/admin/router.py +++ b/s3proxy/admin/router.py @@ -19,7 +19,15 @@ make_verify_html, set_session_cookie, ) -from .collectors import collect_all, head_object_detail, list_bucket_objects, list_logs +from .collectors import ( + collect_all, + collect_series, + collect_throughput, + head_object_detail, + list_bucket_objects, + list_logs, +) +from .stats_store import DEFAULT_RANGE, RANGE_SPECS from .templates import render_dashboard, render_login if TYPE_CHECKING: @@ -80,19 +88,43 @@ async def logout() -> RedirectResponse: async def dashboard() -> HTMLResponse: return HTMLResponse(render_dashboard(admin_path=settings.admin_path)) + def _range(request: Request) -> str: + r = request.query_params.get("range", DEFAULT_RANGE) + return r if r in RANGE_SPECS else DEFAULT_RANGE + @router.get("/api/status", dependencies=[Depends(verify_api)]) async def status_api(request: Request) -> JSONResponse: - data = collect_all( + data = await collect_all( + request.app.state.stats_store, request.app.state.settings, request.app.state.start_time, version=version, + range_key=_range(request), ) return JSONResponse(data) + @router.get("/api/series", dependencies=[Depends(verify_api)]) + async def series_api( + request: Request, metric: str = "requests", range: str = DEFAULT_RANGE + ) -> JSONResponse: + allowed = ("requests", "crypto", "errors", "bytes_put", "bytes_get") + metric = metric if metric in allowed else "requests" + range_key = range if range in RANGE_SPECS else DEFAULT_RANGE + data = await collect_series(request.app.state.stats_store, metric, range_key) + return JSONResponse(data) + + @router.get("/api/throughput", 
dependencies=[Depends(verify_api)]) + async def throughput_api(request: Request, range: str = DEFAULT_RANGE) -> JSONResponse: + range_key = range if range in RANGE_SPECS else DEFAULT_RANGE + data = await collect_throughput(request.app.state.stats_store, range_key) + return JSONResponse(data) + @router.get("/api/stream", dependencies=[Depends(verify_api)]) async def status_stream(request: Request) -> StreamingResponse: """Push status updates via SSE — only emits when the payload changes.""" + range_key = _range(request) + async def event_gen(): last_payload: str | None = None last_heartbeat = time.monotonic() @@ -100,10 +132,12 @@ async def event_gen(): while True: if await request.is_disconnected(): return - data = collect_all( + data = await collect_all( + request.app.state.stats_store, request.app.state.settings, request.app.state.start_time, version=version, + range_key=range_key, ) payload = _json.dumps(data) now = time.monotonic() @@ -130,13 +164,17 @@ async def event_gen(): @router.get("/api/logs", dependencies=[Depends(verify_api)]) async def logs_api( + request: Request, q: str = "", operation: str = "", status: str = "", - limit: int = 200, + limit: int = 50, + offset: int = 0, ) -> JSONResponse: - data = list_logs( + data = await list_logs( + request.app.state.stats_store, limit=min(max(limit, 1), 500), + offset=max(offset, 0), query=q, operation=operation, status=status, @@ -148,7 +186,9 @@ async def list_bucket( bucket: str, prefix: str = "", delimiter: str = "/", - limit: int = 500, + limit: int = 1000, + offset: int = 0, + page_size: int = 20, ) -> JSONResponse: try: data = await list_bucket_objects( @@ -158,6 +198,8 @@ async def list_bucket( prefix=prefix, delimiter=delimiter or None, max_keys=min(limit, 1000), + offset=max(offset, 0), + page_size=min(max(page_size, 1), 100), ) except ClientError as exc: code = exc.response.get("Error", {}).get("Code", "Error") diff --git a/s3proxy/admin/stats_store.py b/s3proxy/admin/stats_store.py new file mode 
100644 index 0000000..490076c --- /dev/null +++ b/s3proxy/admin/stats_store.py @@ -0,0 +1,546 @@ +"""Stats store backends for the admin dashboard. + +Mirrors the Strategy Pattern in ``state/storage.py``: a ``StatsStore`` ABC with +an in-memory implementation (per-pod, the single-instance default) and a +Redis-backed implementation that makes the dashboard numbers cluster-wide. + +The Redis path keeps everything under the ``s3proxy:stats:`` prefix with sliding +TTLs and a hard entry cap so it never threatens the multipart-upload state that +shares the same (``maxmemory 200mb, noeviction``) Redis. +""" + +from __future__ import annotations + +import os +import time +from abc import ABC, abstractmethod +from collections import deque +from dataclasses import asdict, dataclass +from typing import TYPE_CHECKING + +import orjson +import structlog +from structlog.stdlib import BoundLogger + +if TYPE_CHECKING: + from redis.asyncio import Redis + + from ..config import Settings + +logger: BoundLogger = structlog.get_logger(__name__) + +STATS_PREFIX = "s3proxy:stats:" + +# Latency histogram bucket upper-bounds — mirror metrics.REQUEST_DURATION so the +# percentile walk is identical whether reading Prometheus or Redis. +LATENCY_BUCKETS = (0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0) + +# Range -> (window_seconds, bucket_seconds). Bucket sizes chosen so each range +# downsamples to ~60-120 points and stays readable. 
+RANGE_SPECS: dict[str, tuple[int, int]] = { + "1h": (3600, 60), + "3h": (3 * 3600, 120), + "7h": (7 * 3600, 300), + "24h": (24 * 3600, 900), + "7d": (7 * 86400, 7200), +} +DEFAULT_RANGE = "1h" + + +@dataclass(slots=True, frozen=True) +class RequestSample: + """A completed request, the unit written to the request log + counters.""" + + timestamp: float + method: str + operation: str + bucket: str + key: str + status: int + duration_ms: float + size: int + client_ip: str + + +# A snapshot of this pod's local Prometheus-derived counters, used by the Redis +# store to compute deltas to fold into the shared counters. +@dataclass(slots=True, frozen=True) +class LocalCounters: + requests: float + errors: float + bytes_encrypted: float + bytes_decrypted: float + methods: dict[str, float] + errors_by_class: dict[str, float] + latency_buckets: dict[str, float] # le-string -> cumulative count, plus "count" + + +def _latency_bucket_le(duration_seconds: float) -> str: + for le in LATENCY_BUCKETS: + if duration_seconds <= le: + return str(le) + return "+Inf" + + +def bucket_series( + points: list[tuple[int, float]], + window_seconds: int, + bucket_seconds: int, + now: float | None = None, +) -> tuple[list[float], list[float]]: + """Downsample ``(minute_epoch, value)`` pairs into a fixed bucket grid. + + Returns parallel ``(times, values)`` lists with one entry per bucket across + the window, zero-filling empty buckets so the chart axis stays continuous. 
+ """ + now = now if now is not None else time.time() + end = int(now // bucket_seconds) * bucket_seconds + start = end - window_seconds + bucket_seconds + n = window_seconds // bucket_seconds + times = [float(start + i * bucket_seconds) for i in range(n)] + values = [0.0] * n + for minute_ts, value in points: + if minute_ts < start or minute_ts > end: + continue + idx = int((minute_ts - start) // bucket_seconds) + if 0 <= idx < n: + values[idx] += value + return times, [round(v, 2) for v in values] + + +# --------------------------------------------------------------------------- +# In-memory rate tracker + request log (single-instance / per-pod path) +# --------------------------------------------------------------------------- + + +class RateTracker: + """Sample counter values on a schedule, then compute deltas over the window.""" + + def __init__(self, window_seconds: int = 3600, max_samples: int = 180): + self._window = window_seconds + self._max_samples = max_samples + self._snapshots: deque[tuple[float, dict[str, float]]] = deque(maxlen=max_samples) + + def record(self, counters: dict[str, float]) -> None: + now = time.monotonic() + self._snapshots.append((now, dict(counters))) + cutoff = now - self._window + while len(self._snapshots) > 2 and self._snapshots[0][0] < cutoff: + self._snapshots.popleft() + + def rate_per_second(self, key: str) -> float: + if len(self._snapshots) < 2: + return 0.0 + t0, v0 = self._snapshots[0] + t1, v1 = self._snapshots[-1] + elapsed = t1 - t0 + if elapsed < 0.5: + return 0.0 + delta = v1.get(key, 0.0) - v0.get(key, 0.0) + return max(0.0, delta / elapsed) + + def sparkline_series(self, key: str, points: int = 30) -> tuple[list[float], list[float]]: + """Return (wall_clock_timestamps, per-bucket deltas) in parallel lists.""" + if len(self._snapshots) < 2: + return [], [] + snaps = list(self._snapshots) + mono_now = time.monotonic() + wall_now = time.time() + offset = wall_now - mono_now + pairs: list[tuple[float, float]] = [] + 
for prev, curr in zip(snaps, snaps[1:], strict=False): + elapsed = curr[0] - prev[0] + if elapsed <= 0: + continue + pairs.append( + (curr[0] + offset, max(0.0, curr[1].get(key, 0.0) - prev[1].get(key, 0.0))) + ) + if len(pairs) > points: + step = len(pairs) / points + pairs = [pairs[int(i * step)] for i in range(points)] + times = [round(p[0], 3) for p in pairs] + values = [round(p[1], 2) for p in pairs] + return times, values + + +class RequestLog: + def __init__(self, maxlen: int = 10000): + self._entries: deque[RequestSample] = deque(maxlen=maxlen) + + def record(self, entry: RequestSample) -> None: + self._entries.append(entry) + + def all(self) -> list[RequestSample]: + return list(self._entries) + + +# --------------------------------------------------------------------------- +# Aggregates returned to the collector +# --------------------------------------------------------------------------- + + +@dataclass(slots=True) +class StatsAggregate: + """Cluster-wide totals + breakdowns the dashboard cards render.""" + + requests: float + errors: float + bytes_encrypted: float + bytes_decrypted: float + methods: dict[str, float] + errors_by_class: dict[str, float] + latency_buckets: dict[str, float] + + +class StatsStore(ABC): + """Abstract dashboard stats backend.""" + + cluster_wide: bool = False + + @abstractmethod + async def record(self, sample: RequestSample) -> None: + """Append a completed request to the log and bump counters.""" + ... + + @abstractmethod + async def sync_local(self, local: LocalCounters) -> None: + """Fold this pod's local Prometheus deltas into the shared store. + + No-op for the in-memory store (it reads Prometheus directly). + """ + ... + + @abstractmethod + async def aggregate(self) -> StatsAggregate | None: + """Return cluster-wide totals/breakdowns, or None to use the local path.""" + ... 
+ + @abstractmethod + async def series(self, metric: str, range_key: str) -> tuple[list[float], list[float]]: + """Return (times, values) for a metric over the requested range.""" + ... + + @abstractmethod + async def recent(self, limit: int) -> list[dict]: + """Return the most recent N request entries (newest first) as dicts.""" + ... + + @abstractmethod + async def page(self, offset: int, limit: int, query: str, operation: str, status: str) -> dict: + """Return a filtered, paginated slice of the request log.""" + ... + + +class MemoryStatsStore(StatsStore): + """Per-pod in-memory store — the single-instance default (today's behavior).""" + + cluster_wide = False + + def __init__(self, settings: Settings): + self._rate = RateTracker() + self._log = RequestLog(maxlen=max(200, settings.request_log_cap)) + + @property + def rate(self) -> RateTracker: + return self._rate + + async def record(self, sample: RequestSample) -> None: + self._log.record(sample) + + async def sync_local(self, local: LocalCounters) -> None: + # In-memory path reads Prometheus directly in collect_all; just feed the + # rate tracker so live sparklines work. + self._rate.record( + { + "requests": local.requests, + "bytes_crypto": local.bytes_encrypted + local.bytes_decrypted, + "errors": local.errors, + } + ) + + async def aggregate(self) -> StatsAggregate | None: + return None # signal collect_all to use the Prometheus/local path + + async def series(self, metric: str, range_key: str) -> tuple[list[float], list[float]]: + # Memory mode is single-pod; per-direction byte series aren't tracked + # separately, so they fall back to the combined crypto sparkline. 
async def record(self, sample: RequestSample) -> None:
    """Append ``sample`` to the shared request log and bump the shared counters.

    Everything is queued on one non-transactional pipeline and sent in a
    single round trip. Any failure is logged and swallowed on purpose: a stats
    write must never break the request path.
    """
    try:
        entry = orjson.dumps(asdict(sample))
        minute = str(int(sample.timestamp // 60) * 60)
        is_err = sample.status >= 400
        # LTRIM 0 -1 means "keep the whole list", so a cap of 0 (or less)
        # would silently disable the bound. Always keep at least one entry.
        keep = max(int(self._log_cap), 1)
        async with self._client.pipeline(transaction=False) as pipe:
            reqlog = self._k("reqlog")
            pipe.lpush(reqlog, entry)
            pipe.ltrim(reqlog, 0, keep - 1)
            pipe.expire(reqlog, self._log_ttl)
            # Per-request cluster-wide counters — incremented on every pod for
            # every request, so the dashboard totals are correct regardless of
            # which pod serves the admin view (no per-pod Prometheus sampling).
            c = self._k("counters")
            pipe.hincrby(c, "requests", 1)
            pipe.expire(c, self._stats_ttl)
            pipe.hincrby(self._k("methods"), sample.method or "?", 1)
            pipe.expire(self._k("methods"), self._stats_ttl)
            # NOTE(review): the ts:* hashes gain one field per minute and the
            # TTL slides on every write, so under continuous traffic old
            # minutes are never dropped — confirm whether pruning is needed.
            pipe.hincrby(self._k("ts:requests"), minute, 1)
            pipe.expire(self._k("ts:requests"), self._series_ttl)
            # Latency histogram (same bucket bounds as Prometheus) so
            # p50/p95/p99 are cluster-wide, not just the serving pod's.
            # "+Inf" is bumped once for every request. A request slower than
            # the largest finite bound already maps to "+Inf", so it must not
            # be incremented a second time.
            lat = self._k("latency")
            le = _latency_bucket_le(sample.duration_ms / 1000.0)
            if le != "+Inf":
                pipe.hincrby(lat, le, 1)
            pipe.hincrby(lat, "+Inf", 1)
            pipe.expire(lat, self._stats_ttl)
            # Bytes moved, bucketed for the throughput charts. PUT bodies are
            # encrypted on the way in; GET bodies decrypted on the way out.
            # Combined series feeds the "Data Encrypted" card; per-direction
            # series feed the throughput chart's two lines.
            if sample.size > 0 and sample.method in ("PUT", "POST"):
                pipe.hincrbyfloat(c, "bytes_encrypted", sample.size)
                pipe.hincrbyfloat(self._k("ts:crypto"), minute, sample.size)
                pipe.hincrbyfloat(self._k("ts:bytes_put"), minute, sample.size)
                pipe.expire(self._k("ts:crypto"), self._series_ttl)
                pipe.expire(self._k("ts:bytes_put"), self._series_ttl)
            elif sample.size > 0 and sample.method == "GET":
                pipe.hincrbyfloat(c, "bytes_decrypted", sample.size)
                pipe.hincrbyfloat(self._k("ts:crypto"), minute, sample.size)
                pipe.hincrbyfloat(self._k("ts:bytes_get"), minute, sample.size)
                pipe.expire(self._k("ts:crypto"), self._series_ttl)
                pipe.expire(self._k("ts:bytes_get"), self._series_ttl)
            if is_err:
                pipe.hincrby(c, "errors", 1)
                pipe.hincrby(self._k("ts:errors"), minute, 1)
                pipe.expire(self._k("ts:errors"), self._series_ttl)
                eb = self._k("errors")
                sc = str(sample.status)
                if sc == "503":
                    # 503 is tracked on its own and also counted as a 5xx.
                    pipe.hincrby(eb, "503", 1)
                    pipe.hincrby(eb, "5xx", 1)
                elif sc.startswith("5"):
                    pipe.hincrby(eb, "5xx", 1)
                elif sc.startswith("4"):
                    pipe.hincrby(eb, "4xx", 1)
                pipe.expire(eb, self._stats_ttl)
            await pipe.execute()
    except Exception as exc:  # never break the request path on a stats write
        logger.warning("STATS_RECORD_FAILED", error=str(exc))
def _filter_and_paginate(
    samples: list[RequestSample],
    offset: int,
    limit: int,
    query: str,
    operation: str,
    status: str,
) -> dict:
    """Filter request-log entries, then return one page of the matches.

    Args:
        samples: Entries to filter, in the order they should be paged.
        offset: Index of the first match to return; negatives are treated as 0.
        limit: Maximum entries to return; negatives are treated as 0.
        query: Case-insensitive substring matched against bucket, key,
            client IP, method, operation and status.
        operation: Case-insensitive exact match on the entry's method (or its
            operation when the method is empty).
        status: ``"success"`` keeps entries with status < 400, ``"error"``
            keeps status >= 400; any other value disables the status filter.

    Returns:
        A dict with ``entries`` (the page, as plain dicts), ``count``,
        ``offset``, ``limit``, ``total`` (matches before paging), ``has_more``
        and ``operations`` (sorted distinct labels across ALL ``samples``, not
        only the matches).
    """
    # A negative offset/limit would become Python negative slicing and return
    # the wrong window with a wrong ``has_more``; clamp instead.
    offset = max(offset, 0)
    limit = max(limit, 0)
    q = (query or "").strip().lower()
    op_filter = (operation or "").strip().upper()
    status_filter = (status or "").strip().lower()

    matched: list[RequestSample] = []
    operations: set[str] = set()
    for e in samples:
        # Entries rehydrated from JSON may carry null method/operation; fall
        # back to "" so the filter cannot crash on ``None.upper()``.
        op_label = e.method or e.operation or ""
        if op_label:
            operations.add(op_label)
        if op_filter and op_label.upper() != op_filter:
            continue
        if status_filter:
            is_err = e.status >= 400
            if status_filter == "success" and is_err:
                continue
            if status_filter == "error" and not is_err:
                continue
        if q:
            blob = f"{e.bucket} {e.key} {e.client_ip} {e.method} {e.operation} {e.status}".lower()
            if q not in blob:
                continue
        matched.append(e)

    page = matched[offset : offset + limit]
    return {
        "entries": [asdict(e) for e in page],
        "count": len(page),
        "offset": offset,
        "limit": limit,
        "total": len(matched),
        "has_more": offset + len(page) < len(matched),
        "operations": sorted(operations),
    }
+ """ + from ..state.redis import _redis_client, _use_redis + + if _use_redis and _redis_client is not None: + return RedisStatsStore(_redis_client, settings) + return MemoryStatsStore(settings) + + +def set_store(store: StatsStore) -> None: + global _store + _store = store + + +def get_store() -> StatsStore | None: + return _store diff --git a/s3proxy/admin/templates.py b/s3proxy/admin/templates.py index 0fcd28b..21e11fb 100644 --- a/s3proxy/admin/templates.py +++ b/s3proxy/admin/templates.py @@ -274,6 +274,21 @@ font-size: 12px; color: var(--text-muted); font-variant-numeric: tabular-nums; } + .range-tabs { + display: inline-flex; gap: 2px; + background: var(--surface-2, rgba(127,127,127,0.08)); + border: 1px solid var(--border); + border-radius: 8px; padding: 2px; + } + .range-tab { + border: 0; background: transparent; cursor: pointer; + font: inherit; font-size: 12px; color: var(--text-muted); + padding: 3px 10px; border-radius: 6px; + } + .range-tab:hover { color: var(--text); } + .range-tab.active { + background: var(--accent, #2563eb); color: #fff; + } .chart-wrap { position: relative; width: 100%; @@ -284,7 +299,11 @@ padding: 14px 14px 10px 14px; } .chart-wrap--big { - height: 420px; + /* Match the SVG viewBox aspect (900x360) so preserveAspectRatio="none" + scales uniformly — no axis squish/stretch — and hover maths stay exact. 
*/ + height: auto; + aspect-ratio: 900 / 360; + max-height: 460px; padding: 18px 18px 14px 18px; } .chart-wrap svg { @@ -296,13 +315,13 @@ .chart-wrap .axis path { stroke: var(--border-strong); fill: none; } .chart-wrap .axis text { fill: var(--text-muted); - font-size: 10px; + font-size: 15px; font-variant-numeric: tabular-nums; font-family: inherit; } .chart-wrap .axis-label { fill: var(--text-subtle); - font-size: 10px; + font-size: 15px; letter-spacing: 0.04em; text-transform: uppercase; } @@ -395,7 +414,7 @@ } .metric-grid { display: grid; - grid-template-columns: minmax(0, 1.9fr) minmax(0, 1fr); + grid-template-columns: minmax(0, 3fr) minmax(0, 0.9fr); gap: 28px; align-items: start; } @@ -472,6 +491,7 @@ .back-link:hover { text-decoration: underline; } /* ---- Tables ---- */ + .scroll-x { width: 100%; overflow-x: auto; } table { width: 100%; border-collapse: collapse; } th, td { text-align: left; padding: 10px 8px; @@ -516,7 +536,7 @@ } /* ---- Detail views ---- */ - .detail-sub { color: var(--text-muted); font-size: 13px; margin-top: 2px; } + .detail-sub { color: var(--text-muted); font-size: 13px; margin-top: 2px; word-break: break-all; overflow-wrap: anywhere; } .empty-state { color: var(--text-muted); font-size: 13px; padding: 24px; text-align: center; @@ -609,6 +629,18 @@ box-shadow: 0 0 0 3px rgba(17,24,39,0.06); } .logs-count { color: var(--text-muted); font-size: 12px; margin-left: auto; } + .logs-pager { + display: flex; align-items: center; justify-content: center; + gap: 14px; margin-top: 14px; + } + .pager-btn { + border: 1px solid var(--border); background: var(--surface); + color: var(--text); font: inherit; font-size: 13px; + padding: 5px 14px; border-radius: 7px; cursor: pointer; + } + .pager-btn:hover:not(:disabled) { border-color: var(--border-strong); } + .pager-btn:disabled { opacity: 0.4; cursor: default; } + .pager-status { color: var(--text-muted); font-size: 12px; font-variant-numeric: tabular-nums; } /* ---- Footer ---- */ .footer { 
@@ -670,6 +702,7 @@ View all logs → +
@@ -681,6 +714,7 @@
No requests yet — traffic will appear here.
+
@@ -690,10 +724,10 @@ - + - +
NameEncryptionObjectsSize
NameObjects (seen)Size
No buckets observed yet.
No buckets observed yet.
@@ -730,14 +764,20 @@ Name + Encryption Size Last Modified - Loading… + Loading… +
+ + + +
@@ -766,6 +806,7 @@ +
@@ -777,6 +818,12 @@
Loading…
+
+
+ + + +
@@ -815,10 +862,21 @@
Over time + + + + + + + +  
- +
@@ -862,7 +920,14 @@ const API_BUCKET = "__BUCKET_URL__"; // expects /bucket appended const API_OBJECT = "__OBJECT_URL__"; // expects /bucket/key appended const API_LOGS = "__LOGS_URL__"; + const API_SERIES = "__SERIES_URL__"; const VIEW_POLL_MS = 2000; + // Map metric-card key -> /api/series metric name. + const SERIES_METRIC = {requests: "requests", data_encrypted: "crypto", errors: "errors"}; + let CHART_RANGE = "1h"; + let DATA_DIRECTION = "put"; // for the Data Encrypted metric: put|get + let M_UNIT = ""; + let M_YLABEL = ""; const $ = (id) => document.getElementById(id); function setText(id, v) { const el = $(id); if (el) el.textContent = v; } @@ -951,8 +1016,8 @@ if (!svg || !wrap || !tip) return; svg.innerHTML = ""; - const W = 600, H = 260; - const PAD_L = 52, PAD_R = 14, PAD_T = 16, PAD_B = 30; + const W = 900, H = 360; + const PAD_L = 82, PAD_R = 28, PAD_T = 34, PAD_B = 54; const plotW = W - PAD_L - PAD_R; const plotH = H - PAD_T - PAD_B; @@ -975,7 +1040,7 @@ msg.setAttribute("y", H / 2); msg.setAttribute("text-anchor", "middle"); msg.setAttribute("fill", "#9ca3af"); - msg.setAttribute("font-size", "12"); + msg.setAttribute("font-size", "16"); msg.textContent = "Collecting data… (points appear as traffic flows)"; svg.appendChild(msg); CHART_STATE[prefix] = null; @@ -983,9 +1048,14 @@ return; } - // Y scale with nice ticks + // Y scale with nice ticks. Scale to a robust max (p98) so a single early + // spike doesn't squash the rest of the series flat against the baseline; + // the rare outlier is allowed to clip slightly above the top gridline. const rawMax = Math.max(...vals, 1); - const {ticks, niceMax} = niceTicks(rawMax, 5); + const sorted = vals.slice().sort((a, b) => a - b); + const p98 = sorted[Math.min(sorted.length - 1, Math.floor(sorted.length * 0.98))] || 0; + const scaleMax = Math.max(1, p98 > 0 ? 
p98 : rawMax); + const {ticks, niceMax} = niceTicks(scaleMax, 5); // X scale over time const tMin = ts[0]; @@ -993,7 +1063,9 @@ const tSpan = Math.max(1, tMax - tMin); const xFor = (t) => PAD_L + ((t - tMin) / tSpan) * plotW; - const yFor = (v) => PAD_T + plotH - (v / niceMax) * plotH; + // Clamp to the plot top so an outlier scaled past niceMax doesn't draw + // above the chart area. + const yFor = (v) => Math.max(PAD_T, PAD_T + plotH - (v / niceMax) * plotH); // Grid + Y axis labels const gGrid = document.createElementNS(SVG_NS, "g"); @@ -1008,8 +1080,8 @@ line.setAttribute("y1", y); line.setAttribute("y2", y); gGrid.appendChild(line); const lbl = document.createElementNS(SVG_NS, "text"); - lbl.setAttribute("x", PAD_L - 8); - lbl.setAttribute("y", y + 3); + lbl.setAttribute("x", PAD_L - 12); + lbl.setAttribute("y", y + 5); lbl.setAttribute("text-anchor", "end"); lbl.textContent = formatNumber(tv); gAxis.appendChild(lbl); @@ -1021,8 +1093,9 @@ xLine.setAttribute("y1", PAD_T + plotH); xLine.setAttribute("y2", PAD_T + plotH); gAxis.appendChild(xLine); - // X labels: pick ~5 positions spread across the range, at real sample indexes - const desired = 5; + // X labels: pick positions spread across the range, at real sample + // indexes. Wider chart fits more labels without crowding. + const desired = Math.min(8, Math.max(2, n)); const xIdxs = []; for (let i = 0; i < desired; i++) { const idx = Math.round((i * (n - 1)) / (desired - 1)); @@ -1034,7 +1107,7 @@ const lbl = document.createElementNS(SVG_NS, "text"); const anchor = i === 0 ? "start" : (i === xIdxs.length - 1 ? 
"end" : "middle"); lbl.setAttribute("x", x); - lbl.setAttribute("y", PAD_T + plotH + 16); + lbl.setAttribute("y", PAD_T + plotH + 22); lbl.setAttribute("text-anchor", anchor); lbl.textContent = formatTime(ts[idx]); gAxis.appendChild(lbl); @@ -1051,7 +1124,7 @@ const yl = document.createElementNS(SVG_NS, "text"); yl.setAttribute("class", "axis-label"); yl.setAttribute("x", PAD_L); - yl.setAttribute("y", PAD_T - 6); + yl.setAttribute("y", PAD_T - 12); yl.textContent = yLabel; svg.appendChild(yl); } @@ -1188,15 +1261,60 @@ if (route.view === "object") await loadObject(route.bucket, route.object); if (route.view === "logs") await loadLogs(); if (route.view === "metric") { + // Each metric view starts on the 1h range, PUT direction. + DATA_DIRECTION = "put"; + setDirection("put"); + CHART_FP.m = null; + BREAKDOWN_FP.m = null; if (LAST_STATUS && LAST_STATUS.cards[route.metric]) { - // Force a redraw on view switch so the chart paints into its now-visible SVG. - CHART_FP.m = null; - BREAKDOWN_FP.m = null; renderMetric(route.metric, LAST_STATUS.cards[route.metric]); } else { await refresh(); } + await loadRange("1h"); // draws the chart from /api/series into the now-visible SVG + } + } + + function setDirection(dir) { + DATA_DIRECTION = dir; + const tabs = $("m-direction"); + if (tabs) { + for (const b of tabs.querySelectorAll(".range-tab")) { + b.classList.toggle("active", b.dataset.dir === dir); + } + } + } + + function setRange(range) { + CHART_RANGE = range; + const tabs = $("m-range"); + if (tabs) { + for (const b of tabs.querySelectorAll(".range-tab")) { + b.classList.toggle("active", b.dataset.range === range); + } + } + } + + // Fetch the metric's series for the selected range from Redis and draw it. + // The Data Encrypted metric splits into Encrypted (PUT) / Decrypted (GET) + // sub-tabs, each backed by its own per-direction byte series. 
+ async function loadRange(range) { + setRange(range); + if (currentRoute.view !== "metric") return; + const metricKey = currentRoute.metric; + let metric = SERIES_METRIC[metricKey] || "requests"; + if (metricKey === "data_encrypted") { + metric = (DATA_DIRECTION === "get") ? "bytes_get" : "bytes_put"; } + try { + const params = new URLSearchParams({metric, range}); + const r = await fetch(API_SERIES + "?" + params.toString(), {credentials: "same-origin"}); + if (r.status === 401) { location.href = "__LOGIN_URL__"; return; } + if (!r.ok) return; + const d = await r.json(); + CHART_FP.m = null; + drawChart("m", d.spark || [], d.spark_times || [], M_UNIT, M_YLABEL); + } catch (e) { /* leave chart as-is */ } } function gotoDashboard() { location.hash = ""; } @@ -1261,17 +1379,18 @@ setText("mv-value", card.value); setText("mv-unit", card.unit || ""); setText("mv-delta", card.detail || ""); - setText("m-charttitle", card.label + " over time"); setText("m-breakdown-title", BREAKDOWN_TITLES[key] || "Breakdown"); - - // Full-axis chart: skip redraw if data identical, or if user is hovering. - if (card.spark !== undefined) { - const chartFp = JSON.stringify([card.spark, card.spark_times, card.unit, card.y_label]); - if (CHART_FP.m !== chartFp && !CHART_HOVER.m) { - CHART_FP.m = chartFp; - drawChart("m", card.spark, card.spark_times || [], card.unit || "", card.y_label || ""); - } - } + M_UNIT = card.unit || ""; + M_YLABEL = card.y_label || ""; + + // Data Encrypted splits into Encrypted (PUT) / Decrypted (GET) direction + // sub-tabs; other metrics hide them. + const dirTabs = $("m-direction"); + if (dirTabs) dirTabs.style.display = (key === "data_encrypted") ? "" : "none"; + const dirLabel = (key === "data_encrypted") + ? (DATA_DIRECTION === "get" ? "Data Decrypted" : "Data Encrypted") + : card.label; + setText("m-charttitle", dirLabel + " over time"); // Breakdown list (proportional bars). 
if (card.breakdown) { @@ -1306,7 +1425,7 @@ : `${escapeHtml(r.object)}`; return ` - ${escapeHtml(r.time)} + ${escapeHtml(r.time)} ${escapeHtml(r.operation)} ${bucketCell} ${objectCell} @@ -1325,14 +1444,12 @@ if (SECTION_FP.buckets === fp) return; SECTION_FP.buckets = fp; if (!rows || rows.length === 0) { - tbody.innerHTML = 'No buckets observed yet.'; + tbody.innerHTML = 'No buckets observed yet.'; return; } - const lock = ''; tbody.innerHTML = rows.map(b => ` ${escapeHtml(b.name)} - ${lock}${b.encrypted ? "Encrypted" : "Not Encrypted"} ${escapeHtml(b.objects)} ${escapeHtml(b.size)} @@ -1409,6 +1526,13 @@ // ------------------- Bucket detail (list-style) ------------------- const FOLDER_ICON = ''; const FILE_ICON = ''; + const LOCK_ICON = ''; + + function encBadge(enc) { + if (enc === true) return `${LOCK_ICON}Encrypted`; + if (enc === false) return `Not Encrypted`; + return ''; // unknown / not checked + } function renderCrumbs(bucket, prefix) { const crumbs = $("bv-crumbs"); @@ -1442,41 +1566,56 @@ } catch { return iso; } } + const BUCKET_PAGE = 20; + let BUCKET_OFFSET = 0; + async function loadBucket(bucket, prefix) { prefix = prefix || ""; setText("bv-title", bucket); renderCrumbs(bucket, prefix); const tbody = $("bv-body"); const fpKey = bucket + "|" + prefix; - // Only show "Loading" if we're coming from a different bucket/prefix. + // Reset to the first object page when the bucket/prefix changes. 
if (BUCKET_FP._last !== fpKey) { BUCKET_FP._last = fpKey; - tbody.innerHTML = 'Loading…'; + BUCKET_OFFSET = 0; + tbody.innerHTML = 'Loading…'; BUCKET_FP[fpKey] = null; } try { const url = API_BUCKET + "/" + encodeURIComponent(bucket) + - "?prefix=" + encodeURIComponent(prefix) + "&delimiter=/"; + "?prefix=" + encodeURIComponent(prefix) + "&delimiter=/" + + "&offset=" + BUCKET_OFFSET + "&page_size=" + BUCKET_PAGE; const r = await fetch(url, {credentials:"same-origin"}); if (r.status === 401) { location.href = "__LOGIN_URL__"; return; } if (!r.ok) { - tbody.innerHTML = `Failed to load: ${r.status}`; + tbody.innerHTML = `Failed to load: ${r.status}`; return; } const d = await r.json(); + const totalObj = d.total_objects != null ? d.total_objects : d.objects.length; + const from = totalObj === 0 ? 0 : d.offset + 1; + const to = d.offset + d.objects.length; setText( "bv-sub", `${d.folders.length} folder${d.folders.length === 1 ? "" : "s"}, ` + - `${d.objects.length} object${d.objects.length === 1 ? "" : "s"}` + - (d.is_truncated ? " (truncated — showing first 500)" : "") + `${totalObj} object${totalObj === 1 ? "" : "s"}` + + (totalObj > d.objects.length ? ` (showing ${from}–${to})` : "") + + (d.is_truncated ? " · bucket truncated at 1000 keys" : "") ); - const fp = JSON.stringify([d.folders, d.objects, d.is_truncated]); + // Pager controls + const prevBtn = $("bv-prev"), nextBtn = $("bv-next"); + if (prevBtn) prevBtn.disabled = d.offset <= 0; + if (nextBtn) nextBtn.disabled = !d.has_more; + setText("bv-page", totalObj === 0 ? 
"—" : `${from}–${to} of ${totalObj}`); + + const fp = JSON.stringify([d.folders, d.objects, d.offset]); if (BUCKET_FP[fpKey] === fp) return; BUCKET_FP[fpKey] = fp; const total = d.folders.length + d.objects.length; if (total === 0) { - tbody.innerHTML = 'Empty folder.'; + tbody.innerHTML = 'Empty folder.'; return; } const rows = []; @@ -1488,6 +1627,7 @@ ${FOLDER_ICON} ${escapeHtml((f.name || f.prefix) + "/")} + — — — `); @@ -1500,6 +1640,7 @@ ${FILE_ICON} ${escapeHtml(o.name || o.key)} + ${encBadge(o.encrypted)} ${escapeHtml(o.size_h || "—")} ${escapeHtml(formatIsoShort(o.last_modified))} `); @@ -1511,11 +1652,17 @@ } // ------------------- Logs view ------------------- + const LOGS_LIMIT = 50; + let LOGS_OFFSET = 0; + async function loadLogs() { const q = $("lv-q").value; const op = $("lv-op").value; const stt = $("lv-status").value; - const params = new URLSearchParams({q, operation: op, status: stt, limit: "200"}); + const params = new URLSearchParams({ + q, operation: op, status: stt, + limit: String(LOGS_LIMIT), offset: String(LOGS_OFFSET), + }); try { const r = await fetch(API_LOGS + "?" + params.toString(), {credentials: "same-origin"}); if (r.status === 401) { location.href = "__LOGIN_URL__"; return; } @@ -1533,10 +1680,16 @@ } } opSel.value = currentOp; - setText("lv-count", `${d.count} of ${d.total} entries`); - setText("lv-sub", `${d.total} entries in ring buffer`); + const from = d.total === 0 ? 
0 : d.offset + 1; + const to = d.offset + d.count; + setText("lv-count", `${from}–${to} of ${d.total} entries`); + setText("lv-sub", `${d.total} entries (24h, capped)`); + setText("lv-page", `${from}–${to} of ${d.total}`); + const prevBtn = $("lv-prev"), nextBtn = $("lv-next"); + if (prevBtn) prevBtn.disabled = d.offset <= 0; + if (nextBtn) nextBtn.disabled = !d.has_more; const tbody = $("lv-body"); - const fp = JSON.stringify(d.entries || []); + const fp = JSON.stringify([d.entries || [], d.offset]); if (SECTION_FP.logs === fp) return; SECTION_FP.logs = fp; if (!d.entries.length) { @@ -1554,7 +1707,7 @@ : ''; return ` - ${escapeHtml(r.time)} + ${escapeHtml(r.time)} ${escapeHtml(r.operation)} ${bucketCell} ${objectCell} @@ -1591,13 +1744,15 @@ ["Content-Type", d.content_type || "—"], ["ETag", d.etag || "—"], ["Last Modified", d.last_modified || "—"], - ["Encrypted", d.encrypted ? "Yes (AES-256-GCM)" : "No"], + ["Encrypted", d.encrypted + ? ("Yes (AES-256-GCM" + (d.encryption_source === "sidecar" ? 
", multipart sidecar" : "") + ")") + : "No"], ]; for (const [k, v] of Object.entries(d.metadata || {})) { rows.push(["x-amz-meta-" + k, v]); } tbody.innerHTML = rows.map(([k, v]) => - `${escapeHtml(k)}${escapeHtml(v)}` + `${escapeHtml(k)}${escapeHtml(v)}` ).join(""); } catch (e) { tbody.innerHTML = 'Network error.'; @@ -1610,6 +1765,24 @@ mWrap.addEventListener("mousemove", (e) => handleChartHover("m", e)); mWrap.addEventListener("mouseleave", () => handleChartLeave("m")); } + const mRange = $("m-range"); + if (mRange) { + mRange.addEventListener("click", (e) => { + const btn = e.target.closest(".range-tab"); + if (btn) loadRange(btn.dataset.range); + }); + } + const mDir = $("m-direction"); + if (mDir) { + mDir.addEventListener("click", (e) => { + const btn = e.target.closest(".range-tab"); + if (!btn) return; + setDirection(btn.dataset.dir); + const card = LAST_STATUS && LAST_STATUS.cards.data_encrypted; + if (card) renderMetric("data_encrypted", card); // updates title + loadRange(CHART_RANGE); // redraw with the selected direction's series + }); + } document.querySelectorAll("[data-goto]").forEach(el => { el.addEventListener("click", (e) => { e.preventDefault(); gotoDashboard(); }); }); @@ -1620,20 +1793,39 @@ }); window.addEventListener("hashchange", navigateFromHash); - // Logs toolbar: debounced search + filter changes + // Logs toolbar: debounced search + filter changes. Any filter change resets + // pagination to the first page. 
let _logsDebounce = 0; + function reloadLogsFromStart() { LOGS_OFFSET = 0; loadLogs(); } function onLogsFilterChange() { clearTimeout(_logsDebounce); - _logsDebounce = setTimeout(loadLogs, 150); + _logsDebounce = setTimeout(reloadLogsFromStart, 150); } $("lv-q").addEventListener("input", onLogsFilterChange); - $("lv-op").addEventListener("change", loadLogs); - $("lv-status").addEventListener("change", loadLogs); + $("lv-op").addEventListener("change", reloadLogsFromStart); + $("lv-status").addEventListener("change", reloadLogsFromStart); + $("lv-prev").addEventListener("click", () => { + LOGS_OFFSET = Math.max(0, LOGS_OFFSET - LOGS_LIMIT); + loadLogs(); + }); + $("lv-next").addEventListener("click", () => { + LOGS_OFFSET += LOGS_LIMIT; + loadLogs(); + }); + $("bv-prev").addEventListener("click", () => { + BUCKET_OFFSET = Math.max(0, BUCKET_OFFSET - BUCKET_PAGE); + loadBucket(currentRoute.bucket, currentRoute.prefix || ""); + }); + $("bv-next").addEventListener("click", () => { + BUCKET_OFFSET += BUCKET_PAGE; + loadBucket(currentRoute.bucket, currentRoute.prefix || ""); + }); // SSE pushes header/footer/cards/activity/buckets/keys on change. - // The active bucket/logs view still polls at a low rate since those aren't streamed. + // The logs view polls at a low rate (cheap, genuinely live). The bucket + // listing is NOT polled — it does a per-object encryption HEAD fan-out, so it + // refreshes only on navigation or the Refresh button. 
async function viewTick() { - if (currentRoute.view === "bucket") await loadBucket(currentRoute.bucket, currentRoute.prefix || ""); if (currentRoute.view === "logs") await loadLogs(); } @@ -1701,6 +1893,7 @@ def render_dashboard(admin_path: str = "/admin") -> str: html = html.replace("__BUCKET_URL__", f"{prefix}/api/buckets") html = html.replace("__OBJECT_URL__", f"{prefix}/api/objects") html = html.replace("__LOGS_URL__", f"{prefix}/api/logs") + html = html.replace("__SERIES_URL__", f"{prefix}/api/series") html = html.replace("__LOGIN_URL__", f"{prefix}/login") html = html.replace("__LOGOUT_URL__", f"{prefix}/logout") html = html.replace( diff --git a/s3proxy/app.py b/s3proxy/app.py index 67b12cb..e64a512 100644 --- a/s3proxy/app.py +++ b/s3proxy/app.py @@ -64,6 +64,13 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: ttl_seconds=settings.redis_upload_ttl_seconds, ) + # Admin stats store — Redis-backed (cluster-wide) when Redis is + # configured, else per-pod in-memory. Mirrors create_state_store(). + from .admin.stats_store import create_stats_store, set_store + + stats_store = create_stats_store(settings) + set_store(stats_store) # used by the synchronous record_request path + # Create handler and verifier with properly initialized manager verifier = SigV4Verifier(credentials_store) handler = S3ProxyHandler(settings, credentials_store, multipart_manager) @@ -72,6 +79,7 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: app.state.settings = settings app.state.handler = handler app.state.verifier = verifier + app.state.stats_store = stats_store app.state.start_time = time.monotonic() yield diff --git a/s3proxy/config.py b/s3proxy/config.py index 0d9e24e..1c28435 100644 --- a/s3proxy/config.py +++ b/s3proxy/config.py @@ -71,6 +71,21 @@ class Settings(BaseSettings): default=24, description="TTL for upload state in Redis (hours)" ) + # Admin stats store (Redis-backed cluster-wide dashboard state). 
Only used + # when Redis is configured; single-instance mode keeps per-pod in-memory. + request_log_cap: int = Field( + default=10000, description="Max request-log entries kept in the Redis capped list" + ) + request_log_ttl_hours: int = Field( + default=24, description="TTL for the request log in Redis (hours)" + ) + stats_ttl_hours: int = Field( + default=24, description="TTL for shared counters/breakdowns in Redis (hours)" + ) + stats_series_ttl_hours: int = Field( + default=168, description="TTL for the per-minute time-series in Redis (hours, 7d default)" + ) + # Logging log_level: str = Field(default="INFO", description="Log level (DEBUG, INFO, WARNING, ERROR)") @@ -127,3 +142,18 @@ def s3_endpoint(self) -> str: def redis_upload_ttl_seconds(self) -> int: """Get Redis upload TTL in seconds.""" return self.redis_upload_ttl_hours * 3600 + + @property + def request_log_ttl_seconds(self) -> int: + """Get request-log TTL in seconds.""" + return self.request_log_ttl_hours * 3600 + + @property + def stats_ttl_seconds(self) -> int: + """Get shared-counters TTL in seconds.""" + return self.stats_ttl_hours * 3600 + + @property + def stats_series_ttl_seconds(self) -> int: + """Get per-minute time-series TTL in seconds.""" + return self.stats_series_ttl_hours * 3600 diff --git a/s3proxy/request_handler.py b/s3proxy/request_handler.py index 4e322c6..09f291f 100644 --- a/s3proxy/request_handler.py +++ b/s3proxy/request_handler.py @@ -33,6 +33,23 @@ STREAMING_PAYLOAD_PREFIX = "STREAMING-" +def _is_admin_path(request: Request, path: str) -> bool: + """True if the path targets the admin dashboard (so it's excluded from stats). + + The admin router is mounted at settings.admin_path; requests there (including a + bare prefix with no trailing slash that misses the router and falls through to + the S3 catch-all) must not be recorded as proxied S3 traffic. 
+ """ + settings = getattr(request.app.state, "settings", None) + if settings is None or getattr(settings, "admin_ui", False) is not True: + return False + prefix = getattr(settings, "admin_path", "") + if not isinstance(prefix, str) or not prefix: + return False + prefix = prefix.rstrip("/") + return path == prefix or path.startswith(prefix + "/") + + def _needs_body_for_signature(headers: dict[str, str], max_size: int) -> bool: """Check if body is needed for signature verification. @@ -140,7 +157,12 @@ async def handle_proxy_request( except ValueError: size = 0 client_ip = request.client.host if request.client else "" - record_request(method, path, operation, status_code, duration, size, client_ip) + # Don't record dashboard requests in the dashboard's own stats. A bare + # "/admin" (no trailing slash) doesn't match the mounted admin router and + # falls through to this S3 catch-all, where it would otherwise be logged + # as a phantom "admin" bucket. + if not _is_admin_path(request, path): + await record_request(method, path, operation, status_code, duration, size, client_ip) if reserved_memory > 0: await concurrency.release_memory(reserved_memory) diff --git a/tests/unit/test_admin.py b/tests/unit/test_admin.py index 6e8e0ef..175af96 100644 --- a/tests/unit/test_admin.py +++ b/tests/unit/test_admin.py @@ -11,22 +11,17 @@ from s3proxy.admin import collectors, record_request from s3proxy.admin.auth import AdminCredentials, create_auth_dependency from s3proxy.admin.router import create_admin_router +from s3proxy.admin.stats_store import ( + MemoryStatsStore, + RedisStatsStore, + RequestSample, + bucket_series, + set_store, +) from s3proxy.admin.templates import render_dashboard from s3proxy.config import Settings -def _reset_collector_state() -> None: - collectors._request_log._entries.clear() - collectors._rate_tracker._snapshots.clear() - - -@pytest.fixture(autouse=True) -def _clean_state(): - _reset_collector_state() - yield - _reset_collector_state() - - 
@pytest.fixture def admin_settings(): return Settings( @@ -39,9 +34,34 @@ def admin_settings(): ) -def test_record_request_splits_bucket_and_key() -> None: - record_request("GET", "/my-bucket/path/to/file.txt", "GetObject", 200, 0.042, 1024, "10.0.0.1") - entries = collectors._request_log.all() +@pytest.fixture +def mem_store(admin_settings): + """A fresh per-pod in-memory store, registered as the global record target.""" + store = MemoryStatsStore(admin_settings) + set_store(store) + yield store + set_store(None) + + +@pytest.fixture +def redis_store(admin_settings, mock_redis): + """A Redis-backed store on fakeredis, registered as the global target.""" + store = RedisStatsStore(mock_redis, admin_settings) + set_store(store) + yield store + set_store(None) + + +# --------------------------------------------------------------------------- +# record_request + collect_all (in-memory / per-pod path) +# --------------------------------------------------------------------------- + + +async def test_record_request_splits_bucket_and_key(mem_store) -> None: + await record_request( + "GET", "/my-bucket/path/to/file.txt", "GetObject", 200, 0.042, 1024, "10.0.0.1" + ) + entries = mem_store._log.all() assert len(entries) == 1 e = entries[0] assert e.bucket == "my-bucket" @@ -51,15 +71,20 @@ def test_record_request_splits_bucket_and_key() -> None: assert e.client_ip == "10.0.0.1" -def test_collect_all_builds_expected_sections(admin_settings) -> None: - record_request("PUT", "/customer-data/invoice.pdf", "PutObject", 200, 0.05, 2048, "10.0.0.1") - record_request("GET", "/archives/log.gz", "GetObject", 500, 0.1, 0, "10.0.0.2") +async def test_collect_all_builds_expected_sections(admin_settings, mem_store) -> None: + await record_request( + "PUT", "/customer-data/invoice.pdf", "PutObject", 200, 0.05, 2048, "10.0.0.1" + ) + await record_request("GET", "/archives/log.gz", "GetObject", 500, 0.1, 0, "10.0.0.2") start = time.monotonic() - 120 # 2 minutes - data = 
collectors.collect_all(admin_settings, start_time=start, version="9.9.9") + data = await collectors.collect_all( + mem_store, admin_settings, start_time=start, version="9.9.9" + ) assert data["header"]["title"] == "S3 Encryption Proxy" assert data["header"]["status"] == "Running" + assert data["header"]["cluster_wide"] is False assert "m" in data["header"]["uptime"] assert set(data["cards"].keys()) == {"requests", "data_encrypted", "errors", "active_buckets"} @@ -79,13 +104,204 @@ def test_collect_all_builds_expected_sections(admin_settings) -> None: assert data["footer"]["version"] == "9.9.9" +async def test_activity_timestamp_is_absolute(admin_settings, mem_store) -> None: + await record_request("GET", "/b/k", "GetObject", 200, 0.01, 1, "10.0.0.1") + data = await collectors.collect_all(mem_store, admin_settings, start_time=time.monotonic()) + row = data["activity"][0] + # Absolute "YYYY-MM-DD HH:MM:SS" primary display; relative kept as a tooltip. + assert row["time"][:2] == "20" and row["time"][4] == "-" and ":" in row["time"] + assert row["time_relative"].endswith("ago") + + +async def test_collector_does_not_crash_on_empty_metrics(admin_settings, mem_store) -> None: + """collect_all must work even before any request has been recorded.""" + data = await collectors.collect_all(mem_store, admin_settings, start_time=time.monotonic()) + expected = f"{int(collectors._read_labeled_counter_sum(metrics.REQUEST_COUNT)):,}" + assert data["cards"]["requests"]["value"] == expected + assert data["activity"] == [] + assert data["buckets"] == [] + + +# --------------------------------------------------------------------------- +# Redis-backed store (cluster-wide path) on fakeredis +# --------------------------------------------------------------------------- + + +async def test_redis_store_records_and_paginates(redis_store) -> None: + for i in range(120): + await redis_store.record( + RequestSample( + timestamp=time.time(), + method="GET", + operation="GetObject", + 
bucket="b", + key=f"k{i}", + status=200, + duration_ms=1.0, + size=10, + client_ip="10.0.0.1", + ) + ) + page1 = await redis_store.page(0, 50, "", "", "") + assert page1["count"] == 50 + assert page1["total"] == 120 + assert page1["has_more"] is True + # newest first + assert page1["entries"][0]["key"] == "k119" + + page3 = await redis_store.page(100, 50, "", "", "") + assert page3["count"] == 20 + assert page3["has_more"] is False + + +async def test_redis_store_caps_the_log(admin_settings, mock_redis) -> None: + admin_settings.request_log_cap = 5 + store = RedisStatsStore(mock_redis, admin_settings) + for i in range(20): + await store.record( + RequestSample(time.time(), "GET", "GetObject", "b", f"k{i}", 200, 1.0, 1, "ip") + ) + total = await mock_redis.llen("s3proxy:stats:reqlog") + assert total == 5 + ttl = await mock_redis.ttl("s3proxy:stats:reqlog") + assert 0 < ttl <= admin_settings.request_log_ttl_seconds + + +async def test_redis_store_filter_pagination(redis_store) -> None: + await redis_store.record( + RequestSample(time.time(), "PUT", "PutObject", "b", "a", 200, 1, 1, "ip") + ) + await redis_store.record( + RequestSample(time.time(), "GET", "GetObject", "b", "x", 500, 1, 1, "ip") + ) + await redis_store.record( + RequestSample(time.time(), "GET", "GetObject", "b", "y", 200, 1, 1, "ip") + ) + errors = await redis_store.page(0, 50, "", "", "error") + assert errors["total"] == 1 + assert errors["entries"][0]["key"] == "x" + puts = await redis_store.page(0, 50, "", "PUT", "") + assert puts["total"] == 1 + assert puts["entries"][0]["method"] == "PUT" + + +def _sample(method="GET", status=200, size=0, dur_ms=10.0): + return RequestSample( + timestamp=time.time(), + method=method, + operation=method, + bucket="b", + key="k", + status=status, + duration_ms=dur_ms, + size=size, + client_ip="ip", + ) + + +async def test_redis_store_cluster_wide_aggregate(admin_settings, mock_redis) -> None: + """Counters are written per-request by record() so they sum across 
pods. + + Two store instances on the same Redis simulate two replicas — the aggregate + must reflect both, regardless of which pod serves the dashboard. + """ + pod_a = RedisStatsStore(mock_redis, admin_settings) + pod_b = RedisStatsStore(mock_redis, admin_settings) + + # pod A handles 7 GET + 3 PUT (one PUT errors 500), pod B handles 5 GET (one 404) + for _ in range(7): + await pod_a.record(_sample("GET", 200, size=100)) + for i in range(3): + await pod_a.record(_sample("PUT", 500 if i == 0 else 200, size=1000)) + for i in range(5): + await pod_b.record(_sample("GET", 404 if i == 0 else 200, size=50)) + + agg = await pod_a.aggregate() # any pod sees the cluster-wide totals + assert agg is not None + assert agg.requests == 15 # 10 (A) + 5 (B) + assert agg.errors == 2 # one 500 + one 404 + assert agg.methods["GET"] == 12 + assert agg.methods["PUT"] == 3 + assert agg.errors_by_class["5xx"] == 1 + assert agg.errors_by_class["4xx"] == 1 + # PUT bytes counted as encrypted, GET bytes as decrypted + assert agg.bytes_encrypted == 3000 # 3 PUT * 1000 + assert agg.bytes_decrypted == 7 * 100 + 5 * 50 + + +async def test_redis_store_latency_is_cluster_wide(admin_settings, mock_redis) -> None: + pod_a = RedisStatsStore(mock_redis, admin_settings) + pod_b = RedisStatsStore(mock_redis, admin_settings) + for _ in range(10): + await pod_a.record(_sample(dur_ms=5.0)) # 0.005s bucket + for _ in range(10): + await pod_b.record(_sample(dur_ms=2000.0)) # 2.5s bucket + agg = await pod_a.aggregate() + assert agg.latency_buckets["+Inf"] == 20 # total observations, both pods + + +# --------------------------------------------------------------------------- +# Time-series bucketing +# --------------------------------------------------------------------------- + + +def test_bucket_series_zero_fills_and_buckets() -> None: + now = 10_000.0 + # two events in the same 120s bucket, one in another + pts = [(9800, 3.0), (9820, 2.0), (9600, 1.0)] + times, values = bucket_series(pts, 
window_seconds=600, bucket_seconds=120, now=now) + assert len(times) == len(values) == 5 + assert sum(values) == 6.0 # nothing dropped inside the window + # continuous axis (monotonic, fixed step) + assert times == sorted(times) + assert times[1] - times[0] == 120 + + +def test_bucket_series_drops_out_of_window() -> None: + now = 10_000.0 + pts = [(1000, 99.0)] # far outside a 600s window + _, values = bucket_series(pts, 600, 120, now=now) + assert sum(values) == 0.0 + + +async def test_redis_series_buckets_recorded_requests(redis_store) -> None: + for _ in range(3): + await redis_store.record( + RequestSample(time.time(), "GET", "GetObject", "b", "k", 200, 1, 1, "ip") + ) + times, values = await redis_store.series("requests", "1h") + assert sum(values) == 3.0 + assert len(times) == len(values) + + +async def test_redis_throughput_split_by_direction(redis_store) -> None: + # PUT bytes -> bytes_put series; GET bytes -> bytes_get series. + await redis_store.record( + RequestSample(time.time(), "PUT", "PutObject", "b", "k", 200, 1, 4096, "ip") + ) + await redis_store.record( + RequestSample(time.time(), "GET", "GetObject", "b", "k", 200, 1, 1024, "ip") + ) + _, put_vals = await redis_store.series("bytes_put", "1h") + _, get_vals = await redis_store.series("bytes_get", "1h") + assert sum(put_vals) == 4096.0 + assert sum(get_vals) == 1024.0 + + +# --------------------------------------------------------------------------- +# Templates + routes +# --------------------------------------------------------------------------- + + def test_render_dashboard_injects_api_url() -> None: html = render_dashboard(admin_path="/ops") assert '"/ops/api/status"' in html + assert '"/ops/api/series"' in html assert "__API_URL__" not in html + assert "__SERIES_URL__" not in html -def _make_app(settings: Settings): +def _make_app(settings: Settings, store=None): from fastapi import FastAPI app = FastAPI() @@ -93,6 +309,7 @@ def _make_app(settings: Settings): app.include_router(router, 
prefix=settings.admin_path) app.state.settings = settings app.state.start_time = time.monotonic() + app.state.stats_store = store or MemoryStatsStore(settings) return app @@ -161,6 +378,37 @@ def test_status_api_returns_expected_shape(admin_settings) -> None: assert "breakdown" in payload["cards"][key] +def test_series_api_returns_shape(admin_settings) -> None: + client = TestClient(_make_app(admin_settings)) + r = client.get("/admin/api/series?metric=requests&range=3h", auth=("admin", "secret")) + assert r.status_code == 200 + payload = r.json() + assert payload["metric"] == "requests" + assert payload["range"] == "3h" + assert "spark" in payload and "spark_times" in payload + + +def test_throughput_api_returns_two_series(admin_settings) -> None: + client = TestClient(_make_app(admin_settings)) + r = client.get("/admin/api/throughput?range=24h", auth=("admin", "secret")) + assert r.status_code == 200 + payload = r.json() + assert payload["range"] == "24h" + labels = [s["label"] for s in payload["series"]] + assert labels == ["Encrypted (PUT)", "Decrypted (GET)"] + for s in payload["series"]: + assert "spark" in s and "spark_times" in s + + +def test_logs_api_paginates(admin_settings) -> None: + client = TestClient(_make_app(admin_settings)) + r = client.get("/admin/api/logs?limit=10&offset=0", auth=("admin", "secret")) + assert r.status_code == 200 + payload = r.json() + for key in ("entries", "count", "offset", "limit", "total", "has_more", "operations"): + assert key in payload + + def test_status_api_401_without_auth(admin_settings) -> None: client = TestClient(_make_app(admin_settings)) r = client.get("/admin/api/status") @@ -191,13 +439,3 @@ def test_auth_raises_when_credentials_blank() -> None: ) with pytest.raises(RuntimeError): create_auth_dependency(settings, {}) - - -def test_collector_does_not_crash_on_empty_metrics(admin_settings) -> None: - """collect_all must work even before any request has been recorded.""" - # Ensure we don't blow up on cold start 
- data = collectors.collect_all(admin_settings, start_time=time.monotonic(), version="x") - expected = f"{int(collectors._read_labeled_counter_sum(metrics.REQUEST_COUNT)):,}" - assert data["cards"]["requests"]["value"] == expected - assert data["activity"] == [] - assert data["buckets"] == [] diff --git a/tests/unit/test_admin_encryption.py b/tests/unit/test_admin_encryption.py new file mode 100644 index 0000000..6877c5c --- /dev/null +++ b/tests/unit/test_admin_encryption.py @@ -0,0 +1,103 @@ +"""Admin encryption-status detection (issue #47 #6). + +Multipart objects store the wrapped DEK in a sidecar object, not an on-object +``isec`` tag — the create-time metadata does not survive CompleteMultipartUpload. +The admin dashboard must consult the sidecar before reporting "not encrypted". +""" + +from __future__ import annotations + +import s3proxy.client as client_mod +from s3proxy.admin.collectors import _has_multipart_sidecar, list_bucket_objects +from s3proxy.state.metadata import save_multipart_metadata +from s3proxy.state.models import MultipartMetadata + + +async def test_sidecar_present_means_encrypted(mock_s3) -> None: + bucket, key = "scylla-backups", "backup/sst/me-big-Data.db" + await save_multipart_metadata( + mock_s3, + bucket, + key, + MultipartMetadata( + version=2, + part_count=7, + total_plaintext_size=1234, + parts=[], + wrapped_dek=b"wrapped-dek-bytes", + kid="AKIA-TEST", + ), + ) + assert await _has_multipart_sidecar(mock_s3, bucket, key) is True + + +async def test_no_sidecar_means_not_detected(mock_s3) -> None: + # A plain object with no sidecar — single-PUT objects carry their DEK as an + # on-object tag instead, which head_object_detail checks separately. 
+ await mock_s3.put_object(bucket="b", key="plain.txt", body=b"data") + assert await _has_multipart_sidecar(mock_s3, "b", "plain.txt") is False + + +async def test_sidecar_lookup_swallows_errors(mock_s3) -> None: + # Missing object -> load raises internally -> detection returns False, no raise. + assert await _has_multipart_sidecar(mock_s3, "b", "does-not-exist") is False + + +async def test_listing_annotates_per_object_encryption(mock_s3, settings) -> None: + """list-style annotation: on-object tag, multipart sidecar, and plaintext.""" + from s3proxy.admin.collectors import _annotate_encryption + + # 1) single-PUT encrypted: on-object isec tag + await mock_s3.put_object( + bucket="b", key="single.bin", body=b"x", metadata={settings.dektag_name: "wrapped"} + ) + # 2) multipart encrypted: sidecar, no on-object tag + await mock_s3.put_object(bucket="b", key="multi.bin", body=b"y") + await save_multipart_metadata( + mock_s3, + "b", + "multi.bin", + MultipartMetadata(version=2, part_count=3, wrapped_dek=b"dek", kid="k"), + ) + # 3) plaintext: no tag, no sidecar + await mock_s3.put_object(bucket="b", key="plain.txt", body=b"z") + + objects = [{"key": "single.bin"}, {"key": "multi.bin"}, {"key": "plain.txt"}] + await _annotate_encryption(settings, mock_s3, "b", objects) + + by_key = {o["key"]: o["encrypted"] for o in objects} + assert by_key["single.bin"] is True + assert by_key["multi.bin"] is True + assert by_key["plain.txt"] is False + + +async def test_listing_paginates_objects(mock_s3, settings, monkeypatch) -> None: + """The explorer paginates objects (page_size) and HEAD-annotates only the page.""" + + class _CtxClient: + async def __aenter__(self): + return mock_s3 + + async def __aexit__(self, *a): + return False + + monkeypatch.setattr(client_mod, "S3Client", lambda *a, **k: _CtxClient()) + + for i in range(25): + await mock_s3.put_object( + bucket="bkt", + key=f"d/obj{i:02d}", + body=b"x", + metadata={settings.dektag_name: "w"}, + ) + + creds = 
{"AKIAIOSFODNN7EXAMPLE": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"} + page1 = await list_bucket_objects(settings, creds, "bkt", prefix="d/", offset=0, page_size=20) + assert page1["total_objects"] == 25 + assert len(page1["objects"]) == 20 + assert page1["has_more"] is True + assert all(o["encrypted"] is True for o in page1["objects"]) + + page2 = await list_bucket_objects(settings, creds, "bkt", prefix="d/", offset=20, page_size=20) + assert len(page2["objects"]) == 5 + assert page2["has_more"] is False diff --git a/tests/unit/test_admin_path_filter.py b/tests/unit/test_admin_path_filter.py new file mode 100644 index 0000000..1b74be4 --- /dev/null +++ b/tests/unit/test_admin_path_filter.py @@ -0,0 +1,58 @@ +"""The S3 catch-all must not record admin-dashboard requests as proxied traffic. + +A bare ``/admin`` (no trailing slash) misses the mounted admin router and falls +through to the S3 catch-all, where it would be logged as a phantom "admin" +bucket. _is_admin_path filters those out. 
+""" + +from __future__ import annotations + +from types import SimpleNamespace + +from s3proxy.config import Settings +from s3proxy.request_handler import _is_admin_path + + +def _req(settings: Settings): + return SimpleNamespace(app=SimpleNamespace(state=SimpleNamespace(settings=settings))) + + +def _settings(**kw) -> Settings: + base = { + "host": "http://localhost:9000", + "admin_ui": True, + "admin_username": "admin", + "admin_password": "admin", + "admin_secret": "s", + "credentials": [{"access_key": "AK", "secret_key": "s", "kek": "k"}], + } + base.update(kw) + return Settings(**base) + + +def test_bare_admin_prefix_is_filtered() -> None: + r = _req(_settings()) + assert _is_admin_path(r, "/admin") is True + assert _is_admin_path(r, "/admin/") is True + assert _is_admin_path(r, "/admin/api/status") is True + + +def test_real_buckets_are_not_filtered() -> None: + r = _req(_settings()) + assert _is_admin_path(r, "/scylla-backups/obj.bin") is False + assert _is_admin_path(r, "/my-bucket") is False + # A bucket literally named "admin-data" must not be swallowed by prefix match. 
+ assert _is_admin_path(r, "/admin-data/key") is False + + +def test_custom_admin_path() -> None: + r = _req(_settings(admin_path="/ops/dash")) + assert _is_admin_path(r, "/ops/dash") is True + assert _is_admin_path(r, "/ops/dash/login") is True + assert _is_admin_path(r, "/admin") is False # not the configured prefix + + +def test_not_filtered_when_admin_ui_disabled() -> None: + # admin_secret not required when admin_ui is off + r = _req(_settings(admin_ui=False, admin_secret="")) + assert _is_admin_path(r, "/admin") is False diff --git a/tests/unit/test_chart_frontproxy.py b/tests/unit/test_chart_frontproxy.py index 98f346a..6c965ae 100644 --- a/tests/unit/test_chart_frontproxy.py +++ b/tests/unit/test_chart_frontproxy.py @@ -118,6 +118,28 @@ def test_frontproxy_resources_rendered(chart_dir): assert by_kind_name(docs, kind, name), f"missing {kind}/{name}" +def test_frontproxy_binds_unprivileged_port(chart_dir): + """The HAProxy container runs as non-root with all caps dropped, so it must + bind a port > 1024 — binding 80 directly fails with EACCES (CrashLoopBackOff). 
+ The Service still exposes port 80, targeting the named container port.""" + docs = render(chart_dir, "frontproxy.enabled=true") + + cm = by_kind_name(docs, "ConfigMap", "s3proxy-python-frontproxy") + cfg = cm["data"]["haproxy.cfg"] + bind_line = next(line for line in cfg.splitlines() if "bind" in line) + bind_port = int(bind_line.rsplit(":", 1)[1].strip()) + assert bind_port > 1024, f"frontproxy binds privileged port {bind_port}" + + dep = by_kind_name(docs, "Deployment", "s3proxy-python-frontproxy") + container = dep["spec"]["template"]["spec"]["containers"][0] + assert container["securityContext"]["runAsNonRoot"] is True + assert container["ports"][0]["containerPort"] == bind_port + + svc = by_kind_name(docs, "Service", "s3proxy-python-frontproxy") + assert svc["spec"]["ports"][0]["port"] == 80 + assert svc["spec"]["ports"][0]["targetPort"] == "http" + + def test_headless_service_is_headless(chart_dir): docs = render(chart_dir, "frontproxy.enabled=true") hs = by_kind_name(docs, "Service", "s3proxy-python-headless")