Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 7 additions & 31 deletions src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Module, NestModule, MiddlewareConsumer, RequestMethod } from '@nestjs/common';
import { Module, NestModule, MiddlewareConsumer } from '@nestjs/common';
import { APP_FILTER } from '@nestjs/core';
import { AppController } from './app.controller';
import { AppService } from './app.service';
Expand All @@ -12,48 +12,24 @@ import { TokensModule } from './tokens/tokens.module';
import { AllExceptionsFilter } from './common/filters/all-exceptions.filter';
import { OgModule } from './og/og.module';
import { TradeModule } from './trade/trade.module';
import { QueueModule } from './queue/queue.module';
import { ConfigModule } from '@nestjs/config';
import { MaintenanceMiddleware } from './common/middleware/maintenance.middleware';
import { RequestIdMiddleware } from './common/middleware/request-id.middleware';
import { RedisModule } from './common/redis/redis.module';

/**
* Root module of the application.
* Orchestrates the integration of all feature modules and global middleware.
*/
@Module({
imports: [
ConfigModule.forRoot({ isGlobal: true }),
RedisModule,
PrismaModule,
HealthModule,
RiskModule,
AuthModule,
AnalyticsModule,
SwapModule,
TokensModule,
OgModule,
TradeModule
RedisModule, PrismaModule, HealthModule, RiskModule,
AuthModule, AnalyticsModule, SwapModule, TokensModule,
OgModule, TradeModule, QueueModule,
],
controllers: [AppController],
providers: [
AppService,
{
provide: APP_FILTER,
useClass: AllExceptionsFilter,
},
],
providers: [AppService, { provide: APP_FILTER, useClass: AllExceptionsFilter }],
})
export class AppModule implements NestModule {
/**
* Configures global middleware for the entire application.
* Currently applies RequestIdMiddleware and MaintenanceMiddleware to all routes.
*
* @param consumer - The middleware consumer to register middleware on.
*/
configure(consumer: MiddlewareConsumer) {
consumer
.apply(RequestIdMiddleware, MaintenanceMiddleware)
.forRoutes('*');
consumer.apply(RequestIdMiddleware, MaintenanceMiddleware).forRoutes('*');
}
}
27 changes: 27 additions & 0 deletions src/queue/indexer.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { IndexerRetryJob } from './queue.service';
import { Server } from '@stellar/stellar-sdk/rpc';

@Processor('indexer-retry')
export class IndexerProcessor extends WorkerHost {
private readonly logger = new Logger(IndexerProcessor.name);

async process(job: Job<IndexerRetryJob>): Promise<any> {
const { rpcMethod, params } = job.data;
this.logger.log('Processing indexer retry: ' + rpcMethod + ' (attempt ' + job.attemptsMade + ')');

const rpcUrl = process.env.STELLAR_RPC_URL || 'https://soroban-testnet.stellar.org';
const server = new Server(rpcUrl);

try {
const result = await (server as any)[rpcMethod](...(Array.isArray(params) ? params : [params]));
this.logger.log('Indexer retry succeeded: ' + rpcMethod);
return result;
} catch (err) {
this.logger.error('Indexer retry failed: ' + rpcMethod, err);
throw err;
}
}
}
34 changes: 34 additions & 0 deletions src/queue/notification.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { NotificationJob } from './queue.service';

@Processor('notification')
export class NotificationProcessor extends WorkerHost {
private readonly logger = new Logger(NotificationProcessor.name);

async process(job: Job<NotificationJob>): Promise<any> {
const { type, recipient, subject, payload } = job.data;
this.logger.log('Processing notification: ' + type + ' to ' + recipient + ' (attempt ' + job.attemptsMade + ')');

if (type === 'webhook') {
try {
await fetch(recipient, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ subject, ...payload }),
});
this.logger.log('Webhook sent successfully');
} catch (err) {
this.logger.error('Webhook failed', err);
throw err;
}
}

if (type === 'email') {
this.logger.log('Email notification queued for: ' + recipient);
}

return { status: 'processed' };
}
}
32 changes: 32 additions & 0 deletions src/queue/queue.module.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { Module, Global } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
import { ConfigService } from '@nestjs/config';
import { QueueService } from './queue.service';
import { IndexerProcessor } from './indexer.processor';
import { NotificationProcessor } from './notification.processor';

@Global()
@Module({
imports: [
BullModule.forRootAsync({
useFactory: (config: ConfigService) => ({
connection: {
host: config.get('REDIS_HOST', 'localhost'),
port: config.get('REDIS_PORT', 6379),
},
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 2000 },
},
}),
inject: [ConfigService],
}),
BullModule.registerQueue(
{ name: 'indexer-retry' },
{ name: 'notification' },
),
],
providers: [QueueService, IndexerProcessor, NotificationProcessor],
exports: [QueueService, BullModule],
})
export class QueueModule {}
41 changes: 41 additions & 0 deletions src/queue/queue.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';

export interface IndexerRetryJob {
rpcMethod: string;
params: Record<string, any>;
attempt: number;
}

export interface NotificationJob {
type: 'email' | 'webhook';
recipient: string;
subject: string;
payload: Record<string, any>;
}

@Injectable()
export class QueueService {
private readonly logger = new Logger(QueueService.name);

constructor(
@InjectQueue('indexer-retry') private indexerQueue: Queue,
@InjectQueue('notification') private notificationQueue: Queue,
) {}

async enqueueIndexerRetry(job: IndexerRetryJob) {
const id = indexer-retry-;
await this.indexerQueue.add('retry-rpc', job, { jobId: id });
this.logger.log('Enqueued indexer retry job: ' + id);
return { jobId: id, status: 'accepted' };
}

async enqueueNotification(job: NotificationJob) {
const id =
otification-;
await this.notificationQueue.add('send-notification', job, { jobId: id });
this.logger.log('Enqueued notification job: ' + id);
return { jobId: id, status: 'accepted' };
}
}