diff --git a/src/app.module.ts b/src/app.module.ts index d9285b6..4e3f9c4 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -1,4 +1,4 @@ -import { Module, NestModule, MiddlewareConsumer, RequestMethod } from '@nestjs/common'; +import { Module, NestModule, MiddlewareConsumer } from '@nestjs/common'; import { APP_FILTER } from '@nestjs/core'; import { AppController } from './app.controller'; import { AppService } from './app.service'; @@ -12,48 +12,24 @@ import { TokensModule } from './tokens/tokens.module'; import { AllExceptionsFilter } from './common/filters/all-exceptions.filter'; import { OgModule } from './og/og.module'; import { TradeModule } from './trade/trade.module'; +import { QueueModule } from './queue/queue.module'; import { ConfigModule } from '@nestjs/config'; import { MaintenanceMiddleware } from './common/middleware/maintenance.middleware'; import { RequestIdMiddleware } from './common/middleware/request-id.middleware'; import { RedisModule } from './common/redis/redis.module'; -/** - * Root module of the application. - * Orchestrates the integration of all feature modules and global middleware. - */ @Module({ imports: [ ConfigModule.forRoot({ isGlobal: true }), - RedisModule, - PrismaModule, - HealthModule, - RiskModule, - AuthModule, - AnalyticsModule, - SwapModule, - TokensModule, - OgModule, - TradeModule + RedisModule, PrismaModule, HealthModule, RiskModule, + AuthModule, AnalyticsModule, SwapModule, TokensModule, + OgModule, TradeModule, QueueModule, ], controllers: [AppController], - providers: [ - AppService, - { - provide: APP_FILTER, - useClass: AllExceptionsFilter, - }, - ], + providers: [AppService, { provide: APP_FILTER, useClass: AllExceptionsFilter }], }) export class AppModule implements NestModule { - /** - * Configures global middleware for the entire application. - * Currently applies RequestIdMiddleware and MaintenanceMiddleware to all routes. - * - * @param consumer - The middleware consumer to register middleware on. - */ configure(consumer: MiddlewareConsumer) { - consumer - .apply(RequestIdMiddleware, MaintenanceMiddleware) - .forRoutes('*'); + consumer.apply(RequestIdMiddleware, MaintenanceMiddleware).forRoutes('*'); } } diff --git a/src/queue/indexer.processor.ts b/src/queue/indexer.processor.ts new file mode 100644 index 0000000..7adfdda --- /dev/null +++ b/src/queue/indexer.processor.ts @@ -0,0 +1,27 @@ +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { IndexerRetryJob } from './queue.service'; +import { Server } from '@stellar/stellar-sdk/rpc'; + +@Processor('indexer-retry') +export class IndexerProcessor extends WorkerHost { + private readonly logger = new Logger(IndexerProcessor.name); + + async process(job: Job): Promise { + const { rpcMethod, params } = job.data; + this.logger.log('Processing indexer retry: ' + rpcMethod + ' (attempt ' + job.attemptsMade + ')'); + + const rpcUrl = process.env.STELLAR_RPC_URL || 'https://soroban-testnet.stellar.org'; + const server = new Server(rpcUrl); + + try { + const result = await (server as any)[rpcMethod](...(Array.isArray(params) ? params : [params])); + this.logger.log('Indexer retry succeeded: ' + rpcMethod); + return result; + } catch (err) { + this.logger.error('Indexer retry failed: ' + rpcMethod, err); + throw err; + } + } +} diff --git a/src/queue/notification.processor.ts b/src/queue/notification.processor.ts new file mode 100644 index 0000000..b9476a0 --- /dev/null +++ b/src/queue/notification.processor.ts @@ -0,0 +1,34 @@ +import { Processor, WorkerHost } from '@nestjs/bullmq'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { NotificationJob } from './queue.service'; + +@Processor('notification') +export class NotificationProcessor extends WorkerHost { + private readonly logger = new Logger(NotificationProcessor.name); + + async process(job: Job): Promise { + const { type, recipient, subject, payload } = job.data; + this.logger.log('Processing notification: ' + type + ' to ' + recipient + ' (attempt ' + job.attemptsMade + ')'); + + if (type === 'webhook') { + try { + await fetch(recipient, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ subject, ...payload }), + }); + this.logger.log('Webhook sent successfully'); + } catch (err) { + this.logger.error('Webhook failed', err); + throw err; + } + } + + if (type === 'email') { + this.logger.log('Email notification queued for: ' + recipient); + } + + return { status: 'processed' }; + } +} diff --git a/src/queue/queue.module.ts b/src/queue/queue.module.ts new file mode 100644 index 0000000..da3ccc7 --- /dev/null +++ b/src/queue/queue.module.ts @@ -0,0 +1,32 @@ +import { Module, Global } from '@nestjs/common'; +import { BullModule } from '@nestjs/bullmq'; +import { ConfigService } from '@nestjs/config'; +import { QueueService } from './queue.service'; +import { IndexerProcessor } from './indexer.processor'; +import { NotificationProcessor } from './notification.processor'; + +@Global() +@Module({ + imports: [ + BullModule.forRootAsync({ + useFactory: (config: ConfigService) => ({ + connection: { + host: config.get('REDIS_HOST', 'localhost'), + port: config.get('REDIS_PORT', 6379), + }, + defaultJobOptions: { + attempts: 5, + backoff: { type: 'exponential', delay: 2000 }, + }, + }), + inject: [ConfigService], + }), + BullModule.registerQueue( + { name: 'indexer-retry' }, + { name: 'notification' }, + ), + ], + providers: [QueueService, IndexerProcessor, NotificationProcessor], + exports: [QueueService, BullModule], +}) +export class QueueModule {} diff --git a/src/queue/queue.service.ts b/src/queue/queue.service.ts new file mode 100644 index 0000000..449c813 --- /dev/null +++ b/src/queue/queue.service.ts @@ -0,0 +1,41 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectQueue } from '@nestjs/bullmq'; +import { Queue } from 'bullmq'; + +export interface IndexerRetryJob { + rpcMethod: string; + params: Record; + attempt: number; +} + +export interface NotificationJob { + type: 'email' | 'webhook'; + recipient: string; + subject: string; + payload: Record; +} + +@Injectable() +export class QueueService { + private readonly logger = new Logger(QueueService.name); + + constructor( + @InjectQueue('indexer-retry') private indexerQueue: Queue, + @InjectQueue('notification') private notificationQueue: Queue, + ) {} + + async enqueueIndexerRetry(job: IndexerRetryJob) { + const id = indexer-retry-; + await this.indexerQueue.add('retry-rpc', job, { jobId: id }); + this.logger.log('Enqueued indexer retry job: ' + id); + return { jobId: id, status: 'accepted' }; + } + + async enqueueNotification(job: NotificationJob) { + const id = +otification-; + await this.notificationQueue.add('send-notification', job, { jobId: id }); + this.logger.log('Enqueued notification job: ' + id); + return { jobId: id, status: 'accepted' }; + } +}