diff --git a/listener/src/database/database.ts b/listener/src/database/database.ts index 0d5682b..6ae3b94 100644 --- a/listener/src/database/database.ts +++ b/listener/src/database/database.ts @@ -80,18 +80,11 @@ export class Database { } const schema = fs.readFileSync(schemaPath, 'utf-8'); - - // Split by semicolon and execute each statement - const statements = schema - .split(';') - .map(s => s.trim()) - .filter(s => s.length > 0); - - for (const statement of statements) { - await this.run(statement); - } - logger.info('Database migrations completed', { statements: statements.length }); + // Execute the schema as one script so trigger bodies with semicolons work. + await this.exec(schema); + + logger.info('Database migrations completed'); } /** @@ -148,6 +141,24 @@ export class Database { }); } + /** + * Execute a SQL script that may contain multiple statements. + */ + async exec(sql: string): Promise { + if (!this.db) throw new Error('Database not initialized'); + + return new Promise((resolve, reject) => { + this.db!.exec(sql, (err) => { + if (err) { + logger.error('Database exec error', { sql, error: err }); + reject(err); + } else { + resolve(); + } + }); + }); + } + /** * Execute multiple statements in a transaction */ diff --git a/listener/src/services/event-subscriber.test.ts b/listener/src/services/event-subscriber.test.ts index 8e88798..c757598 100644 --- a/listener/src/services/event-subscriber.test.ts +++ b/listener/src/services/event-subscriber.test.ts @@ -100,6 +100,23 @@ describe('EventSubscriber', () => { await jest.advanceTimersByTimeAsync(0); await subscriber.stop(); }); + + it('does not start a second poll loop when start is called twice', async () => { + jest.useFakeTimers(); + mockGetEvents.mockResolvedValue({ events: [], cursor: '' }); + + const subscriber = new EventSubscriber(testConfig); + await subscriber.start(); + await subscriber.start(); + + await Promise.resolve(); + await Promise.resolve(); + + expect(mockGetEvents).toHaveBeenCalledTimes(1); + expect(mockLogger.warn).toHaveBeenCalledWith('Event subscriber already running'); + + await subscriber.stop(); + }); }); describe('successful event processing', () => { diff --git a/listener/src/services/event-subscriber.ts b/listener/src/services/event-subscriber.ts index 036a377..2056163 100644 --- a/listener/src/services/event-subscriber.ts +++ b/listener/src/services/event-subscriber.ts @@ -34,6 +34,11 @@ export class EventSubscriber { } async start(): Promise { + if (this.isRunning) { + logger.warn('Event subscriber already running'); + return; + } + this.isRunning = true; logger.info('Starting event subscriber service'); this.retryQueue?.start(); diff --git a/listener/src/services/notification-retry-queue.test.ts b/listener/src/services/notification-retry-queue.test.ts index d495bca..27f7779 100644 --- a/listener/src/services/notification-retry-queue.test.ts +++ b/listener/src/services/notification-retry-queue.test.ts @@ -63,6 +63,25 @@ describe('NotificationRetryQueue', () => { expect.objectContaining({ eventId: 'evt-q', requestId: 'req-1' }) ); }); + + it('skips duplicate retry queue entries for the same event', () => { + const logger = jest.requireMock('../utils/logger').default; + const notificationFn: NotificationFn = jest.fn(); + const queue = new NotificationRetryQueue(notificationFn, { baseDelayMs: 1000 }); + const event = createMockEvent({ id: 'evt-dup' }); + + queue.enqueue(event, mockContractConfig, 'req-1'); + queue.enqueue(event, mockContractConfig, 'req-2'); + + expect(queue.size()).toBe(1); + expect(logger.info).toHaveBeenCalledWith( + 'Skipping duplicate retry queue entry', + expect.objectContaining({ + eventId: 'evt-dup', + contractAddress: mockContractConfig.address, + }) + ); + }); }); describe('processQueue', () => { diff --git a/listener/src/services/notification-retry-queue.ts b/listener/src/services/notification-retry-queue.ts index f6c8c37..d669dc3 100644 --- a/listener/src/services/notification-retry-queue.ts +++ b/listener/src/services/notification-retry-queue.ts @@ -1,6 +1,7 @@ import * as StellarSDK from '@stellar/stellar-sdk'; import { ContractConfig } from '../types'; import logger from '../utils/logger'; +import { getEventName } from '../utils/event-utils'; export interface RetryQueueOptions { baseDelayMs?: number; @@ -30,6 +31,7 @@ export type NotificationFn = ( export class NotificationRetryQueue { private queue: RetryItem[] = []; + private readonly queuedFingerprints: Set = new Set(); private readonly baseDelayMs: number; private readonly maxRetries: number; private readonly processIntervalMs: number; @@ -48,6 +50,18 @@ export class NotificationRetryQueue { contractConfig: ContractConfig, requestId?: string ): void { + const fingerprint = buildRetryFingerprint(event, contractConfig.address); + + if (this.queuedFingerprints.has(fingerprint)) { + logger.info('Skipping duplicate retry queue entry', { + requestId, + eventId: event.id, + contractAddress: contractConfig.address, + fingerprint, + }); + return; + } + const delayMs = this.calculateDelay(0); const nextRetryAt = Date.now() + delayMs; @@ -60,6 +74,7 @@ export class NotificationRetryQueue { maxRetries: this.maxRetries, }); + this.queuedFingerprints.add(fingerprint); this.queue.push({ event, contractConfig, retryCount: 0, nextRetryAt, requestId }); } @@ -95,6 +110,7 @@ export class NotificationRetryQueue { private async retryItem(item: RetryItem): Promise { const attempt = item.retryCount + 1; + const fingerprint = buildRetryFingerprint(item.event, item.contractConfig.address); logger.info('Retrying failed notification', { requestId: item.requestId, @@ -107,6 +123,7 @@ export class NotificationRetryQueue { const success = await this.notificationFn(item.event, item.contractConfig, item.requestId); if (success) { + this.queuedFingerprints.delete(fingerprint); logger.info('Retry succeeded', { requestId: item.requestId, eventId: item.event.id, @@ -117,6 +134,7 @@ export class NotificationRetryQueue { } if (attempt >= this.maxRetries) { + this.queuedFingerprints.delete(fingerprint); logger.error('Notification permanently failed after max retries', { requestId: item.requestId, eventId: item.event.id, @@ -145,3 +163,12 @@ export class NotificationRetryQueue { return this.baseDelayMs * Math.pow(2, retryCount); } } + +function buildRetryFingerprint( + event: StellarSDK.rpc.Api.EventResponse, + contractAddress: string +): string { + const eventName = + getEventName(event.topic) ?? event.topic.map((entry) => entry.toString()).join('|'); + return `${contractAddress}:${event.id}:${eventName}:${event.txHash ?? ''}`; +} diff --git a/listener/src/services/notification-scheduler.ts b/listener/src/services/notification-scheduler.ts index 57de13c..99157d2 100644 --- a/listener/src/services/notification-scheduler.ts +++ b/listener/src/services/notification-scheduler.ts @@ -23,7 +23,7 @@ export class NotificationScheduler { constructor( repository: ScheduledNotificationRepository, config: SchedulerConfig, - discordService?: DiscordNotificationService + discordService?: DiscordNotificationService | null ) { this.repository = repository; this.config = config; diff --git a/listener/src/types/scheduled-notification.ts b/listener/src/types/scheduled-notification.ts index bb2b0ab..f564afc 100644 --- a/listener/src/types/scheduled-notification.ts +++ b/listener/src/types/scheduled-notification.ts @@ -90,7 +90,7 @@ export interface SchedulerConfig { enabled: boolean; pollIntervalMs: number; lockTimeoutMs: number; - processorId: string; + processorId?: string; batchSize: number; timingBufferMs: number; }