diff --git a/listener/src/config.test.ts b/listener/src/config.test.ts index 7b7ec70..3ffb85b 100644 --- a/listener/src/config.test.ts +++ b/listener/src/config.test.ts @@ -56,6 +56,8 @@ describe('Config validation', () => { delete process.env.RETRY_MAX_RETRIES; delete process.env.DISCORD_WEBHOOK_URL; delete process.env.DISCORD_WEBHOOK_ID; + delete process.env.NOTIFICATION_DEDUPLICATION_WINDOW_MS; + delete process.env.NOTIFICATION_DEDUPLICATION_MAX_SIZE; const config = loadConfig(); @@ -74,4 +76,20 @@ describe('Config validation', () => { }, }); }); + + it('loads notification deduplication settings when Discord is configured', () => { + process.env.DISCORD_WEBHOOK_URL = 'https://discord.com/api/webhooks/123/abc'; + process.env.DISCORD_WEBHOOK_ID = '123'; + process.env.NOTIFICATION_DEDUPLICATION_WINDOW_MS = '15000'; + process.env.NOTIFICATION_DEDUPLICATION_MAX_SIZE = '250'; + + const config = loadConfig(); + + expect(config.discord).toMatchObject({ + webhookUrl: 'https://discord.com/api/webhooks/123/abc', + webhookId: '123', + deduplicationWindowMs: 15000, + deduplicationMaxSize: 250, + }); + }); }); diff --git a/listener/src/config.ts b/listener/src/config.ts index 455168c..103d830 100644 --- a/listener/src/config.ts +++ b/listener/src/config.ts @@ -77,7 +77,12 @@ function loadDiscordConfig(): DiscordConfig | undefined { throw new ConfigError('DISCORD_WEBHOOK_ID is required when DISCORD_WEBHOOK_URL is provided.'); } - return { webhookUrl, webhookId }; + return { + webhookUrl, + webhookId, + deduplicationWindowMs: parseIntegerEnv('NOTIFICATION_DEDUPLICATION_WINDOW_MS', '60000'), + deduplicationMaxSize: parseIntegerEnv('NOTIFICATION_DEDUPLICATION_MAX_SIZE', '10000'), + }; } export function loadConfig(): Config { @@ -101,3 +106,4 @@ export function loadConfig(): Config { }, }; } + diff --git a/listener/src/services/discord-notification.test.ts b/listener/src/services/discord-notification.test.ts index c866716..2746792 100644 --- a/listener/src/services/discord-notification.test.ts +++ b/listener/src/services/discord-notification.test.ts @@ -106,7 +106,10 @@ describe('DiscordNotificationService', () => { const secondResult = await service.sendEventNotification(mockEvent, mockContractConfig); expect(mockFetch).toHaveBeenCalledTimes(1); - expect(secondResult).toBe(false); + expect(secondResult).toBe(true); + expect(service.getDeduplicationMetrics()).toEqual( + expect.objectContaining({ skippedDuplicates: 1, cacheSize: 1 }) + ); }); it('logs a duplicate detection event', async () => { @@ -129,6 +132,27 @@ describe('DiscordNotificationService', () => { ); }); + + it('allows the same notification request after the configured window expires', async () => { + mockFetch.mockResolvedValue({ ok: true }); + let now = 1000; + const deduplicator = new NotificationDeduplicator({ windowMs: 500, now: () => now }); + const service = new DiscordNotificationService(mockConfig, deduplicator); + const mockEvent = createMockEvent({ id: 'event-windowed' }); + const mockContractConfig = { address: 'CA123', events: ['test'] }; + + await service.sendEventNotification(mockEvent, mockContractConfig); + await service.sendEventNotification(mockEvent, mockContractConfig); + now = 1501; + const result = await service.sendEventNotification(mockEvent, mockContractConfig); + + expect(result).toBe(true); + expect(mockFetch).toHaveBeenCalledTimes(2); + expect(service.getDeduplicationMetrics()).toEqual( + expect.objectContaining({ acceptedRequests: 2, skippedDuplicates: 1 }) + ); + }); + it('allows the same event id on a different contract through', async () => { mockFetch.mockResolvedValue({ ok: true }); @@ -253,4 +277,4 @@ describe('DiscordNotificationService', () => { expect(valueField.value).toContain('my_symbol'); }); }); -}); \ No newline at end of file +}); diff --git a/listener/src/services/discord-notification.ts b/listener/src/services/discord-notification.ts index 4af1f40..2c764ff 100644 --- a/listener/src/services/discord-notification.ts +++ b/listener/src/services/discord-notification.ts @@ -28,7 +28,12 @@ export class DiscordNotificationService { constructor(config: DiscordConfig, deduplicator?: NotificationDeduplicator) { this.config = config; - this.deduplicator = deduplicator ?? new NotificationDeduplicator(); + this.deduplicator = + deduplicator ?? + new NotificationDeduplicator({ + windowMs: config.deduplicationWindowMs, + maxSize: config.deduplicationMaxSize, + }); } async sendEventNotification( @@ -43,8 +48,9 @@ export class DiscordNotificationService { eventId: event.id, contractAddress: contractConfig.address, fingerprint, + deduplication: this.deduplicator.getMetrics(), }); - return false; + return true; } const logContext = { requestId, @@ -82,6 +88,7 @@ export class DiscordNotificationService { logger.info('Discord notification delivered', { ...logContext, durationMs, + deduplication: this.deduplicator.getMetrics(), }); return true; } catch (error) { @@ -94,6 +101,10 @@ export class DiscordNotificationService { } } + getDeduplicationMetrics() { + return this.deduplicator.getMetrics(); + } + async sendTestMessage(requestId?: string): Promise { const message: DiscordMessage = { embeds: [ @@ -235,3 +246,4 @@ export class DiscordNotificationService { } } } + diff --git a/listener/src/services/notification-deduplicator.test.ts b/listener/src/services/notification-deduplicator.test.ts index 92fbae1..31a1fdd 100644 --- a/listener/src/services/notification-deduplicator.test.ts +++ b/listener/src/services/notification-deduplicator.test.ts @@ -100,6 +100,59 @@ describe('NotificationDeduplicator', () => { }); }); + + describe('time window and metrics', () => { + it('allows the same fingerprint after the deduplication window expires', () => { + let now = 1000; + const d = new NotificationDeduplicator({ windowMs: 500, now: () => now }); + + d.markSent('CONTRACT:event-windowed'); + expect(d.isDuplicate('CONTRACT:event-windowed')).toBe(true); + + now = 1501; + expect(d.isDuplicate('CONTRACT:event-windowed')).toBe(false); + expect(d.size()).toBe(0); + }); + + it('reports skipped duplicate metrics accurately', () => { + const d = new NotificationDeduplicator({ windowMs: 1000 }); + + d.markSent('CONTRACT:event-1'); + d.markSent('CONTRACT:event-2'); + d.isDuplicate('CONTRACT:event-1'); + d.isDuplicate('CONTRACT:event-1'); + d.isDuplicate('CONTRACT:event-new'); + + expect(d.getMetrics()).toEqual({ + acceptedRequests: 2, + skippedDuplicates: 2, + evictedEntries: 0, + expiredEntries: 0, + cacheSize: 2, + deduplicationWindowMs: 1000, + }); + }); + + it('counts evicted and expired entries in metrics', () => { + let now = 0; + const d = new NotificationDeduplicator({ maxSize: 2, windowMs: 100, now: () => now }); + + d.markSent('fp-1'); + d.markSent('fp-2'); + d.markSent('fp-3'); + now = 101; + d.size(); + + expect(d.getMetrics()).toEqual( + expect.objectContaining({ + evictedEntries: 1, + expiredEntries: 2, + cacheSize: 0, + }) + ); + }); + }); + describe('size', () => { it('returns 0 for a new deduplicator', () => { expect(new NotificationDeduplicator().size()).toBe(0); @@ -140,3 +193,4 @@ describe('NotificationDeduplicator', () => { }); }); }); + diff --git a/listener/src/services/notification-deduplicator.ts b/listener/src/services/notification-deduplicator.ts index 141849b..8373d0b 100644 --- a/listener/src/services/notification-deduplicator.ts +++ b/listener/src/services/notification-deduplicator.ts @@ -1,37 +1,97 @@ import logger from '../utils/logger'; const DEFAULT_MAX_SIZE = 10000; +const DEFAULT_WINDOW_MS = 60000; export function generateFingerprint(eventId: string, contractAddress: string): string { return `${contractAddress}:${eventId}`; } +export interface NotificationDeduplicatorOptions { + maxSize?: number; + windowMs?: number; + now?: () => number; +} + +export interface NotificationDeduplicationMetrics { + acceptedRequests: number; + skippedDuplicates: number; + evictedEntries: number; + expiredEntries: number; + cacheSize: number; + deduplicationWindowMs: number; +} + export class NotificationDeduplicator { - private readonly seen: Set; + private readonly seen: Map; private readonly maxSize: number; + private readonly windowMs: number; + private readonly now: () => number; + private acceptedRequests = 0; + private skippedDuplicates = 0; + private evictedEntries = 0; + private expiredEntries = 0; - constructor(maxSize = DEFAULT_MAX_SIZE) { - this.seen = new Set(); - this.maxSize = maxSize; + constructor(options: NotificationDeduplicatorOptions | number = {}) { + const normalizedOptions = typeof options === 'number' ? { maxSize: options } : options; + this.seen = new Map(); + this.maxSize = Math.max(1, normalizedOptions.maxSize ?? DEFAULT_MAX_SIZE); + this.windowMs = Math.max(1, normalizedOptions.windowMs ?? DEFAULT_WINDOW_MS); + this.now = normalizedOptions.now ?? Date.now; } isDuplicate(fingerprint: string): boolean { - return this.seen.has(fingerprint); + this.pruneExpired(); + const expiresAt = this.seen.get(fingerprint); + const duplicate = expiresAt !== undefined && expiresAt > this.now(); + + if (duplicate) { + this.skippedDuplicates++; + } + + return duplicate; } markSent(fingerprint: string): void { + this.pruneExpired(); if (this.seen.size >= this.maxSize) { - const oldest = this.seen.values().next().value as string; + const oldest = this.seen.keys().next().value as string; this.seen.delete(oldest); + this.evictedEntries++; logger.warn('Notification deduplicator cache full, evicted oldest entry', { evicted: oldest, cacheSize: this.maxSize, }); } - this.seen.add(fingerprint); + this.seen.set(fingerprint, this.now() + this.windowMs); + this.acceptedRequests++; } size(): number { + this.pruneExpired(); return this.seen.size; } + + getMetrics(): NotificationDeduplicationMetrics { + return { + acceptedRequests: this.acceptedRequests, + skippedDuplicates: this.skippedDuplicates, + evictedEntries: this.evictedEntries, + expiredEntries: this.expiredEntries, + cacheSize: this.size(), + deduplicationWindowMs: this.windowMs, + }; + } + + private pruneExpired(): void { + const now = this.now(); + for (const [fingerprint, expiresAt] of this.seen) { + if (expiresAt > now) { + continue; + } + + this.seen.delete(fingerprint); + this.expiredEntries++; + } + } } diff --git a/listener/src/types/index.ts b/listener/src/types/index.ts index 107ae76..027b6c7 100644 --- a/listener/src/types/index.ts +++ b/listener/src/types/index.ts @@ -6,6 +6,8 @@ export interface ContractConfig { export interface DiscordConfig { webhookUrl: string; webhookId: string; + deduplicationWindowMs?: number; + deduplicationMaxSize?: number; } export interface RetryQueueConfig { @@ -25,3 +27,4 @@ export interface Config { discord?: DiscordConfig; retryQueue?: RetryQueueConfig; } +