Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions listener/src/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ describe('Config validation', () => {
delete process.env.RETRY_MAX_RETRIES;
delete process.env.DISCORD_WEBHOOK_URL;
delete process.env.DISCORD_WEBHOOK_ID;
delete process.env.NOTIFICATION_DEDUPLICATION_WINDOW_MS;
delete process.env.NOTIFICATION_DEDUPLICATION_MAX_SIZE;

const config = loadConfig();

Expand All @@ -74,4 +76,20 @@ describe('Config validation', () => {
},
});
});

it('loads notification deduplication settings when Discord is configured', () => {
process.env.DISCORD_WEBHOOK_URL = 'https://discord.com/api/webhooks/123/abc';
process.env.DISCORD_WEBHOOK_ID = '123';
process.env.NOTIFICATION_DEDUPLICATION_WINDOW_MS = '15000';
process.env.NOTIFICATION_DEDUPLICATION_MAX_SIZE = '250';

const config = loadConfig();

expect(config.discord).toMatchObject({
webhookUrl: 'https://discord.com/api/webhooks/123/abc',
webhookId: '123',
deduplicationWindowMs: 15000,
deduplicationMaxSize: 250,
});
});
});
8 changes: 7 additions & 1 deletion listener/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,12 @@ function loadDiscordConfig(): DiscordConfig | undefined {
throw new ConfigError('DISCORD_WEBHOOK_ID is required when DISCORD_WEBHOOK_URL is provided.');
}

return { webhookUrl, webhookId };
return {
webhookUrl,
webhookId,
deduplicationWindowMs: parseIntegerEnv('NOTIFICATION_DEDUPLICATION_WINDOW_MS', '60000'),
deduplicationMaxSize: parseIntegerEnv('NOTIFICATION_DEDUPLICATION_MAX_SIZE', '10000'),
};
}

export function loadConfig(): Config {
Expand All @@ -101,3 +106,4 @@ export function loadConfig(): Config {
},
};
}

28 changes: 26 additions & 2 deletions listener/src/services/discord-notification.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ describe('DiscordNotificationService', () => {
const secondResult = await service.sendEventNotification(mockEvent, mockContractConfig);

expect(mockFetch).toHaveBeenCalledTimes(1);
expect(secondResult).toBe(false);
expect(secondResult).toBe(true);
expect(service.getDeduplicationMetrics()).toEqual(
expect.objectContaining({ skippedDuplicates: 1, cacheSize: 1 })
);
});

it('logs a duplicate detection event', async () => {
Expand All @@ -129,6 +132,27 @@ describe('DiscordNotificationService', () => {
);
});


it('allows the same notification request after the configured window expires', async () => {
mockFetch.mockResolvedValue({ ok: true });
let now = 1000;
const deduplicator = new NotificationDeduplicator({ windowMs: 500, now: () => now });
const service = new DiscordNotificationService(mockConfig, deduplicator);
const mockEvent = createMockEvent({ id: 'event-windowed' });
const mockContractConfig = { address: 'CA123', events: ['test'] };

await service.sendEventNotification(mockEvent, mockContractConfig);
await service.sendEventNotification(mockEvent, mockContractConfig);
now = 1501;
const result = await service.sendEventNotification(mockEvent, mockContractConfig);

expect(result).toBe(true);
expect(mockFetch).toHaveBeenCalledTimes(2);
expect(service.getDeduplicationMetrics()).toEqual(
expect.objectContaining({ acceptedRequests: 2, skippedDuplicates: 1 })
);
});

it('allows the same event id on a different contract through', async () => {
mockFetch.mockResolvedValue({ ok: true });

Expand Down Expand Up @@ -253,4 +277,4 @@ describe('DiscordNotificationService', () => {
expect(valueField.value).toContain('my_symbol');
});
});
});
});
16 changes: 14 additions & 2 deletions listener/src/services/discord-notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ export class DiscordNotificationService {

constructor(config: DiscordConfig, deduplicator?: NotificationDeduplicator) {
this.config = config;
this.deduplicator = deduplicator ?? new NotificationDeduplicator();
this.deduplicator =
deduplicator ??
new NotificationDeduplicator({
windowMs: config.deduplicationWindowMs,
maxSize: config.deduplicationMaxSize,
});
}

async sendEventNotification(
Expand All @@ -43,8 +48,9 @@ export class DiscordNotificationService {
eventId: event.id,
contractAddress: contractConfig.address,
fingerprint,
deduplication: this.deduplicator.getMetrics(),
});
return false;
return true;
}
const logContext = {
requestId,
Expand Down Expand Up @@ -82,6 +88,7 @@ export class DiscordNotificationService {
logger.info('Discord notification delivered', {
...logContext,
durationMs,
deduplication: this.deduplicator.getMetrics(),
});
return true;
} catch (error) {
Expand All @@ -94,6 +101,10 @@ export class DiscordNotificationService {
}
}

getDeduplicationMetrics() {
return this.deduplicator.getMetrics();
}

async sendTestMessage(requestId?: string): Promise<boolean> {
const message: DiscordMessage = {
embeds: [
Expand Down Expand Up @@ -235,3 +246,4 @@ export class DiscordNotificationService {
}
}
}

54 changes: 54 additions & 0 deletions listener/src/services/notification-deduplicator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,59 @@ describe('NotificationDeduplicator', () => {
});
});


describe('time window and metrics', () => {
it('allows the same fingerprint after the deduplication window expires', () => {
let now = 1000;
const d = new NotificationDeduplicator({ windowMs: 500, now: () => now });

d.markSent('CONTRACT:event-windowed');
expect(d.isDuplicate('CONTRACT:event-windowed')).toBe(true);

now = 1501;
expect(d.isDuplicate('CONTRACT:event-windowed')).toBe(false);
expect(d.size()).toBe(0);
});

it('reports skipped duplicate metrics accurately', () => {
const d = new NotificationDeduplicator({ windowMs: 1000 });

d.markSent('CONTRACT:event-1');
d.markSent('CONTRACT:event-2');
d.isDuplicate('CONTRACT:event-1');
d.isDuplicate('CONTRACT:event-1');
d.isDuplicate('CONTRACT:event-new');

expect(d.getMetrics()).toEqual({
acceptedRequests: 2,
skippedDuplicates: 2,
evictedEntries: 0,
expiredEntries: 0,
cacheSize: 2,
deduplicationWindowMs: 1000,
});
});

it('counts evicted and expired entries in metrics', () => {
let now = 0;
const d = new NotificationDeduplicator({ maxSize: 2, windowMs: 100, now: () => now });

d.markSent('fp-1');
d.markSent('fp-2');
d.markSent('fp-3');
now = 101;
d.size();

expect(d.getMetrics()).toEqual(
expect.objectContaining({
evictedEntries: 1,
expiredEntries: 2,
cacheSize: 0,
})
);
});
});

describe('size', () => {
it('returns 0 for a new deduplicator', () => {
expect(new NotificationDeduplicator().size()).toBe(0);
Expand Down Expand Up @@ -140,3 +193,4 @@ describe('NotificationDeduplicator', () => {
});
});
});

74 changes: 67 additions & 7 deletions listener/src/services/notification-deduplicator.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,97 @@
import logger from '../utils/logger';

const DEFAULT_MAX_SIZE = 10000;
const DEFAULT_WINDOW_MS = 60000;

export function generateFingerprint(eventId: string, contractAddress: string): string {
return `${contractAddress}:${eventId}`;
}

export interface NotificationDeduplicatorOptions {
maxSize?: number;
windowMs?: number;
now?: () => number;
}

export interface NotificationDeduplicationMetrics {
acceptedRequests: number;
skippedDuplicates: number;
evictedEntries: number;
expiredEntries: number;
cacheSize: number;
deduplicationWindowMs: number;
}

export class NotificationDeduplicator {
private readonly seen: Set<string>;
private readonly seen: Map<string, number>;
private readonly maxSize: number;
private readonly windowMs: number;
private readonly now: () => number;
private acceptedRequests = 0;
private skippedDuplicates = 0;
private evictedEntries = 0;
private expiredEntries = 0;

constructor(maxSize = DEFAULT_MAX_SIZE) {
this.seen = new Set();
this.maxSize = maxSize;
constructor(options: NotificationDeduplicatorOptions | number = {}) {
const normalizedOptions = typeof options === 'number' ? { maxSize: options } : options;
this.seen = new Map();
this.maxSize = Math.max(1, normalizedOptions.maxSize ?? DEFAULT_MAX_SIZE);
this.windowMs = Math.max(1, normalizedOptions.windowMs ?? DEFAULT_WINDOW_MS);
this.now = normalizedOptions.now ?? Date.now;
}

isDuplicate(fingerprint: string): boolean {
return this.seen.has(fingerprint);
this.pruneExpired();
const expiresAt = this.seen.get(fingerprint);
const duplicate = expiresAt !== undefined && expiresAt > this.now();

if (duplicate) {
this.skippedDuplicates++;
}

return duplicate;
}

markSent(fingerprint: string): void {
this.pruneExpired();
if (this.seen.size >= this.maxSize) {
const oldest = this.seen.values().next().value as string;
const oldest = this.seen.keys().next().value as string;
this.seen.delete(oldest);
this.evictedEntries++;
logger.warn('Notification deduplicator cache full, evicted oldest entry', {
evicted: oldest,
cacheSize: this.maxSize,
});
}
this.seen.add(fingerprint);
this.seen.set(fingerprint, this.now() + this.windowMs);
this.acceptedRequests++;
}

size(): number {
this.pruneExpired();
return this.seen.size;
}

getMetrics(): NotificationDeduplicationMetrics {
return {
acceptedRequests: this.acceptedRequests,
skippedDuplicates: this.skippedDuplicates,
evictedEntries: this.evictedEntries,
expiredEntries: this.expiredEntries,
cacheSize: this.size(),
deduplicationWindowMs: this.windowMs,
};
}

private pruneExpired(): void {
const now = this.now();
for (const [fingerprint, expiresAt] of this.seen) {
if (expiresAt > now) {
continue;
}

this.seen.delete(fingerprint);
this.expiredEntries++;
}
}
}
3 changes: 3 additions & 0 deletions listener/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ export interface ContractConfig {
export interface DiscordConfig {
webhookUrl: string;
webhookId: string;
deduplicationWindowMs?: number;
deduplicationMaxSize?: number;
}

export interface RetryQueueConfig {
Expand All @@ -25,3 +27,4 @@ export interface Config {
discord?: DiscordConfig;
retryQueue?: RetryQueueConfig;
}

Loading