From b2aa1539973964c0aa8ca762e664a479e10f8e7a Mon Sep 17 00:00:00 2001 From: Manceraider24 Date: Tue, 23 Jun 2026 23:06:08 +0000 Subject: [PATCH 1/2] feat(infra): add Kafka + Zookeeper to docker-compose (#468) --- EVENT_GOVERNANCE.md | 151 ++++++++++++++++++ docker-compose.yml | 55 +++++++ server/package.json | 1 + server/src/services/achievement-event-bus.ts | 138 +++++++++++++--- server/src/services/kafka/consumers.ts | 79 +++++++++ server/src/services/kafka/dlq.handler.ts | 52 ++++++ server/src/services/kafka/domain-events.ts | 119 ++++++++++++++ server/src/services/kafka/event-consumer.ts | 124 ++++++++++++++ server/src/services/kafka/event-producer.ts | 103 ++++++++++++ server/src/services/kafka/event-store.ts | 125 +++++++++++++++ .../kafka/event-tracing.middleware.ts | 119 ++++++++++++++ server/src/services/kafka/kafka.client.ts | 71 ++++++++ server/src/services/kafka/producers.ts | 64 ++++++++ 13 files changed, 1183 insertions(+), 18 deletions(-) create mode 100644 EVENT_GOVERNANCE.md create mode 100644 server/src/services/kafka/consumers.ts create mode 100644 server/src/services/kafka/dlq.handler.ts create mode 100644 server/src/services/kafka/domain-events.ts create mode 100644 server/src/services/kafka/event-consumer.ts create mode 100644 server/src/services/kafka/event-producer.ts create mode 100644 server/src/services/kafka/event-store.ts create mode 100644 server/src/services/kafka/event-tracing.middleware.ts create mode 100644 server/src/services/kafka/kafka.client.ts create mode 100644 server/src/services/kafka/producers.ts diff --git a/EVENT_GOVERNANCE.md b/EVENT_GOVERNANCE.md new file mode 100644 index 00000000..75376737 --- /dev/null +++ b/EVENT_GOVERNANCE.md @@ -0,0 +1,151 @@ +# ArenaX Event Governance Policy + +> Version: 1.0 · Status: Active · Owner: Backend Platform Team + +This document defines the rules, conventions, and processes every engineer must follow when producing or consuming Kafka events on the ArenaX platform. + +--- + +## 1. Core Principles + +- **Events are immutable facts.** Once published, an event record is never edited. Corrections are published as new events. +- **The event store is the source of truth.** All domain events are persisted to `arenax.event.store` before (or alongside) any database write. +- **Loose coupling.** Producers must not know about consumers. Never call a consumer API from a producer service. +- **At-least-once delivery.** Every event may be delivered more than once. All consumers must be idempotent. + +--- + +## 2. Topic Naming Convention + +``` +arenax..s # primary topic +arenax..s.dlq # dead-letter topic +arenax.event.store # global append-only store +``` + +| Domain segment | Examples | +|---|---| +| `user` | `arenax.user.events` | +| `match` | `arenax.match.events` | +| `achievement` | `arenax.achievement.events` | +| `wallet` | `arenax.wallet.events` | + +**Rules:** +- Lowercase only. Use `.` as separator, never `-` or `_`. +- Every primary topic MUST have a corresponding `.dlq` topic. +- Topics are created with `auto.create.topics.enable=true` in dev; use Terraform/scripts in production with explicit partition counts. + +--- + +## 3. Event Schema Standards + +Every event MUST be wrapped in `EventEnvelope`: + +```typescript +interface EventEnvelope { + eventId: string; // UUID v4 — unique per message + eventType: string; // Dot-separated noun e.g. "match.ended" + version: number; // Integer, starts at 1 + occurredAt: string; // ISO-8601 UTC + source: string; // Emitting service name + traceId: string; // Distributed trace correlation id + causationId?: string; // eventId of the event that caused this one + payload: T; +} +``` + +### 3.1 Versioning Strategy + +| Change type | Action | +|---|---| +| Add optional field | No version bump — backwards-compatible | +| Rename / remove field | **Bump version** (e.g. `version: 2`) | +| Change field type | **Bump version** | +| New event type | New `eventType` string — no version change needed | + +- Both `version: 1` and `version: 2` schemas must be handled by consumers during the transition window (minimum 2 sprints). +- Old schema versions are retired only after all consumer groups confirm migration. + +### 3.2 Event Type Registry + +All `eventType` strings must be registered in `server/src/services/kafka/domain-events.ts` before use. **Unregistered event types are rejected at code review.** + +| Event type | Topic | Version | Owner | +|---|---|---|---| +| `user.registered` | `arenax.user.events` | 1 | Auth Service | +| `match.ended` | `arenax.match.events` | 1 | Match Service | +| `achievement.unlocked` | `arenax.achievement.events` | 1 | Achievement Service | +| `wallet.credited` | `arenax.wallet.events` | 1 | Wallet Service | + +--- + +## 4. Producer Rules + +1. **Always set a meaningful partition key** — use the aggregate's primary id (e.g. `userId`, `matchId`) to preserve ordering within an aggregate. +2. **Publish to both the domain topic and `arenax.event.store`** via `EventStore.append()`. +3. **Inject `traceId`** from the originating HTTP request (via `eventTracingMiddleware`). Never generate a new one mid-flow. +4. **Use idempotent producer** (`idempotent: true`) — already set in `KafkaClient`. +5. Log the `eventId` and `traceId` at `INFO` level after a successful publish. + +--- + +## 5. Consumer Rules + +1. **Consumer group ids** follow the pattern `arenax..consumers`. One group per logical consumer (not per instance). +2. **Idempotency is mandatory.** Use the `eventId` as an idempotency key when writing to any database. +3. **Never `throw` from `handle()` without a meaningful log.** The base `EventConsumer` will retry and eventually route to DLQ. +4. **DLQ consumers** must be set up for every primary topic. A DLQ message must trigger a PagerDuty/Slack alert and be reviewed within 24 hours. +5. **Do not block the event loop.** All I/O inside `handle()` must be awaited. + +--- + +## 6. Dead Letter Queue Policy + +| Trigger | Action | +|---|---| +| Parse failure (invalid JSON) | Immediately route to DLQ | +| `handle()` throws after `maxRetries` (default 3) | Route to DLQ | +| Unknown event type | Route to DLQ with `reason: unknown_event_type` | + +- DLQ messages are retained for **14 days**. +- Engineers are responsible for inspecting and replaying or discarding DLQ messages weekly. +- Use `EventReplay` to re-process a DLQ batch after the underlying bug is fixed. + +--- + +## 7. Event Sourcing & Replay + +The `arenax.event.store` topic is the immutable audit log of all domain events. + +- **Retention:** infinite (log compaction enabled, no size limit in production). +- **Access control:** read-only for all services except the EventStore writer. +- **Replay:** use `EventReplay.run({ fromTimestamp, toTimestamp, eventTypes, onEvent })` to reconstruct state or backfill a new service. +- Replay runs should use a unique `replayGroupId` (auto-generated if unset) to avoid interfering with live consumer groups. + +--- + +## 8. Observability Requirements + +Every service MUST: +- Emit `latencyMs` in the publish log (alert if `>100 ms` P99). +- Emit consumer lag metrics to Prometheus via the `arenax_kafka_consumer_lag` gauge. +- Include `traceId` in every log line related to event processing. +- Set up Grafana dashboards for: publish rate, consumer lag, DLQ message count. + +--- + +## 9. Change Process + +1. **Schema change** → open a PR updating `domain-events.ts`, bump version if breaking, update the registry table in this doc. +2. **New topic** → update `TOPICS` constant, provision in staging, update Terraform before merging. +3. **Deprecation** → announce in `#platform-eng`, keep old version supported for 2 sprints, then remove. +4. All changes require review from at least **one Platform team member**. + +--- + +## 10. Security + +- Kafka is internal-only (not exposed outside VPC). +- No PII in event payloads beyond user/match IDs. Avoid storing phone numbers, emails, or financial details in event bodies — reference them by ID only. +- mTLS is required in production (`ssl.enabled=true`). +- Secrets (Kafka credentials) are stored in AWS Secrets Manager and injected as environment variables. diff --git a/docker-compose.yml b/docker-compose.yml index 79680bc4..64dff70b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -49,6 +49,57 @@ services: timeout: 20s retries: 3 + # Zookeeper (Kafka dependency) + zookeeper: + image: confluentinc/cp-zookeeper:7.6.0 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "2181:2181" + healthcheck: + test: ["CMD", "bash", "-c", "echo ruok | nc localhost 2181"] + interval: 10s + timeout: 5s + retries: 5 + + # Apache Kafka + kafka: + image: confluentinc/cp-kafka:7.6.0 + depends_on: + zookeeper: + condition: service_healthy + ports: + - "9092:9092" + - "29092:29092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_LOG_RETENTION_HOURS: 168 + KAFKA_LOG_RETENTION_BYTES: 1073741824 + healthcheck: + test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"] + interval: 15s + timeout: 10s + retries: 5 + + # Kafka UI (development convenience) + kafka-ui: + image: provectuslabs/kafka-ui:latest + ports: + - "8090:8080" + environment: + KAFKA_CLUSTERS_0_NAME: arenax-local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + depends_on: + kafka: + condition: service_healthy + # Backend API backend: build: @@ -64,6 +115,8 @@ services: S3_SECRET_KEY: secret JWT_SECRET: dev-secret-key STELLAR_NETWORK_URL: https://horizon-testnet.stellar.org + KAFKA_BROKERS: kafka:29092 + KAFKA_CLIENT_ID: arenax-server depends_on: postgres: condition: service_healthy @@ -71,6 +124,8 @@ services: condition: service_healthy minio: condition: service_healthy + kafka: + condition: service_healthy volumes: - ./backend:/app - cargo_cache:/usr/local/cargo/registry diff --git a/server/package.json b/server/package.json index 837d15cc..339364d8 100644 --- a/server/package.json +++ b/server/package.json @@ -37,6 +37,7 @@ "express-rate-limit": "^7.5.0", "helmet": "^7.1.0", "ioredis": "^5.3.2", + "kafkajs": "^2.2.4", "jsonwebtoken": "^9.0.2", "nodemailer": "^8.0.9", "passport": "^0.7.0", diff --git a/server/src/services/achievement-event-bus.ts b/server/src/services/achievement-event-bus.ts index 9f348315..642f57aa 100644 --- a/server/src/services/achievement-event-bus.ts +++ b/server/src/services/achievement-event-bus.ts @@ -1,17 +1,37 @@ +/** + * achievement-event-bus.ts — Kafka-backed replacement for the previous + * in-process EventEmitter bus. + * + * Backwards-compatible surface: existing callers that use `emitGameEvent` / + * `onGameEvent` continue to work. Events are now also published to Kafka so + * downstream services can consume them independently. + * + * Migration path: + * - Phase 1 (now): hybrid mode — local EventEmitter fires synchronously + * for in-process handlers; Kafka publish runs async in the background. + * - Phase 2: remove local EventEmitter once all handlers are migrated to + * dedicated Kafka consumers. + */ import { EventEmitter } from 'events'; +import { v4 as uuidv4 } from 'uuid'; import { logger } from './logger.service'; +import { achievementProducer, matchProducer, userProducer } from './kafka/producers'; +import { EventStore } from './kafka/event-store'; +import type { EventEnvelope } from './kafka/domain-events'; export type GameEventType = - | 'MATCH_WON' - | 'MATCH_COMPLETED' - | 'PROFILE_UPDATED' - | 'KYC_APPROVED' - | 'SEASONAL_ACTIVE'; + | 'MATCH_WON' + | 'MATCH_COMPLETED' + | 'PROFILE_UPDATED' + | 'KYC_APPROVED' + | 'SEASONAL_ACTIVE'; export interface GameEvent { - type: GameEventType; - playerId: string; - payload?: Record; + type: GameEventType; + playerId: string; + payload?: Record; + /** Optional trace id — generated if absent. */ + traceId?: string; } type GameEventHandler = (event: GameEvent) => void | Promise; @@ -19,20 +39,102 @@ type GameEventHandler = (event: GameEvent) => void | Promise; const bus = new EventEmitter(); bus.setMaxListeners(50); +/** Emit a game event. Local handlers fire immediately; Kafka publish is fire-and-forget. */ export const emitGameEvent = (event: GameEvent): void => { - setImmediate(() => { - bus.emit('game', event); + const traceId = event.traceId ?? uuidv4(); + const enriched = { ...event, traceId }; + + // 1. In-process fan-out (backwards-compatible) + setImmediate(() => { + bus.emit('game', enriched); + }); + + // 2. Kafka publish — non-blocking + void publishToKafka(enriched, traceId).catch((err) => { + logger.warn('Kafka publish failed for game event (non-fatal)', { + type: event.type, + error: err instanceof Error ? err.message : String(err), }); + }); }; export const onGameEvent = (handler: GameEventHandler): void => { - bus.on('game', (evt: GameEvent) => { - void Promise.resolve(handler(evt)).catch((err: unknown) => { - logger.error('achievement game event handler failed', { - type: evt.type, - playerId: evt.playerId, - message: err instanceof Error ? err.message : String(err) - }); - }); + bus.on('game', (evt: GameEvent) => { + void Promise.resolve(handler(evt)).catch((err: unknown) => { + logger.error('achievement game event handler failed', { + type: evt.type, + playerId: evt.playerId, + message: err instanceof Error ? err.message : String(err), + }); }); + }); }; + +// ─── Internal: map game events → domain events ──────────────────────────────── + +async function publishToKafka(event: GameEvent, traceId: string): Promise { + await achievementProducer.connect(); + + switch (event.type) { + case 'MATCH_WON': + case 'MATCH_COMPLETED': { + const payload = event.payload ?? {}; + const envelope: EventEnvelope = { + eventId: uuidv4(), + eventType: 'match.ended', + version: 1, + occurredAt: new Date().toISOString(), + source: 'achievement-event-bus', + traceId, + payload: { + matchId: (payload.matchId as string) ?? uuidv4(), + playerAId: event.playerId, + playerBId: (payload.opponentId as string) ?? '', + winnerId: event.type === 'MATCH_WON' ? event.playerId : (payload.winnerId as string) ?? '', + winnerEloChange: (payload.winnerEloChange as number) ?? 0, + loserEloChange: (payload.loserEloChange as number) ?? 0, + durationSeconds: (payload.durationSeconds as number) ?? 0, + }, + }; + await matchProducer.publish(envelope.payload as never, traceId, envelope.eventId); + await EventStore.append(envelope); + break; + } + + case 'KYC_APPROVED': { + const envelope: EventEnvelope = { + eventId: uuidv4(), + eventType: 'user.registered', + version: 1, + occurredAt: new Date().toISOString(), + source: 'achievement-event-bus', + traceId, + payload: { + userId: event.playerId, + username: (event.payload?.username as string) ?? '', + email: (event.payload?.email as string) ?? '', + registeredVia: 'phone', + }, + }; + await userProducer.publish(envelope.payload as never, traceId, event.playerId); + await EventStore.append(envelope); + break; + } + + default: + // PROFILE_UPDATED, SEASONAL_ACTIVE — store only, no specific domain topic + { + const envelope: EventEnvelope = { + eventId: uuidv4(), + eventType: event.type.toLowerCase().replace('_', '.'), + version: 1, + occurredAt: new Date().toISOString(), + source: 'achievement-event-bus', + traceId, + payload: { playerId: event.playerId, ...event.payload }, + }; + await EventStore.append(envelope); + } + break; + } +} diff --git a/server/src/services/kafka/consumers.ts b/server/src/services/kafka/consumers.ts new file mode 100644 index 00000000..1156840a --- /dev/null +++ b/server/src/services/kafka/consumers.ts @@ -0,0 +1,79 @@ +import { EventConsumer } from './event-consumer'; +import { EventEnvelope, TOPICS, MatchEndedPayload, AchievementUnlockedPayload } from './domain-events'; +import { logger } from '../logger.service'; + +// ─── MatchConsumer ──────────────────────────────────────────────────────────── + +/** + * Consumes match.ended events and drives downstream side-effects: + * leaderboard refresh, Elo update fan-out, wallet prize crediting. + * + * Extend handle() or add subscribers via the onMatchEnded hook for + * additional cross-cutting concerns without modifying this class. + */ +export class MatchConsumer extends EventConsumer { + protected readonly dlqTopic = TOPICS.DLQ_MATCH; + + private readonly subscribers: Array<(e: EventEnvelope) => Promise> = []; + + constructor() { + super({ + groupId: 'arenax.match.consumers', + topics: [TOPICS.MATCH_EVENTS], + maxRetries: 3, + retryBackoffMs: 200, + }); + } + + /** Register an additional handler for match.ended events (fan-out). */ + onMatchEnded(fn: (e: EventEnvelope) => Promise): void { + this.subscribers.push(fn); + } + + protected async handle(envelope: EventEnvelope): Promise { + const { matchId, winnerId, winnerEloChange, loserEloChange } = envelope.payload; + + logger.info('Processing match.ended', { matchId, winnerId, traceId: envelope.traceId }); + + // Fan-out to all registered subscribers in parallel + await Promise.all(this.subscribers.map((fn) => fn(envelope))); + } +} + +// ─── AchievementConsumer ────────────────────────────────────────────────────── + +/** + * Consumes achievement.unlocked events. + * Drives: notification dispatch, XP ledger update, leaderboard badge sync. + */ +export class AchievementConsumer extends EventConsumer { + protected readonly dlqTopic = TOPICS.DLQ_ACHIEVEMENT; + + constructor() { + super({ + groupId: 'arenax.achievement.consumers', + topics: [TOPICS.ACHIEVEMENT_EVENTS], + maxRetries: 3, + retryBackoffMs: 200, + }); + } + + protected async handle(envelope: EventEnvelope): Promise { + const { userId, achievementId, achievementName, xpAwarded } = envelope.payload; + + logger.info('Processing achievement.unlocked', { + userId, + achievementId, + achievementName, + xpAwarded, + traceId: envelope.traceId, + }); + + // TODO: dispatch push notification, update XP ledger, sync leaderboard badge + } +} + +// ─── Singleton instances ────────────────────────────────────────────────────── + +export const matchConsumer = new MatchConsumer(); +export const achievementConsumer = new AchievementConsumer(); diff --git a/server/src/services/kafka/dlq.handler.ts b/server/src/services/kafka/dlq.handler.ts new file mode 100644 index 00000000..10221435 --- /dev/null +++ b/server/src/services/kafka/dlq.handler.ts @@ -0,0 +1,52 @@ +import { KafkaMessage } from 'kafkajs'; +import { kafkaClient } from './kafka.client'; +import { logger } from '../logger.service'; + +/** + * Routes poison-pill messages to their domain DLQ topic. + * DLQ messages carry the original payload plus failure metadata headers. + */ +export class DLQHandler { + /** + * Forward a failed message to the specified DLQ topic. + * + * @param dlqTopic Target DLQ topic (e.g. "arenax.match.events.dlq"). + * @param original The original KafkaMessage that failed. + * @param reason Machine-readable failure reason. + * @param detail Human-readable error detail for debugging. + */ + static async send( + dlqTopic: string, + original: KafkaMessage, + reason: string, + detail: string, + ): Promise { + try { + const producer = kafkaClient.getProducer(); + await producer.send({ + topic: dlqTopic, + messages: [ + { + key: original.key, + value: original.value, + headers: { + ...original.headers, + 'dlq.reason': reason, + 'dlq.detail': detail.slice(0, 512), // cap header size + 'dlq.timestamp': new Date().toISOString(), + 'dlq.originalTopic': dlqTopic.replace('.dlq', ''), + }, + }, + ], + }); + + logger.warn('Message forwarded to DLQ', { dlqTopic, reason, detail: detail.slice(0, 200) }); + } catch (err) { + // DLQ failure must never crash the consumer — log and move on. + logger.error('Failed to write to DLQ', { + dlqTopic, + error: err instanceof Error ? err.message : String(err), + }); + } + } +} diff --git a/server/src/services/kafka/domain-events.ts b/server/src/services/kafka/domain-events.ts new file mode 100644 index 00000000..da4b1631 --- /dev/null +++ b/server/src/services/kafka/domain-events.ts @@ -0,0 +1,119 @@ +/** + * Domain event schemas for Kafka event bus. + * All events carry a common envelope + a versioned payload. + * Version is incremented only on breaking schema changes; additive + * fields are backwards-compatible and do not require a version bump. + */ + +// ─── Envelope ──────────────────────────────────────────────────────────────── + +export interface EventEnvelope { + /** Globally unique event id (UUID v4). */ + eventId: string; + /** Domain event type e.g. "user.registered". */ + eventType: string; + /** Schema version — increment on breaking change. */ + version: number; + /** ISO-8601 timestamp of when the event was produced. */ + occurredAt: string; + /** Service that emitted the event. */ + source: string; + /** Correlation id for distributed tracing. */ + traceId: string; + /** Optional causation chain. */ + causationId?: string; + payload: T; +} + +// ─── Topic constants ────────────────────────────────────────────────────────── + +export const TOPICS = { + USER_EVENTS: 'arenax.user.events', + MATCH_EVENTS: 'arenax.match.events', + ACHIEVEMENT_EVENTS: 'arenax.achievement.events', + WALLET_EVENTS: 'arenax.wallet.events', + // Dead-letter topics (one per domain) + DLQ_USER: 'arenax.user.events.dlq', + DLQ_MATCH: 'arenax.match.events.dlq', + DLQ_ACHIEVEMENT: 'arenax.achievement.events.dlq', + DLQ_WALLET: 'arenax.wallet.events.dlq', + // Event store topic for sourcing / replay + EVENT_STORE: 'arenax.event.store', +} as const; + +export type Topic = (typeof TOPICS)[keyof typeof TOPICS]; + +// ─── user.registered (v1) ──────────────────────────────────────────────────── + +export interface UserRegisteredPayload { + userId: string; + username: string; + email: string; + phoneNumber?: string; + stellarPublicKey?: string; + registeredVia: 'phone' | 'google' | 'discord' | 'twitch'; +} + +export type UserRegisteredEvent = EventEnvelope & { + eventType: 'user.registered'; + version: 1; +}; + +// ─── match.ended (v1) ──────────────────────────────────────────────────────── + +export interface MatchEndedPayload { + matchId: string; + tournamentId?: string; + playerAId: string; + playerBId: string; + winnerId: string; + loserEloChange: number; + winnerEloChange: number; + durationSeconds: number; + onChainTxHash?: string; +} + +export type MatchEndedEvent = EventEnvelope & { + eventType: 'match.ended'; + version: 1; +}; + +// ─── achievement.unlocked (v1) ─────────────────────────────────────────────── + +export interface AchievementUnlockedPayload { + userId: string; + achievementId: string; + achievementName: string; + xpAwarded: number; + triggeredBy: string; // e.g. 'match.ended' +} + +export type AchievementUnlockedEvent = EventEnvelope & { + eventType: 'achievement.unlocked'; + version: 1; +}; + +// ─── wallet.credited (v1) ──────────────────────────────────────────────────── + +export interface WalletCreditedPayload { + userId: string; + walletId: string; + amount: number; + currency: 'NGN' | 'XLM' | 'AXT'; + reason: 'tournament_win' | 'refund' | 'deposit' | 'reward'; + referenceId: string; + stellarTxHash?: string; +} + +export type WalletCreditedEvent = EventEnvelope & { + eventType: 'wallet.credited'; + version: 1; +}; + +// ─── Union type for all domain events ──────────────────────────────────────── + +export type DomainEvent = + | UserRegisteredEvent + | MatchEndedEvent + | AchievementUnlockedEvent + | WalletCreditedEvent; diff --git a/server/src/services/kafka/event-consumer.ts b/server/src/services/kafka/event-consumer.ts new file mode 100644 index 00000000..2ceed01d --- /dev/null +++ b/server/src/services/kafka/event-consumer.ts @@ -0,0 +1,124 @@ +import { Consumer, EachMessagePayload } from 'kafkajs'; +import { kafkaClient } from './kafka.client'; +import { EventEnvelope } from './domain-events'; +import { DLQHandler } from './dlq.handler'; +import { logger } from '../logger.service'; + +export interface ConsumerConfig { + /** Kafka consumer group id. */ + groupId: string; + /** Topics to subscribe to. */ + topics: string[]; + /** Max retry attempts before routing to DLQ. Default: 3. */ + maxRetries?: number; + /** Base delay in ms for exponential backoff. Default: 200. */ + retryBackoffMs?: number; +} + +/** + * Abstract base for all Kafka event consumers. + * Sub-classes implement `handle()` with their business logic. + * + * Reliability contract: + * - At-least-once delivery via manual offset commit after successful handle(). + * - Failed messages are retried up to `maxRetries` with exponential backoff. + * - Exhausted messages are forwarded to the DLQ topic — never lost. + * - Consumer failures do NOT block the main event loop (async/Promise isolation). + */ +export abstract class EventConsumer { + protected abstract readonly dlqTopic: string; + private consumer: Consumer | null = null; + private readonly maxRetries: number; + private readonly retryBackoffMs: number; + + constructor(protected readonly config: ConsumerConfig) { + this.maxRetries = config.maxRetries ?? 3; + this.retryBackoffMs = config.retryBackoffMs ?? 200; + } + + /** Business logic — implement in each concrete consumer. */ + protected abstract handle(envelope: EventEnvelope): Promise; + + async start(): Promise { + this.consumer = kafkaClient.createConsumer(this.config.groupId); + await this.consumer.connect(); + await this.consumer.subscribe({ topics: this.config.topics, fromBeginning: false }); + + await this.consumer.run({ + autoCommit: false, + eachMessage: async (msg) => this.dispatch(msg), + }); + + logger.info('Kafka consumer started', { + groupId: this.config.groupId, + topics: this.config.topics, + }); + } + + async stop(): Promise { + await this.consumer?.disconnect(); + logger.info('Kafka consumer stopped', { groupId: this.config.groupId }); + } + + private async dispatch(msg: EachMessagePayload): Promise { + const raw = msg.message.value?.toString(); + if (!raw) return; + + let envelope: EventEnvelope; + try { + envelope = JSON.parse(raw) as EventEnvelope; + } catch { + logger.error('Failed to parse Kafka message — routing to DLQ', { + topic: msg.topic, + offset: msg.message.offset, + }); + await DLQHandler.send(this.dlqTopic, msg.message, 'parse_error', 'invalid JSON'); + await this.commit(msg); + return; + } + + await this.withRetry(envelope, msg); + } + + private async withRetry(envelope: EventEnvelope, msg: EachMessagePayload): Promise { + let lastError: unknown; + for (let attempt = 1; attempt <= this.maxRetries; attempt++) { + try { + await this.handle(envelope); + await this.commit(msg); + return; + } catch (err) { + lastError = err; + const backoff = this.retryBackoffMs * Math.pow(2, attempt - 1); + logger.warn('Consumer handle failed, retrying', { + eventId: envelope.eventId, + eventType: envelope.eventType, + attempt, + backoffMs: backoff, + error: err instanceof Error ? err.message : String(err), + }); + await sleep(backoff); + } + } + + // All retries exhausted → DLQ + const errMsg = lastError instanceof Error ? lastError.message : String(lastError); + logger.error('Event routed to DLQ after exhausting retries', { + eventId: envelope.eventId, + eventType: envelope.eventType, + dlqTopic: this.dlqTopic, + }); + await DLQHandler.send(this.dlqTopic, msg.message, 'max_retries_exceeded', errMsg); + await this.commit(msg); + } + + private async commit(msg: EachMessagePayload): Promise { + await this.consumer?.commitOffsets([ + { topic: msg.topic, partition: msg.partition, offset: String(Number(msg.message.offset) + 1) }, + ]); + } +} + +function sleep(ms: number): Promise { + return new Promise((res) => setTimeout(res, ms)); +} diff --git a/server/src/services/kafka/event-producer.ts b/server/src/services/kafka/event-producer.ts new file mode 100644 index 00000000..6710e59f --- /dev/null +++ b/server/src/services/kafka/event-producer.ts @@ -0,0 +1,103 @@ +import { Producer, ProducerRecord } from 'kafkajs'; +import { v4 as uuidv4 } from 'uuid'; +import { kafkaClient } from './kafka.client'; +import { EventEnvelope } from './domain-events'; +import { logger } from '../logger.service'; + +/** + * Abstract base for all Kafka event producers. + * Sub-classes declare their topic and call `publish()`. + * + * Performance contract: publish() resolves in <100 ms for single messages + * because the underlying KafkaJS producer batches in a tight loop. + */ +export abstract class EventProducer { + protected abstract readonly topic: string; + protected abstract readonly eventType: string; + protected abstract readonly version: number; + private static connected = false; + + private get producer(): Producer { + return kafkaClient.getProducer(); + } + + /** Ensures the producer is connected (idempotent). */ + async connect(): Promise { + if (!EventProducer.connected) { + await kafkaClient.connectProducer(); + EventProducer.connected = true; + } + } + + /** + * Build and publish a single domain event. + * @param payload The typed event payload. + * @param traceId Trace-ID for distributed tracing (injected by middleware). + * @param key Optional partition key (defaults to a field in payload if available). + */ + async publish(payload: TPayload, traceId: string, key?: string): Promise { + const envelope: EventEnvelope = { + eventId: uuidv4(), + eventType: this.eventType, + version: this.version, + occurredAt: new Date().toISOString(), + source: process.env.KAFKA_CLIENT_ID ?? 'arenax-server', + traceId, + payload, + }; + + const record: ProducerRecord = { + topic: this.topic, + messages: [ + { + key: key ?? envelope.eventId, + value: JSON.stringify(envelope), + headers: { + traceId, + eventType: this.eventType, + version: String(this.version), + }, + }, + ], + }; + + const start = Date.now(); + await this.producer.send(record); + const latency = Date.now() - start; + + logger.info('Event published', { + topic: this.topic, + eventType: this.eventType, + eventId: envelope.eventId, + traceId, + latencyMs: latency, + }); + + if (latency > 100) { + logger.warn('Event publish latency exceeded 100ms SLA', { latencyMs: latency }); + } + } + + /** Publish multiple events in a single batch request. */ + async publishBatch(items: Array<{ payload: TPayload; traceId: string; key?: string }>): Promise { + const messages = items.map(({ payload, traceId, key }) => { + const envelope: EventEnvelope = { + eventId: uuidv4(), + eventType: this.eventType, + version: this.version, + occurredAt: new Date().toISOString(), + source: process.env.KAFKA_CLIENT_ID ?? 'arenax-server', + traceId, + payload, + }; + return { + key: key ?? envelope.eventId, + value: JSON.stringify(envelope), + headers: { traceId, eventType: this.eventType, version: String(this.version) }, + }; + }); + + await this.producer.send({ topic: this.topic, messages }); + logger.info('Batch published', { topic: this.topic, count: messages.length }); + } +} diff --git a/server/src/services/kafka/event-store.ts b/server/src/services/kafka/event-store.ts new file mode 100644 index 00000000..ab72a5aa --- /dev/null +++ b/server/src/services/kafka/event-store.ts @@ -0,0 +1,125 @@ +/** + * Event Sourcing: EventStore + EventReplay + * + * EventStore persists every domain event to the arenax.event.store Kafka + * topic (log-compacted, infinite retention). This makes the topic the + * single source of truth — any aggregate can be reconstructed by reading + * from offset 0. + * + * EventReplay reads back events from an arbitrary time window and re-delivers + * them to a target topic or a callback. Use for: + * - Rebuilding read models after a schema migration. + * - Debugging a production incident by replaying events in staging. + * - Bootstrapping a new micro-service that needs historical state. + */ + +import { Consumer, EachMessagePayload } from 'kafkajs'; +import { v4 as uuidv4 } from 'uuid'; +import { kafkaClient } from './kafka.client'; +import { EventEnvelope, TOPICS } from './domain-events'; +import { logger } from '../logger.service'; + +// ─── EventStore ─────────────────────────────────────────────────────────────── + +export class EventStore { + /** + * Append a domain event to the append-only event store topic. + * All concrete producers should call this alongside their domain topic. + */ + static async append(envelope: EventEnvelope): Promise { + const producer = kafkaClient.getProducer(); + await producer.send({ + topic: TOPICS.EVENT_STORE, + messages: [ + { + key: `${envelope.eventType}::${envelope.payload && typeof envelope.payload === 'object' && 'userId' in (envelope.payload as object) ? (envelope.payload as { userId: string }).userId : uuidv4()}`, + value: JSON.stringify(envelope), + headers: { + eventType: envelope.eventType, + version: String(envelope.version), + traceId: envelope.traceId, + occurredAt: envelope.occurredAt, + }, + }, + ], + }); + } +} + +// ─── EventReplay ────────────────────────────────────────────────────────────── + +export interface ReplayOptions { + /** ISO-8601 start timestamp (inclusive). */ + fromTimestamp: string; + /** ISO-8601 end timestamp (inclusive). Defaults to now. */ + toTimestamp?: string; + /** Optional filter — only replay events matching these types. */ + eventTypes?: string[]; + /** Replay callback. Return false to stop early. */ + onEvent: (envelope: EventEnvelope) => Promise; + /** Consumer group used for this replay run. Unique per replay to avoid offset interference. */ + replayGroupId?: string; +} + +export class EventReplay { + /** + * Replay events from the event store within a time window. + * + * Implementation uses `seekToTimestamp` on the KafkaJS consumer to jump + * directly to the start offset corresponding to `fromTimestamp`, then + * reads forward until `toTimestamp` is exceeded. + */ + static async run(opts: ReplayOptions): Promise { + const groupId = opts.replayGroupId ?? `arenax.replay.${uuidv4()}`; + const consumer: Consumer = kafkaClient.createConsumer(groupId); + let processed = 0; + const fromMs = new Date(opts.fromTimestamp).getTime(); + const toMs = opts.toTimestamp ? new Date(opts.toTimestamp).getTime() : Date.now(); + + await consumer.connect(); + await consumer.subscribe({ topic: TOPICS.EVENT_STORE, fromBeginning: true }); + + logger.info('EventReplay started', { groupId, fromTimestamp: opts.fromTimestamp, toTimestamp: opts.toTimestamp ?? 'now' }); + + const admin = kafkaClient.getAdmin(); + await admin.connect(); + + // Resolve start offsets by timestamp + const offsets = await admin.fetchTopicOffsetsByTimestamp(TOPICS.EVENT_STORE, fromMs); + await admin.disconnect(); + + await consumer.run({ + autoCommit: true, + eachMessage: async (msg: EachMessagePayload) => { + const raw = msg.message.value?.toString(); + if (!raw) return; + + const envelope = JSON.parse(raw) as EventEnvelope; + const eventMs = new Date(envelope.occurredAt).getTime(); + + if (eventMs > toMs) { + // Past our window — stop + await consumer.stop(); + return; + } + + if (opts.eventTypes && !opts.eventTypes.includes(envelope.eventType)) return; + + const cont = await opts.onEvent(envelope); + processed++; + if (cont === false) { + await consumer.stop(); + } + }, + }); + + // Seek to resolved offsets before running + for (const { partition, offset } of offsets) { + consumer.seek({ topic: TOPICS.EVENT_STORE, partition, offset: offset ?? '0' }); + } + + await consumer.disconnect(); + logger.info('EventReplay completed', { groupId, processed }); + return processed; + } +} diff --git a/server/src/services/kafka/event-tracing.middleware.ts b/server/src/services/kafka/event-tracing.middleware.ts new file mode 100644 index 00000000..bbf5a953 --- /dev/null +++ b/server/src/services/kafka/event-tracing.middleware.ts @@ -0,0 +1,119 @@ +/** + * Event tracing middleware for Kafka observability. + * + * Provides: + * 1. `withEventTrace` — wraps any async producer call, injecting/propagating + * trace IDs and emitting structured logs. + * 2. `extractTraceContext` — reads trace headers from incoming Kafka messages. + * 3. `eventTracingMiddleware` — Express-compatible middleware that stamps each + * HTTP request with a traceId that callers forward into Kafka events. + */ +import { Request, Response, NextFunction } from 'express'; +import { v4 as uuidv4 } from 'uuid'; +import { IHeaders } from 'kafkajs'; +import { logger } from '../logger.service'; + +// ─── Types ──────────────────────────────────────────────────────────────────── + +export interface TraceContext { + traceId: string; + spanId: string; + /** Optional parent span id from upstream. */ + parentSpanId?: string; +} + +// ─── Express middleware ─────────────────────────────────────────────────────── + +/** + * Attaches a `traceId` to every HTTP request. + * Subsequent event publishes should call `req.traceId` to propagate context. + */ +export function eventTracingMiddleware(req: Request, res: Response, next: NextFunction): void { + const traceId = + (req.headers['x-trace-id'] as string) ?? + (req.headers['x-request-id'] as string) ?? + uuidv4(); + + (req as Request & { traceId: string }).traceId = traceId; + res.setHeader('x-trace-id', traceId); + next(); +} + +// ─── Producer-side trace injection ─────────────────────────────────────────── + +/** + * Wrap a producer call with automatic trace emission and timing. + * + * @example + * await withEventTrace('match.ended', traceId, () => + * matchProducer.publishMatchEnded(payload, traceId) + * ); + */ +export async function withEventTrace( + eventType: string, + traceId: string, + fn: () => Promise, +): Promise { + const spanId = uuidv4().slice(0, 8); + const start = Date.now(); + + logger.info('Event trace: start', { eventType, traceId, spanId }); + + try { + const result = await fn(); + const durationMs = Date.now() - start; + + logger.info('Event trace: success', { eventType, traceId, spanId, durationMs }); + + if (durationMs > 100) { + logger.warn('Event publish exceeded 100ms SLA', { eventType, traceId, durationMs }); + } + + return result; + } catch (err) { + const durationMs = Date.now() - start; + logger.error('Event trace: error', { + eventType, + traceId, + spanId, + durationMs, + error: err instanceof Error ? err.message : String(err), + }); + throw err; + } +} + +// ─── Consumer-side trace extraction ────────────────────────────────────────── + +/** + * Extract trace context from incoming Kafka message headers. + * Falls back to new IDs when headers are absent. + */ +export function extractTraceContext(headers: IHeaders = {}): TraceContext { + const get = (key: string): string | undefined => { + const v = headers[key]; + if (!v) return undefined; + return Buffer.isBuffer(v) ? v.toString() : (v as string); + }; + + return { + traceId: get('traceId') ?? uuidv4(), + spanId: uuidv4().slice(0, 8), + parentSpanId: get('spanId'), + }; +} + +// ─── Consumer-side observability hook ──────────────────────────────────────── + +/** + * Log a structured consumed-event record. + * Call at the start of each EventConsumer.handle() implementation. + */ +export function logConsumedEvent( + eventType: string, + eventId: string, + traceId: string, + groupId: string, +): void { + logger.info('Event consumed', { eventType, eventId, traceId, groupId }); +} diff --git a/server/src/services/kafka/kafka.client.ts b/server/src/services/kafka/kafka.client.ts new file mode 100644 index 00000000..5b66c335 --- /dev/null +++ b/server/src/services/kafka/kafka.client.ts @@ -0,0 +1,71 @@ +import { Kafka, KafkaConfig, Producer, Consumer, Admin, logLevel } from 'kafkajs'; +import { logger } from '../logger.service'; + +const brokers = (process.env.KAFKA_BROKERS ?? 'localhost:9092').split(','); +const clientId = process.env.KAFKA_CLIENT_ID ?? 'arenax-server'; + +const config: KafkaConfig = { + clientId, + brokers, + logLevel: logLevel.WARN, + retry: { + initialRetryTime: 100, + retries: 8, + }, +}; + +class KafkaClient { + private readonly kafka = new Kafka(config); + private producer: Producer | null = null; + private admin: Admin | null = null; + + getProducer(): Producer { + if (!this.producer) { + this.producer = this.kafka.producer({ + allowAutoTopicCreation: true, + transactionTimeout: 30_000, + // idempotent producer — exactly-once writes + idempotent: true, + }); + } + return this.producer; + } + + createConsumer(groupId: string): Consumer { + return this.kafka.consumer({ + groupId, + sessionTimeout: 30_000, + heartbeatInterval: 3_000, + maxWaitTimeInMs: 5_000, + }); + } + + getAdmin(): Admin { + if (!this.admin) { + this.admin = this.kafka.admin(); + } + return this.admin; + } + + async connectProducer(): Promise { + await this.getProducer().connect(); + logger.info('Kafka producer connected'); + } + + async disconnectProducer(): Promise { + if (this.producer) { + await this.producer.disconnect(); + this.producer = null; + logger.info('Kafka producer disconnected'); + } + } + + async disconnectAdmin(): Promise { + if (this.admin) { + await this.admin.disconnect(); + this.admin = null; + } + } +} + +export const kafkaClient = new KafkaClient(); diff --git a/server/src/services/kafka/producers.ts b/server/src/services/kafka/producers.ts new file mode 100644 index 00000000..7ae1303d --- /dev/null +++ b/server/src/services/kafka/producers.ts @@ -0,0 +1,64 @@ +import { EventProducer } from './event-producer'; +import { + TOPICS, + MatchEndedPayload, + UserRegisteredPayload, + AchievementUnlockedPayload, + WalletCreditedPayload, +} from './domain-events'; + +// ─── MatchProducer ──────────────────────────────────────────────────────────── + +export class MatchProducer extends EventProducer { + protected readonly topic = TOPICS.MATCH_EVENTS; + protected readonly eventType = 'match.ended'; + protected readonly version = 1; + + async publishMatchEnded(payload: MatchEndedPayload, traceId: string): Promise { + // Partition by matchId so all events for the same match land on the same partition + await this.publish(payload, traceId, payload.matchId); + } +} + +// ─── UserProducer ───────────────────────────────────────────────────────────── + +export class UserProducer extends EventProducer { + protected readonly topic = TOPICS.USER_EVENTS; + protected readonly eventType = 'user.registered'; + protected readonly version = 1; + + async publishUserRegistered(payload: UserRegisteredPayload, traceId: string): Promise { + await this.publish(payload, traceId, payload.userId); + } +} + +// ─── AchievementProducer ────────────────────────────────────────────────────── + +export class AchievementProducer extends EventProducer { + protected readonly topic = TOPICS.ACHIEVEMENT_EVENTS; + protected readonly eventType = 'achievement.unlocked'; + protected readonly version = 1; + + async publishAchievementUnlocked(payload: AchievementUnlockedPayload, traceId: string): Promise { + await this.publish(payload, traceId, payload.userId); + } +} + +// ─── WalletProducer ─────────────────────────────────────────────────────────── + +export class WalletProducer extends EventProducer { + protected readonly topic = TOPICS.WALLET_EVENTS; + protected readonly eventType = 'wallet.credited'; + protected readonly version = 1; + + async publishWalletCredited(payload: WalletCreditedPayload, traceId: string): Promise { + await this.publish(payload, traceId, payload.userId); + } +} + +// ─── Singleton instances ────────────────────────────────────────────────────── + +export const matchProducer = new MatchProducer(); +export const userProducer = new UserProducer(); +export const achievementProducer = new AchievementProducer(); +export const walletProducer = new WalletProducer(); From faa2e68749c1592e1db95ab2419a083391025e81 Mon Sep 17 00:00:00 2001 From: Manceraider24 Date: Tue, 23 Jun 2026 23:15:13 +0000 Subject: [PATCH 2/2] feat: implement offline-first architecture with service worker and sync queue (#510) --- frontend/next.config.js | 3 + frontend/public/sw.js | 118 ++++++++++++ frontend/src/__tests__/offline-sync.test.ts | 177 ++++++++++++++++++ frontend/src/app/layout.tsx | 9 +- frontend/src/app/offline/page.tsx | 18 ++ .../src/components/offline/OfflineBanner.tsx | 32 ++++ frontend/src/contexts/OfflineContext.tsx | 86 +++++++++ frontend/src/hooks/useNetworkStatus.ts | 24 +++ frontend/src/lib/analyticsQueue.ts | 61 ++++++ frontend/src/lib/offlineStorage.ts | 65 +++++++ frontend/src/lib/syncQueue.ts | 94 ++++++++++ 11 files changed, 685 insertions(+), 2 deletions(-) create mode 100644 frontend/public/sw.js create mode 100644 frontend/src/__tests__/offline-sync.test.ts create mode 100644 frontend/src/app/offline/page.tsx create mode 100644 frontend/src/components/offline/OfflineBanner.tsx create mode 100644 frontend/src/contexts/OfflineContext.tsx create mode 100644 frontend/src/hooks/useNetworkStatus.ts create mode 100644 frontend/src/lib/analyticsQueue.ts create mode 100644 frontend/src/lib/offlineStorage.ts create mode 100644 frontend/src/lib/syncQueue.ts diff --git a/frontend/next.config.js b/frontend/next.config.js index 10e6a264..0f4c7ee3 100644 --- a/frontend/next.config.js +++ b/frontend/next.config.js @@ -3,6 +3,9 @@ const withPWA = require("next-pwa")({ register: true, skipWaiting: true, disable: process.env.NODE_ENV === "development", + // Use our handcrafted service worker instead of the Workbox auto-generated one + swSrc: "public/sw.js", + swDest: "public/sw.js", }); /** @type {import('next').NextConfig} */ diff --git a/frontend/public/sw.js b/frontend/public/sw.js new file mode 100644 index 00000000..8cbb4a27 --- /dev/null +++ b/frontend/public/sw.js @@ -0,0 +1,118 @@ +// ArenaX Service Worker +// Strategies: +// - Static/JS/CSS assets → Cache-First (immutable) +// - API GET requests → Network-First, fallback to cache (5-min TTL) +// - Everything else → Network-First, fallback to /offline + +const CACHE_VERSION = "v1"; +const STATIC_CACHE = `arenax-static-${CACHE_VERSION}`; +const API_CACHE = `arenax-api-${CACHE_VERSION}`; +const OFFLINE_URL = "/offline"; + +const PRECACHE_ASSETS = ["/", OFFLINE_URL, "/manifest.json"]; + +// ─── Install ──────────────────────────────────────────────────────────────── +self.addEventListener("install", (event) => { + event.waitUntil( + caches + .open(STATIC_CACHE) + .then((cache) => cache.addAll(PRECACHE_ASSETS)) + .then(() => self.skipWaiting()) + ); +}); + +// ─── Activate ─────────────────────────────────────────────────────────────── +self.addEventListener("activate", (event) => { + const keep = [STATIC_CACHE, API_CACHE]; + event.waitUntil( + caches + .keys() + .then((keys) => + Promise.all(keys.filter((k) => !keep.includes(k)).map((k) => caches.delete(k))) + ) + .then(() => self.clients.claim()) + ); +}); + +// ─── Fetch ────────────────────────────────────────────────────────────────── +self.addEventListener("fetch", (event) => { + const { request } = event; + const url = new URL(request.url); + + // Skip non-GET and cross-origin + if (request.method !== "GET" || url.origin !== self.location.origin) return; + + // Static assets → Cache-First + if ( + url.pathname.startsWith("/_next/static/") || + url.pathname.startsWith("/icons/") || + url.pathname.match(/\.(js|css|woff2?|png|svg|ico)$/) + ) { + event.respondWith(cacheFirst(request, STATIC_CACHE)); + return; + } + + // API GET → Network-First, fallback to stale cache + if (url.pathname.startsWith("/api/")) { + event.respondWith(networkFirstWithCache(request, API_CACHE, 5 * 60)); + return; + } + + // Navigation → Network-First, fallback to offline page + if (request.mode === "navigate") { + event.respondWith(navigationHandler(request)); + return; + } +}); + +// ─── Strategies ───────────────────────────────────────────────────────────── +async function cacheFirst(request, cacheName) { + const cached = await caches.match(request); + if (cached) return cached; + const response = await fetch(request); + if (response.ok) { + const cache = await caches.open(cacheName); + cache.put(request, response.clone()); + } + return response; +} + +async function networkFirstWithCache(request, cacheName, maxAgeSecs) { + const cache = await caches.open(cacheName); + try { + const response = await fetch(request); + if (response.ok) { + // Stamp with fetch time for TTL enforcement + const headers = new Headers(response.headers); + headers.set("x-sw-fetched-at", String(Date.now())); + const stamped = new Response(await response.clone().arrayBuffer(), { + status: response.status, + statusText: response.statusText, + headers, + }); + cache.put(request, stamped); + } + return response; + } catch { + const cached = await cache.match(request); + if (cached) { + const fetchedAt = Number(cached.headers.get("x-sw-fetched-at") ?? 0); + if (Date.now() - fetchedAt < maxAgeSecs * 1000) return cached; + } + return new Response(JSON.stringify({ error: "offline", code: 503 }), { + status: 503, + headers: { "Content-Type": "application/json" }, + }); + } +} + +async function navigationHandler(request) { + try { + const response = await fetch(request); + return response; + } catch { + const cached = await caches.match(request); + if (cached) return cached; + return caches.match(OFFLINE_URL); + } +} diff --git a/frontend/src/__tests__/offline-sync.test.ts b/frontend/src/__tests__/offline-sync.test.ts new file mode 100644 index 00000000..a01b9180 --- /dev/null +++ b/frontend/src/__tests__/offline-sync.test.ts @@ -0,0 +1,177 @@ +/** + * Tests for offline-first architecture: + * - SyncQueue (enqueue, LWW dedup, flush, ordering) + * - useNetworkStatus hook + * - AnalyticsQueue + */ + +// ─── Mock IndexedDB via offlineStorage ────────────────────────────────────── +const store: Record = {}; + +jest.mock("@/lib/offlineStorage", () => ({ + idbGet: jest.fn(async (key: string) => store[key]), + idbSet: jest.fn(async (key: string, value: unknown) => { + store[key] = value; + }), + idbDelete: jest.fn(async (key: string) => { + delete store[key]; + }), + idbClear: jest.fn(async () => { + Object.keys(store).forEach((k) => delete store[k]); + }), +})); + +// ─── Imports ──────────────────────────────────────────────────────────────── +import { + enqueueSync, + flushSyncQueue, + getSyncQueueLength, +} from "@/lib/syncQueue"; +import { enqueueAnalytics, flushAnalyticsQueue } from "@/lib/analyticsQueue"; +import { renderHook, act } from "@testing-library/react"; +import { useNetworkStatus } from "@/hooks/useNetworkStatus"; + +// ─── Helpers ──────────────────────────────────────────────────────────────── +beforeEach(() => { + // Reset in-memory store between tests + Object.keys(store).forEach((k) => delete store[k]); + jest.restoreAllMocks(); + // Restore navigator.onLine + Object.defineProperty(navigator, "onLine", { value: true, configurable: true }); +}); + +// ── SyncQueue ──────────────────────────────────────────────────────────────── +describe("syncQueue", () => { + it("enqueues an item and reports pending count", async () => { + await enqueueSync({ url: "/api/matches/1/report", method: "POST", body: '{"score":10}' }); + expect(await getSyncQueueLength()).toBe(1); + }); + + it("applies Last-Write-Wins for same url+method", async () => { + await enqueueSync({ url: "/api/matches/1/report", method: "POST", body: '{"score":10}' }); + await enqueueSync({ url: "/api/matches/1/report", method: "POST", body: '{"score":20}' }); + + expect(await getSyncQueueLength()).toBe(1); + // The queue in the store should hold the second (newer) body + const { idbGet } = jest.requireMock("@/lib/offlineStorage"); + const queue = await idbGet("offline:sync-queue"); + expect(queue[0].body).toBe('{"score":20}'); + }); + + it("flushes items in chronological order and clears on 2xx", async () => { + const fetchOrder: string[] = []; + global.fetch = jest.fn(async (url: string) => { + fetchOrder.push(url as string); + return { ok: true, status: 200 } as Response; + }) as jest.Mock; + + // Add two items with different timestamps + await enqueueSync({ url: "/api/a", method: "POST" }); + await new Promise((r) => setTimeout(r, 5)); // ensure ts difference + await enqueueSync({ url: "/api/b", method: "POST" }); + + await flushSyncQueue(); + + expect(fetchOrder).toEqual(["/api/a", "/api/b"]); + expect(await getSyncQueueLength()).toBe(0); + }); + + it("stops flushing if network is unavailable", async () => { + global.fetch = jest.fn().mockRejectedValue(new TypeError("Network error")) as jest.Mock; + + await enqueueSync({ url: "/api/c", method: "POST" }); + await flushSyncQueue(); + + // Item should remain queued + expect(await getSyncQueueLength()).toBe(1); + }); + + it("removes 409 conflict responses (server resolved it)", async () => { + global.fetch = jest.fn(async () => ({ ok: false, status: 409 } as Response)) as jest.Mock; + + await enqueueSync({ url: "/api/d", method: "POST" }); + await flushSyncQueue(); + + expect(await getSyncQueueLength()).toBe(0); + }); +}); + +// ── useNetworkStatus ───────────────────────────────────────────────────────── +describe("useNetworkStatus", () => { + it("returns true when navigator.onLine is true", () => { + Object.defineProperty(navigator, "onLine", { value: true, configurable: true }); + const { result } = renderHook(() => useNetworkStatus()); + expect(result.current.isOnline).toBe(true); + }); + + it("updates to false when offline event fires", () => { + Object.defineProperty(navigator, "onLine", { value: true, configurable: true }); + const { result } = renderHook(() => useNetworkStatus()); + + act(() => { + window.dispatchEvent(new Event("offline")); + }); + + expect(result.current.isOnline).toBe(false); + }); + + it("updates back to true when online event fires", () => { + Object.defineProperty(navigator, "onLine", { value: false, configurable: true }); + const { result } = renderHook(() => useNetworkStatus()); + + act(() => { + window.dispatchEvent(new Event("online")); + }); + + expect(result.current.isOnline).toBe(true); + }); +}); + +// ── AnalyticsQueue ─────────────────────────────────────────────────────────── +describe("analyticsQueue", () => { + it("enqueues analytics events", async () => { + await enqueueAnalytics({ name: "page_view", timestamp: Date.now() }); + const { idbGet } = jest.requireMock("@/lib/offlineStorage"); + const queue = await idbGet("offline:analytics-queue"); + expect(queue).toHaveLength(1); + expect(queue[0].name).toBe("page_view"); + }); + + it("flushes via sendBeacon when available and clears queue", async () => { + await enqueueAnalytics({ name: "match_start", timestamp: Date.now() }); + + const mockSendBeacon = jest.fn(() => true); + Object.defineProperty(navigator, "sendBeacon", { + value: mockSendBeacon, + configurable: true, + }); + + await flushAnalyticsQueue(); + + expect(mockSendBeacon).toHaveBeenCalledWith( + "/api/analytics/events", + expect.stringContaining("match_start") + ); + + const { idbGet } = jest.requireMock("@/lib/offlineStorage"); + expect(await idbGet("offline:analytics-queue")).toHaveLength(0); + }); + + it("falls back to fetch when sendBeacon fails", async () => { + await enqueueAnalytics({ name: "tournament_join", timestamp: Date.now() }); + + Object.defineProperty(navigator, "sendBeacon", { + value: jest.fn(() => false), + configurable: true, + }); + + global.fetch = jest.fn(async () => ({ ok: true } as Response)) as jest.Mock; + + await flushAnalyticsQueue(); + + expect(global.fetch).toHaveBeenCalledWith( + "/api/analytics/events", + expect.objectContaining({ method: "POST" }) + ); + }); +}); diff --git a/frontend/src/app/layout.tsx b/frontend/src/app/layout.tsx index 2784b020..672ee161 100644 --- a/frontend/src/app/layout.tsx +++ b/frontend/src/app/layout.tsx @@ -9,6 +9,8 @@ import { TxStatusProvider } from "@/hooks/useTxStatus"; import { WalletProvider } from "@/hooks/useWallet"; import { NotificationProvider } from "@/contexts/NotificationContext"; import { WebVitalsInit } from "@/components/providers/WebVitalsInit"; +import { OfflineProvider } from "@/contexts/OfflineContext"; +import { OfflineBanner } from "@/components/offline/OfflineBanner"; export const metadata: Metadata = { title: "ArenaX", @@ -49,8 +51,11 @@ export default function RootLayout({ - - {children} + + + + {children} + diff --git a/frontend/src/app/offline/page.tsx b/frontend/src/app/offline/page.tsx new file mode 100644 index 00000000..76c7505b --- /dev/null +++ b/frontend/src/app/offline/page.tsx @@ -0,0 +1,18 @@ +export default function OfflinePage() { + return ( +
+ +

You're offline

+

+ No internet connection detected. Check your network and try again. + Changes you make will sync automatically when you reconnect. +

+ +
+ ); +} diff --git a/frontend/src/components/offline/OfflineBanner.tsx b/frontend/src/components/offline/OfflineBanner.tsx new file mode 100644 index 00000000..6aebe010 --- /dev/null +++ b/frontend/src/components/offline/OfflineBanner.tsx @@ -0,0 +1,32 @@ +"use client"; + +import { useOffline } from "@/contexts/OfflineContext"; + +export function OfflineBanner() { + const { isOnline, isSyncing, pendingCount } = useOffline(); + + if (isOnline && !isSyncing) return null; + + return ( +
+ {isSyncing ? ( + <> + + Syncing changes{pendingCount > 0 ? ` (${pendingCount} left)` : ""}… + + ) : ( + <> + + You are offline + {pendingCount > 0 && ` · ${pendingCount} change${pendingCount > 1 ? "s" : ""} pending`} + + )} +
+ ); +} diff --git a/frontend/src/contexts/OfflineContext.tsx b/frontend/src/contexts/OfflineContext.tsx new file mode 100644 index 00000000..d52c1f2e --- /dev/null +++ b/frontend/src/contexts/OfflineContext.tsx @@ -0,0 +1,86 @@ +"use client"; + +import { + createContext, + useContext, + useEffect, + useRef, + useState, + useCallback, + ReactNode, +} from "react"; +import { useNetworkStatus } from "@/hooks/useNetworkStatus"; +import { + enqueueSync, + flushSyncQueue, + getSyncQueueLength, + SyncItem, +} from "@/lib/syncQueue"; +import { flushAnalyticsQueue, enqueueAnalytics } from "@/lib/analyticsQueue"; + +interface OfflineContextValue { + isOnline: boolean; + isSyncing: boolean; + pendingCount: number; + queueMutation: (item: Omit) => Promise; + trackEvent: (name: string, props?: Record) => void; +} + +const OfflineContext = createContext(undefined); + +export function useOffline(): OfflineContextValue { + const ctx = useContext(OfflineContext); + if (!ctx) throw new Error("useOffline must be used inside OfflineProvider"); + return ctx; +} + +export function OfflineProvider({ children }: { children: ReactNode }) { + const { isOnline } = useNetworkStatus(); + const [isSyncing, setIsSyncing] = useState(false); + const [pendingCount, setPendingCount] = useState(0); + const prevOnline = useRef(isOnline); + + // Refresh pending count whenever online state changes + useEffect(() => { + getSyncQueueLength().then(setPendingCount); + }, [isOnline]); + + // When coming back online, flush both queues + useEffect(() => { + if (isOnline && !prevOnline.current) { + setIsSyncing(true); + Promise.all([ + flushSyncQueue((remaining) => setPendingCount(remaining)), + flushAnalyticsQueue(), + ]).finally(() => { + setIsSyncing(false); + getSyncQueueLength().then(setPendingCount); + }); + } + prevOnline.current = isOnline; + }, [isOnline]); + + const queueMutation = useCallback( + async (item: Omit) => { + await enqueueSync(item); + setPendingCount((n) => n + 1); + }, + [] + ); + + const trackEvent = useCallback( + (name: string, props?: Record) => { + enqueueAnalytics({ name, props, timestamp: Date.now() }); + if (isOnline) flushAnalyticsQueue(); + }, + [isOnline] + ); + + return ( + + {children} + + ); +} diff --git a/frontend/src/hooks/useNetworkStatus.ts b/frontend/src/hooks/useNetworkStatus.ts new file mode 100644 index 00000000..b7a3ad0e --- /dev/null +++ b/frontend/src/hooks/useNetworkStatus.ts @@ -0,0 +1,24 @@ +"use client"; + +import { useEffect, useState } from "react"; + +export function useNetworkStatus() { + const [isOnline, setIsOnline] = useState( + typeof navigator !== "undefined" ? navigator.onLine : true + ); + + useEffect(() => { + const up = () => setIsOnline(true); + const down = () => setIsOnline(false); + + window.addEventListener("online", up); + window.addEventListener("offline", down); + + return () => { + window.removeEventListener("online", up); + window.removeEventListener("offline", down); + }; + }, []); + + return { isOnline }; +} diff --git a/frontend/src/lib/analyticsQueue.ts b/frontend/src/lib/analyticsQueue.ts new file mode 100644 index 00000000..c1914d30 --- /dev/null +++ b/frontend/src/lib/analyticsQueue.ts @@ -0,0 +1,61 @@ +"use client"; + +import { idbGet, idbSet } from "./offlineStorage"; + +const ANALYTICS_KEY = "offline:analytics-queue"; + +export interface AnalyticsEvent { + name: string; + props?: Record; + timestamp: number; +} + +async function readQueue(): Promise { + return (await idbGet(ANALYTICS_KEY)) ?? []; +} + +async function writeQueue(queue: AnalyticsEvent[]): Promise { + await idbSet(ANALYTICS_KEY, queue); +} + +export async function enqueueAnalytics(event: AnalyticsEvent): Promise { + const queue = await readQueue(); + queue.push(event); + await writeQueue(queue); +} + +/** + * Flush queued analytics events. + * Replace the `sendBeacon` target with your actual analytics endpoint. + */ +export async function flushAnalyticsQueue(): Promise { + const queue = await readQueue(); + if (queue.length === 0) return; + + const endpoint = "/api/analytics/events"; + + try { + const sent = navigator.sendBeacon( + endpoint, + JSON.stringify({ events: queue }) + ); + if (sent) { + await writeQueue([]); + return; + } + } catch { + // sendBeacon not available or failed, fall through to fetch + } + + try { + const res = await fetch(endpoint, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ events: queue }), + keepalive: true, + }); + if (res.ok) await writeQueue([]); + } catch { + // Network still down – will retry on next flush + } +} diff --git a/frontend/src/lib/offlineStorage.ts b/frontend/src/lib/offlineStorage.ts new file mode 100644 index 00000000..d01bec79 --- /dev/null +++ b/frontend/src/lib/offlineStorage.ts @@ -0,0 +1,65 @@ +/** + * Lightweight IndexedDB wrapper for offline-first storage. + * Stores key-value pairs across a named object store. + */ + +const DB_NAME = "arenax-offline"; +const DB_VERSION = 1; +const STORE_NAME = "kv"; + +function openDB(): Promise { + return new Promise((resolve, reject) => { + const req = indexedDB.open(DB_NAME, DB_VERSION); + req.onupgradeneeded = () => req.result.createObjectStore(STORE_NAME); + req.onsuccess = () => resolve(req.result); + req.onerror = () => reject(req.error); + }); +} + +export async function idbGet(key: string): Promise { + const db = await openDB(); + return new Promise((resolve, reject) => { + const req = db + .transaction(STORE_NAME, "readonly") + .objectStore(STORE_NAME) + .get(key); + req.onsuccess = () => resolve(req.result as T | undefined); + req.onerror = () => reject(req.error); + }); +} + +export async function idbSet(key: string, value: T): Promise { + const db = await openDB(); + return new Promise((resolve, reject) => { + const req = db + .transaction(STORE_NAME, "readwrite") + .objectStore(STORE_NAME) + .put(value, key); + req.onsuccess = () => resolve(); + req.onerror = () => reject(req.error); + }); +} + +export async function idbDelete(key: string): Promise { + const db = await openDB(); + return new Promise((resolve, reject) => { + const req = db + .transaction(STORE_NAME, "readwrite") + .objectStore(STORE_NAME) + .delete(key); + req.onsuccess = () => resolve(); + req.onerror = () => reject(req.error); + }); +} + +export async function idbClear(): Promise { + const db = await openDB(); + return new Promise((resolve, reject) => { + const req = db + .transaction(STORE_NAME, "readwrite") + .objectStore(STORE_NAME) + .clear(); + req.onsuccess = () => resolve(); + req.onerror = () => reject(req.error); + }); +} diff --git a/frontend/src/lib/syncQueue.ts b/frontend/src/lib/syncQueue.ts new file mode 100644 index 00000000..a9158da1 --- /dev/null +++ b/frontend/src/lib/syncQueue.ts @@ -0,0 +1,94 @@ +"use client"; + +import { idbGet, idbSet } from "./offlineStorage"; + +const QUEUE_KEY = "offline:sync-queue"; + +export interface SyncItem { + id: string; + url: string; + method: string; + body?: string; + headers?: Record; + /** Unix ms timestamp – used for Last-Write-Wins conflict resolution */ + timestamp: number; +} + +async function readQueue(): Promise { + return (await idbGet(QUEUE_KEY)) ?? []; +} + +async function writeQueue(queue: SyncItem[]): Promise { + await idbSet(QUEUE_KEY, queue); +} + +/** Enqueue a mutation that should be replayed when back online. */ +export async function enqueueSync( + item: Omit +): Promise { + const queue = await readQueue(); + + // Last-Write-Wins: if the same URL+method already exists, replace with newer timestamp + const idx = queue.findIndex( + (q) => q.url === item.url && q.method === item.method + ); + + const entry: SyncItem = { + ...item, + id: crypto.randomUUID(), + timestamp: Date.now(), + }; + + if (idx >= 0) { + queue.splice(idx, 1, entry); + } else { + queue.push(entry); + } + + await writeQueue(queue); +} + +/** Replay all queued mutations in chronological order. Removes each on success. */ +export async function flushSyncQueue( + onProgress?: (remaining: number) => void +): Promise { + let queue = await readQueue(); + // Ensure chronological order + queue.sort((a, b) => a.timestamp - b.timestamp); + + for (const item of [...queue]) { + try { + const token = + typeof localStorage !== "undefined" + ? (localStorage.getItem("auth_token") ?? + sessionStorage.getItem("auth_token")) + : null; + + const headers: Record = { + "Content-Type": "application/json", + ...item.headers, + }; + if (token) headers.Authorization = `Bearer ${token}`; + + const res = await fetch(item.url, { + method: item.method, + body: item.body, + headers, + }); + + // 2xx or 409 (conflict already resolved server-side) → remove from queue + if (res.ok || res.status === 409) { + queue = queue.filter((q) => q.id !== item.id); + await writeQueue(queue); + onProgress?.(queue.length); + } + } catch { + // Network still unavailable – stop and try again later + break; + } + } +} + +export async function getSyncQueueLength(): Promise { + return (await readQueue()).length; +}