Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion listener/src/config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Config, ContractConfig, DiscordConfig, WebhookSecret } from './types';
import { Config, ContractConfig, DiscordConfig, WebhookSecret, AppCleanupConfig } from './types';

export class ConfigError extends Error {
constructor(message: string) {
Expand Down Expand Up @@ -112,6 +112,15 @@ function validateWebhookSecrets(value: unknown): WebhookSecret[] {
});
}

function loadCleanupConfig(): AppCleanupConfig {
return {
intervalMs: parseIntegerEnv('CLEANUP_INTERVAL_MS', String(60 * 60 * 1000)),
notificationRetentionMs: parseIntegerEnv('NOTIFICATION_RETENTION_MS', String(7 * 24 * 60 * 60 * 1000)),
rateLimitEventRetentionMs: parseIntegerEnv('RATE_LIMIT_EVENT_RETENTION_MS', String(24 * 60 * 60 * 1000)),
eventRetentionMs: parseIntegerEnv('EVENT_RETENTION_MS', String(24 * 60 * 60 * 1000)),
};
}

export function loadConfig(): Config {
const discord = loadDiscordConfig();
const rawContractAddresses = parseJsonEnv<unknown>('CONTRACT_ADDRESSES', '[]');
Expand Down Expand Up @@ -152,6 +161,7 @@ export function loadConfig(): Config {
maxRequests: parseIntegerEnv('RATE_LIMIT_MAX_REQUESTS', '60'),
clientOverrides,
},
cleanup: loadCleanupConfig(),
};
}

15 changes: 15 additions & 0 deletions listener/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import { NotificationTemplateService } from './services/notification-template-se
import { TemplateAuditTrail } from './services/template-audit-trail';
import { getTemplateCache } from './services/notification-template-cache';
import { NotificationAPI } from './services/notification-api';
import { CleanupService } from './services/cleanup-service';
import { initializeDatabase } from './database/database';
import { DiscordNotificationService } from './services/discord-notification';
import { eventRegistry } from './store/event-registry';
import logger from './utils/logger';
import { loadConfig, ConfigError } from './config';

Expand All @@ -22,11 +24,20 @@ async function main() {
let scheduler: NotificationScheduler | null = null;
let notificationAPI: NotificationAPI | null = null;
let templateService: NotificationTemplateService | null = null;
let cleanupService: CleanupService | null = null;

try {
logger.info('Initializing database');
const db = await initializeDatabase(config.databasePath);

// Rebuild registry with configured event TTL
if (config.cleanup) {
eventRegistry['ttlMs'] = config.cleanup.eventRetentionMs;
}

cleanupService = new CleanupService(db, eventRegistry, config.cleanup);
cleanupService.start();

const templateRepository = new NotificationTemplateRepository(
db,
new TemplateAuditTrail(db),
Expand Down Expand Up @@ -71,6 +82,10 @@ async function main() {
const shutdown = async () => {
logger.info('Shutting down services...');

if (cleanupService) {
await cleanupService.stop();
}

if (scheduler) {
await scheduler.stop();
}
Expand Down
142 changes: 142 additions & 0 deletions listener/src/services/cleanup-service.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { CleanupService } from './cleanup-service';
import { EventRegistry } from '../store/event-registry';

jest.mock('../utils/logger', () => ({
__esModule: true,
default: { debug: jest.fn(), info: jest.fn(), warn: jest.fn(), error: jest.fn() },
}));

// ── helpers ────────────────────────────────────────────────────────────────

function makeDb(changes = 0) {
return {
run: jest.fn().mockResolvedValue({ lastID: 0, changes }),
get: jest.fn().mockResolvedValue({ count: changes }),
} as unknown as import('../database/database').Database;
}

function addEvent(registry: EventRegistry, overrides: Partial<{ receivedAt: number }> = {}) {
const event = registry.addFromInput({
eventId: `evt-${Math.random()}`,
contractAddress: 'CABC',
eventName: 'TestEvent',
ledger: 1,
type: 'contract',
topic: [],
value: { switch: () => ({ name: 'scvVoid' }), value: () => undefined } as any,
});
if (overrides.receivedAt !== undefined) {
(event as any).receivedAt = overrides.receivedAt;
}
return event;
}

// ── EventRegistry TTL tests ────────────────────────────────────────────────

describe('EventRegistry TTL cleanup', () => {
it('pruneExpired removes events older than ttlMs', () => {
const registry = new EventRegistry(100, 1000);
addEvent(registry, { receivedAt: Date.now() - 2000 }); // expired
addEvent(registry, { receivedAt: Date.now() - 500 }); // fresh

const removed = registry.pruneExpired();

expect(removed).toBe(1);
expect(registry.count()).toBe(1);
});

it('pruneExpired keeps all events when none are expired', () => {
const registry = new EventRegistry(100, 10_000);
addEvent(registry);
addEvent(registry);

expect(registry.pruneExpired()).toBe(0);
expect(registry.count()).toBe(2);
});

it('startCleanup and stopCleanup control the interval', () => {
jest.useFakeTimers();
const registry = new EventRegistry(100, 500);
const spy = jest.spyOn(registry, 'pruneExpired');

registry.startCleanup(200);
jest.advanceTimersByTime(600);
expect(spy).toHaveBeenCalledTimes(3);

registry.stopCleanup();
jest.advanceTimersByTime(600);
expect(spy).toHaveBeenCalledTimes(3); // no more calls

jest.useRealTimers();
});
});

// ── CleanupService DB tests ────────────────────────────────────────────────

describe('CleanupService', () => {
beforeEach(() => jest.useFakeTimers());
afterEach(() => jest.useRealTimers());

it('starts and fires db cleanup on interval', async () => {
const db = makeDb(3);
const registry = new EventRegistry();
const service = new CleanupService(db, registry, { intervalMs: 500, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 1000 });

service.start();
jest.advanceTimersByTime(600);

// Allow pending promises to resolve
await Promise.resolve();

expect(db.run).toHaveBeenCalled();
});

it('runDbCleanup deletes old notifications and rate_limit_events', async () => {
const db = makeDb(5);
const registry = new EventRegistry();
const service = new CleanupService(db, registry, { intervalMs: 60000, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 500 });

const result = await service.runDbCleanup();

// Two DELETE calls: scheduled_notifications + rate_limit_events
expect(db.run).toHaveBeenCalledTimes(2);
expect(db.run).toHaveBeenCalledWith(
expect.stringContaining('DELETE FROM scheduled_notifications'),
expect.any(Array),
);
expect(db.run).toHaveBeenCalledWith(
expect.stringContaining('DELETE FROM rate_limit_events'),
expect.any(Array),
);
expect(result.notifications).toBe(5);
expect(result.rateLimitEvents).toBe(5);
});

it('stop clears the interval and stops registry cleanup', async () => {
const db = makeDb();
const registry = new EventRegistry();
const stopRegistrySpy = jest.spyOn(registry, 'stopCleanup');
const service = new CleanupService(db, registry, { intervalMs: 200, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 1000 });

service.start();
await service.stop();

jest.advanceTimersByTime(1000);
// db.run should not be called after stop (interval cleared)
expect(db.run).not.toHaveBeenCalled();
expect(stopRegistrySpy).toHaveBeenCalled();
});

it('calling start twice does not create duplicate intervals', () => {
const db = makeDb();
const registry = new EventRegistry();
const service = new CleanupService(db, registry, { intervalMs: 300, notificationRetentionMs: 1000, rateLimitEventRetentionMs: 1000 });

service.start();
service.start(); // second call is a no-op

jest.advanceTimersByTime(400);
// Should only fire once despite two start() calls
expect(db.run).toHaveBeenCalledTimes(2); // one interval tick × 2 SQL statements
});
});
79 changes: 79 additions & 0 deletions listener/src/services/cleanup-service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { Database } from '../database/database';
import logger from '../utils/logger';
import { EventRegistry } from '../store/event-registry';

export interface CleanupConfig {
/** How often to run cleanup (ms). Default: 1 hour. */
intervalMs: number;
/** Retain completed/failed notifications for this long (ms). Default: 7 days. */
notificationRetentionMs: number;
/** Retain rate-limit audit events for this long (ms). Default: 24 hours. */
rateLimitEventRetentionMs: number;
}

const DEFAULTS: CleanupConfig = {
intervalMs: 60 * 60 * 1000,
notificationRetentionMs: 7 * 24 * 60 * 60 * 1000,
rateLimitEventRetentionMs: 24 * 60 * 60 * 1000,
};

export class CleanupService {
private timer: ReturnType<typeof setInterval> | null = null;
private readonly config: CleanupConfig;

constructor(
private readonly db: Database,
private readonly registry: EventRegistry,
config: Partial<CleanupConfig> = {},
) {
this.config = { ...DEFAULTS, ...config };
}

start(): void {
if (this.timer) return;
this.registry.startCleanup(this.config.intervalMs);
this.timer = setInterval(() => void this.runDbCleanup(), this.config.intervalMs);
logger.info('CleanupService started', this.config as unknown as Record<string, unknown>);
}

async stop(): Promise<void> {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
this.registry.stopCleanup();
logger.info('CleanupService stopped');
}

async runDbCleanup(): Promise<{ notifications: number; executionLogs: number; rateLimitEvents: number }> {
const notificationCutoff = new Date(Date.now() - this.config.notificationRetentionMs).toISOString();
const rateLimitCutoff = new Date(Date.now() - this.config.rateLimitEventRetentionMs).toISOString();

const [notifResult, rateLimitResult] = await Promise.all([
this.db.run(
`DELETE FROM scheduled_notifications
WHERE status IN ('COMPLETED','FAILED','CANCELLED')
AND processing_completed_at < ?`,
[notificationCutoff],
),
this.db.run(
`DELETE FROM rate_limit_events WHERE timestamp < ?`,
[rateLimitCutoff],
),
]);

// execution_log rows are cascade-deleted with their parent; count separately for metrics
const logResult = await this.db.get<{ count: number }>(
`SELECT changes() as count`,
);

const result = {
notifications: notifResult.changes,
executionLogs: 0, // removed via ON DELETE CASCADE
rateLimitEvents: rateLimitResult.changes,
};

logger.info('DB cleanup completed', result);
return result;
}
}
29 changes: 28 additions & 1 deletion listener/src/store/event-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,40 @@ import { formatScValArray, formatScValValue } from '../utils/scval-format';
import logger from '../utils/logger';

const DEFAULT_MAX_EVENTS = 10000;
const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000; // 24 hours

export class EventRegistry {
private events: DisplayEvent[] = [];
private readonly maxEvents: number;
private readonly ttlMs: number;
private cleanupTimer: ReturnType<typeof setInterval> | null = null;

constructor(maxEvents = DEFAULT_MAX_EVENTS) {
constructor(maxEvents = DEFAULT_MAX_EVENTS, ttlMs = DEFAULT_TTL_MS) {
this.maxEvents = maxEvents;
this.ttlMs = ttlMs;
}

startCleanup(intervalMs = 60_000): void {
if (this.cleanupTimer) return;
this.cleanupTimer = setInterval(() => this.pruneExpired(), intervalMs);
}

stopCleanup(): void {
if (this.cleanupTimer) {
clearInterval(this.cleanupTimer);
this.cleanupTimer = null;
}
}

pruneExpired(): number {
const cutoff = Date.now() - this.ttlMs;
const before = this.events.length;
this.events = this.events.filter((e) => e.receivedAt >= cutoff);
const removed = before - this.events.length;
if (removed > 0) {
logger.info('Pruned expired events from registry', { removed, remaining: this.events.length });
}
return removed;
}

addFromInput(input: RegistryEventInput): DisplayEvent {
Expand Down
12 changes: 12 additions & 0 deletions listener/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export interface Config {
scheduler?: SchedulerConfig;
databasePath?: string;
rateLimit?: RateLimitConfig;
cleanup?: AppCleanupConfig;
}

export interface SchedulerConfig {
Expand All @@ -56,3 +57,14 @@ export interface SchedulerConfig {
timingBufferMs: number;
}

export interface AppCleanupConfig {
/** How often to run cleanup jobs (ms). */
intervalMs: number;
/** Retain completed/failed/cancelled notifications for this long (ms). */
notificationRetentionMs: number;
/** Retain rate-limit audit rows for this long (ms). */
rateLimitEventRetentionMs: number;
/** Retain in-memory events for this long (ms). */
eventRetentionMs: number;
}

Loading