Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions listener/src/database/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,11 @@ export class Database {
}

const schema = fs.readFileSync(schemaPath, 'utf-8');

// Split by semicolon and execute each statement
const statements = schema
.split(';')
.map(s => s.trim())
.filter(s => s.length > 0);

for (const statement of statements) {
await this.run(statement);
}

logger.info('Database migrations completed', { statements: statements.length });
// Execute the schema as one script so trigger bodies with semicolons work.
await this.exec(schema);

logger.info('Database migrations completed');
}

/**
Expand Down Expand Up @@ -148,6 +141,24 @@ export class Database {
});
}

/**
* Execute a SQL script that may contain multiple statements.
*/
async exec(sql: string): Promise<void> {
if (!this.db) throw new Error('Database not initialized');

return new Promise((resolve, reject) => {
this.db!.exec(sql, (err) => {
if (err) {
logger.error('Database exec error', { sql, error: err });
reject(err);
} else {
resolve();
}
});
});
}

/**
* Execute multiple statements in a transaction
*/
Expand Down
17 changes: 17 additions & 0 deletions listener/src/services/event-subscriber.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,23 @@ describe('EventSubscriber', () => {
await jest.advanceTimersByTimeAsync(0);
await subscriber.stop();
});

it('does not start a second poll loop when start is called twice', async () => {
jest.useFakeTimers();
mockGetEvents.mockResolvedValue({ events: [], cursor: '' });

const subscriber = new EventSubscriber(testConfig);
await subscriber.start();
await subscriber.start();

await Promise.resolve();
await Promise.resolve();

expect(mockGetEvents).toHaveBeenCalledTimes(1);
expect(mockLogger.warn).toHaveBeenCalledWith('Event subscriber already running');

await subscriber.stop();
});
});

describe('successful event processing', () => {
Expand Down
5 changes: 5 additions & 0 deletions listener/src/services/event-subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ export class EventSubscriber {
}

async start(): Promise<void> {
if (this.isRunning) {
logger.warn('Event subscriber already running');
return;
}

this.isRunning = true;
logger.info('Starting event subscriber service');
this.retryQueue?.start();
Expand Down
19 changes: 19 additions & 0 deletions listener/src/services/notification-retry-queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,25 @@ describe('NotificationRetryQueue', () => {
expect.objectContaining({ eventId: 'evt-q', requestId: 'req-1' })
);
});

it('skips duplicate retry queue entries for the same event', () => {
const logger = jest.requireMock('../utils/logger').default;
const notificationFn: NotificationFn = jest.fn();
const queue = new NotificationRetryQueue(notificationFn, { baseDelayMs: 1000 });
const event = createMockEvent({ id: 'evt-dup' });

queue.enqueue(event, mockContractConfig, 'req-1');
queue.enqueue(event, mockContractConfig, 'req-2');

expect(queue.size()).toBe(1);
expect(logger.info).toHaveBeenCalledWith(
'Skipping duplicate retry queue entry',
expect.objectContaining({
eventId: 'evt-dup',
contractAddress: mockContractConfig.address,
})
);
});
});

describe('processQueue', () => {
Expand Down
27 changes: 27 additions & 0 deletions listener/src/services/notification-retry-queue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as StellarSDK from '@stellar/stellar-sdk';
import { ContractConfig } from '../types';
import logger from '../utils/logger';
import { getEventName } from '../utils/event-utils';

export interface RetryQueueOptions {
baseDelayMs?: number;
Expand Down Expand Up @@ -30,6 +31,7 @@ export type NotificationFn = (

export class NotificationRetryQueue {
private queue: RetryItem[] = [];
private readonly queuedFingerprints: Set<string> = new Set();
private readonly baseDelayMs: number;
private readonly maxRetries: number;
private readonly processIntervalMs: number;
Expand All @@ -48,6 +50,18 @@ export class NotificationRetryQueue {
contractConfig: ContractConfig,
requestId?: string
): void {
const fingerprint = buildRetryFingerprint(event, contractConfig.address);

if (this.queuedFingerprints.has(fingerprint)) {
logger.info('Skipping duplicate retry queue entry', {
requestId,
eventId: event.id,
contractAddress: contractConfig.address,
fingerprint,
});
return;
}

const delayMs = this.calculateDelay(0);
const nextRetryAt = Date.now() + delayMs;

Expand All @@ -60,6 +74,7 @@ export class NotificationRetryQueue {
maxRetries: this.maxRetries,
});

this.queuedFingerprints.add(fingerprint);
this.queue.push({ event, contractConfig, retryCount: 0, nextRetryAt, requestId });
}

Expand Down Expand Up @@ -95,6 +110,7 @@ export class NotificationRetryQueue {

private async retryItem(item: RetryItem): Promise<void> {
const attempt = item.retryCount + 1;
const fingerprint = buildRetryFingerprint(item.event, item.contractConfig.address);

logger.info('Retrying failed notification', {
requestId: item.requestId,
Expand All @@ -107,6 +123,7 @@ export class NotificationRetryQueue {
const success = await this.notificationFn(item.event, item.contractConfig, item.requestId);

if (success) {
this.queuedFingerprints.delete(fingerprint);
logger.info('Retry succeeded', {
requestId: item.requestId,
eventId: item.event.id,
Expand All @@ -117,6 +134,7 @@ export class NotificationRetryQueue {
}

if (attempt >= this.maxRetries) {
this.queuedFingerprints.delete(fingerprint);
logger.error('Notification permanently failed after max retries', {
requestId: item.requestId,
eventId: item.event.id,
Expand Down Expand Up @@ -145,3 +163,12 @@ export class NotificationRetryQueue {
return this.baseDelayMs * Math.pow(2, retryCount);
}
}

function buildRetryFingerprint(
event: StellarSDK.rpc.Api.EventResponse,
contractAddress: string
): string {
const eventName =
getEventName(event.topic) ?? event.topic.map((entry) => entry.toString()).join('|');
return `${contractAddress}:${event.id}:${eventName}:${event.txHash ?? ''}`;
}
2 changes: 1 addition & 1 deletion listener/src/services/notification-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class NotificationScheduler {
constructor(
repository: ScheduledNotificationRepository,
config: SchedulerConfig,
discordService?: DiscordNotificationService
discordService?: DiscordNotificationService | null
) {
this.repository = repository;
this.config = config;
Expand Down
2 changes: 1 addition & 1 deletion listener/src/types/scheduled-notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export interface SchedulerConfig {
enabled: boolean;
pollIntervalMs: number;
lockTimeoutMs: number;
processorId: string;
processorId?: string;
batchSize: number;
timingBufferMs: number;
}
Loading