diff --git a/listener/src/config.ts b/listener/src/config.ts index 67038d5..6c361e4 100644 --- a/listener/src/config.ts +++ b/listener/src/config.ts @@ -1,4 +1,4 @@ -import { Config, ContractConfig, DiscordConfig, WebhookSecret } from './types'; +import { Config, ContractConfig, DiscordConfig, WebhookSecret, AppCleanupConfig } from './types'; export class ConfigError extends Error { constructor(message: string) { @@ -112,6 +112,15 @@ function validateWebhookSecrets(value: unknown): WebhookSecret[] { }); } +function loadCleanupConfig(): AppCleanupConfig { + return { + intervalMs: parseIntegerEnv('CLEANUP_INTERVAL_MS', String(60 * 60 * 1000)), + notificationRetentionMs: parseIntegerEnv('NOTIFICATION_RETENTION_MS', String(7 * 24 * 60 * 60 * 1000)), + rateLimitEventRetentionMs: parseIntegerEnv('RATE_LIMIT_EVENT_RETENTION_MS', String(24 * 60 * 60 * 1000)), + eventRetentionMs: parseIntegerEnv('EVENT_RETENTION_MS', String(24 * 60 * 60 * 1000)), + }; +} + export function loadConfig(): Config { const discord = loadDiscordConfig(); const rawContractAddresses = parseJsonEnv('CONTRACT_ADDRESSES', '[]'); @@ -152,6 +161,7 @@ export function loadConfig(): Config { maxRequests: parseIntegerEnv('RATE_LIMIT_MAX_REQUESTS', '60'), clientOverrides, }, + cleanup: loadCleanupConfig(), }; } diff --git a/listener/src/index.ts b/listener/src/index.ts index 1453e41..c6397cb 100644 --- a/listener/src/index.ts +++ b/listener/src/index.ts @@ -8,8 +8,10 @@ import { NotificationTemplateService } from './services/notification-template-se import { TemplateAuditTrail } from './services/template-audit-trail'; import { getTemplateCache } from './services/notification-template-cache'; import { NotificationAPI } from './services/notification-api'; +import { CleanupService } from './services/cleanup-service'; import { initializeDatabase } from './database/database'; import { DiscordNotificationService } from './services/discord-notification'; +import { eventRegistry } from './store/event-registry'; import logger from './utils/logger'; import { loadConfig, ConfigError } from './config'; @@ -22,11 +24,20 @@ async function main() { let scheduler: NotificationScheduler | null = null; let notificationAPI: NotificationAPI | null = null; let templateService: NotificationTemplateService | null = null; + let cleanupService: CleanupService | null = null; try { logger.info('Initializing database'); const db = await initializeDatabase(config.databasePath); + // Rebuild registry with configured event TTL + if (config.cleanup) { + eventRegistry['ttlMs'] = config.cleanup.eventRetentionMs; + } + + cleanupService = new CleanupService(db, eventRegistry, config.cleanup); + cleanupService.start(); + const templateRepository = new NotificationTemplateRepository( db, new TemplateAuditTrail(db), @@ -71,6 +82,10 @@ async function main() { const shutdown = async () => { logger.info('Shutting down services...'); + if (cleanupService) { + await cleanupService.stop(); + } + if (scheduler) { await scheduler.stop(); } diff --git a/listener/src/services/cleanup-service.test.ts b/listener/src/services/cleanup-service.test.ts new file mode 100644 index 0000000..d320f90 --- /dev/null +++ b/listener/src/services/cleanup-service.test.ts @@ -0,0 +1,142 @@ +import { CleanupService } from './cleanup-service'; +import { EventRegistry } from '../store/event-registry'; + +jest.mock('../utils/logger', () => ({ + __esModule: true, + default: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() }, +})); + +// ── helpers ──────────────────────────────────────────────────────────────── + +function makeDb(changes = 0) { + return { + run: jest.fn().mockResolvedValue({ lastID: 0, changes }), + get: jest.fn().mockResolvedValue({ count: changes }), + } as unknown as import('../database/database').Database; +} + +function addEvent(registry: EventRegistry, overrides: Partial<{ receivedAt: number }> = {}) { + const event = registry.addFromInput({ + eventId: `evt-${Math.random()}`, + contractAddress: 'CABC', + eventName: 'TestEvent', + ledger: 1, + type: 'contract', + topic: [], + value: { switch: () => ({ name: 'scvVoid' }), value: () => undefined } as any, + }); + if (overrides.receivedAt !== undefined) { + (event as any).receivedAt = overrides.receivedAt; + } + return event; +} + +// ── EventRegistry TTL tests ──────────────────────────────────────────────── + +describe('EventRegistry TTL cleanup', () => { + it('pruneExpired removes events older than ttlMs', () => { + const registry = new EventRegistry(100, 1000); + addEvent(registry, { receivedAt: Date.now() - 2000 }); // expired + addEvent(registry, { receivedAt: Date.now() - 500 }); // fresh + + const removed = registry.pruneExpired(); + + expect(removed).toBe(1); + expect(registry.count()).toBe(1); + }); + + it('pruneExpired keeps all events when none are expired', () => { + const registry = new EventRegistry(100, 10_000); + addEvent(registry); + addEvent(registry); + + expect(registry.pruneExpired()).toBe(0); + expect(registry.count()).toBe(2); + }); + + it('startCleanup and stopCleanup control the interval', () => { + jest.useFakeTimers(); + const registry = new EventRegistry(100, 500); + const spy = jest.spyOn(registry, 'pruneExpired'); + + registry.startCleanup(200); + jest.advanceTimersByTime(600); + expect(spy).toHaveBeenCalledTimes(3); + + registry.stopCleanup(); + jest.advanceTimersByTime(600); + expect(spy).toHaveBeenCalledTimes(3); // no more calls + + jest.useRealTimers(); + }); +}); + +// ── CleanupService DB tests ──────────────────────────────────────────────── + +describe('CleanupService', () => { + beforeEach(() => jest.useFakeTimers()); + afterEach(() => jest.useRealTimers()); + + it('starts and fires db cleanup on interval', async () => { + const db = makeDb(3); + const registry = new EventRegistry(); + const service = new CleanupService(db, registry, { intervalMs: 500, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 1000 }); + + service.start(); + jest.advanceTimersByTime(600); + + // Allow pending promises to resolve + await Promise.resolve(); + + expect(db.run).toHaveBeenCalled(); + }); + + it('runDbCleanup deletes old notifications and rate_limit_events', async () => { + const db = makeDb(5); + const registry = new EventRegistry(); + const service = new CleanupService(db, registry, { intervalMs: 60000, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 500 }); + + const result = await service.runDbCleanup(); + + // Two DELETE calls: scheduled_notifications + rate_limit_events + expect(db.run).toHaveBeenCalledTimes(2); + expect(db.run).toHaveBeenCalledWith( + expect.stringContaining('DELETE FROM scheduled_notifications'), + expect.any(Array), + ); + expect(db.run).toHaveBeenCalledWith( + expect.stringContaining('DELETE FROM rate_limit_events'), + expect.any(Array), + ); + expect(result.notifications).toBe(5); + expect(result.rateLimitEvents).toBe(5); + }); + + it('stop clears the interval and stops registry cleanup', async () => { + const db = makeDb(); + const registry = new EventRegistry(); + const stopRegistrySpy = jest.spyOn(registry, 'stopCleanup'); + const service = new CleanupService(db, registry, { intervalMs: 200, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 1000 }); + + service.start(); + await service.stop(); + + jest.advanceTimersByTime(1000); + // db.run should not be called after stop (interval cleared) + expect(db.run).not.toHaveBeenCalled(); + expect(stopRegistrySpy).toHaveBeenCalled(); + }); + + it('calling start twice does not create duplicate intervals', () => { + const db = makeDb(); + const registry = new EventRegistry(); + const service = new CleanupService(db, registry, { intervalMs: 300, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 1000 }); + + service.start(); + service.start(); // second call is a no-op + + jest.advanceTimersByTime(400); + // Should only fire once despite two start() calls + expect(db.run).toHaveBeenCalledTimes(2); // one interval tick × 2 SQL statements + }); +}); diff --git a/listener/src/services/cleanup-service.ts b/listener/src/services/cleanup-service.ts new file mode 100644 index 0000000..16bbc3d --- /dev/null +++ b/listener/src/services/cleanup-service.ts @@ -0,0 +1,79 @@ +import { Database } from '../database/database'; +import logger from '../utils/logger'; +import { EventRegistry } from '../store/event-registry'; + +export interface CleanupConfig { + /** How often to run cleanup (ms). Default: 1 hour. */ + intervalMs: number; + /** Retain completed/failed notifications for this long (ms). Default: 7 days. */ + notificationRetentionMs: number; + /** Retain rate-limit audit events for this long (ms). Default: 24 hours. */ + rateLimitEventRetentionMs: number; +} + +const DEFAULTS: CleanupConfig = { + intervalMs: 60 * 60 * 1000, + notificationRetentionMs: 7 * 24 * 60 * 60 * 1000, + rateLimitEventRetentionMs: 24 * 60 * 60 * 1000, +}; + +export class CleanupService { + private timer: ReturnType | null = null; + private readonly config: CleanupConfig; + + constructor( + private readonly db: Database, + private readonly registry: EventRegistry, + config: Partial = {}, + ) { + this.config = { ...DEFAULTS, ...config }; + } + + start(): void { + if (this.timer) return; + this.registry.startCleanup(this.config.intervalMs); + this.timer = setInterval(() => void this.runDbCleanup(), this.config.intervalMs); + logger.info('CleanupService started', this.config as unknown as Record); + } + + async stop(): Promise { + if (this.timer) { + clearInterval(this.timer); + this.timer = null; + } + this.registry.stopCleanup(); + logger.info('CleanupService stopped'); + } + + async runDbCleanup(): Promise<{ notifications: number; executionLogs: number; rateLimitEvents: number }> { + const notificationCutoff = new Date(Date.now() - this.config.notificationRetentionMs).toISOString(); + const rateLimitCutoff = new Date(Date.now() - this.config.rateLimitEventRetentionMs).toISOString(); + + const [notifResult, rateLimitResult] = await Promise.all([ + this.db.run( + `DELETE FROM scheduled_notifications + WHERE status IN ('COMPLETED','FAILED','CANCELLED') + AND processing_completed_at < ?`, + [notificationCutoff], + ), + this.db.run( + `DELETE FROM rate_limit_events WHERE timestamp < ?`, + [rateLimitCutoff], + ), + ]); + + // execution_log rows are cascade-deleted with their parent; count separately for metrics + const logResult = await this.db.get<{ count: number }>( + `SELECT changes() as count`, + ); + + const result = { + notifications: notifResult.changes, + executionLogs: 0, // removed via ON DELETE CASCADE + rateLimitEvents: rateLimitResult.changes, + }; + + logger.info('DB cleanup completed', result); + return result; + } +} diff --git a/listener/src/store/event-registry.ts b/listener/src/store/event-registry.ts index a3b47c1..1595155 100644 --- a/listener/src/store/event-registry.ts +++ b/listener/src/store/event-registry.ts @@ -4,13 +4,40 @@ import { formatScValArray, formatScValValue } from '../utils/scval-format'; import logger from '../utils/logger'; const DEFAULT_MAX_EVENTS = 10000; +const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000; // 24 hours export class EventRegistry { private events: DisplayEvent[] = []; private readonly maxEvents: number; + private readonly ttlMs: number; + private cleanupTimer: ReturnType | null = null; - constructor(maxEvents = DEFAULT_MAX_EVENTS) { + constructor(maxEvents = DEFAULT_MAX_EVENTS, ttlMs = DEFAULT_TTL_MS) { this.maxEvents = maxEvents; + this.ttlMs = ttlMs; + } + + startCleanup(intervalMs = 60_000): void { + if (this.cleanupTimer) return; + this.cleanupTimer = setInterval(() => this.pruneExpired(), intervalMs); + } + + stopCleanup(): void { + if (this.cleanupTimer) { + clearInterval(this.cleanupTimer); + this.cleanupTimer = null; + } + } + + pruneExpired(): number { + const cutoff = Date.now() - this.ttlMs; + const before = this.events.length; + this.events = this.events.filter((e) => e.receivedAt >= cutoff); + const removed = before - this.events.length; + if (removed > 0) { + logger.info('Pruned expired events from registry', { removed, remaining: this.events.length }); + } + return removed; } addFromInput(input: RegistryEventInput): DisplayEvent { diff --git a/listener/src/types/index.ts b/listener/src/types/index.ts index 2d10e7f..d9d29b6 100644 --- a/listener/src/types/index.ts +++ b/listener/src/types/index.ts @@ -45,6 +45,7 @@ export interface Config { scheduler?: SchedulerConfig; databasePath?: string; rateLimit?: RateLimitConfig; + cleanup?: AppCleanupConfig; } export interface SchedulerConfig { @@ -56,3 +57,14 @@ export interface SchedulerConfig { timingBufferMs: number; } +export interface AppCleanupConfig { + /** How often to run cleanup jobs (ms). */ + intervalMs: number; + /** Retain completed/failed/cancelled notifications for this long (ms). */ + notificationRetentionMs: number; + /** Retain rate-limit audit rows for this long (ms). */ + rateLimitEventRetentionMs: number; + /** Retain in-memory events for this long (ms). */ + eventRetentionMs: number; +} +