Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,245 changes: 1,218 additions & 27 deletions listener/package-lock.json

Large diffs are not rendered by default.

302 changes: 167 additions & 135 deletions listener/src/api/events-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import { NotificationAPI } from '../services/notification-api';
import { NotificationType } from '../types/scheduled-notification';
import logger from '../utils/logger';
import { generateRequestId } from '../utils/request-id';
import { RateLimitConfig } from '../types';
import { RateLimiter } from './rate-limiter';

export interface EventsServerOptions {
port: number;
corsOrigin?: string;
stellarRpcUrl: string;
discordWebhookUrl?: string;
notificationAPI?: NotificationAPI | null;
rateLimit?: RateLimitConfig;
}

type ServiceStatus = 'ok' | 'error' | 'not_configured';
Expand Down Expand Up @@ -117,14 +120,15 @@ async function buildHealthResponse(options: EventsServerOptions): Promise<Health

export function createEventsServer(options: EventsServerOptions): http.Server {
const corsOrigin = options.corsOrigin ?? 'http://localhost:5173';
const rateLimiter = options.rateLimit ? new RateLimiter(options.rateLimit) : undefined;

return http.createServer((req, res) => {
const server = http.createServer((req, res) => {
const requestId = generateRequestId();
const startTime = Date.now();

res.setHeader('Access-Control-Allow-Origin', corsOrigin);
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, X-API-Key, Authorization');
res.setHeader('X-Request-Id', requestId);

if (req.method === 'OPTIONS') {
Expand All @@ -133,166 +137,194 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
return;
}

if (req.method === 'GET' && req.url === '/health') {
buildHealthResponse(options).then((health) => {
const httpStatus = health.status === 'error' ? 503 : 200;
res.writeHead(httpStatus, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(health));
}).catch((err) => {
logger.error('Health check failed unexpectedly', { error: err });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'error', detail: 'Internal health check failure' }));
});
return;
}
const executeRoute = () => {
if (req.method === 'GET' && req.url === '/health') {
buildHealthResponse(options).then((health) => {
const httpStatus = health.status === 'error' ? 503 : 200;
res.writeHead(httpStatus, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(health));
}).catch((err) => {
logger.error('Health check failed unexpectedly', { error: err });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'error', detail: 'Internal health check failure' }));
});
return;
}

if (req.method === 'GET' && req.url?.startsWith('/api/events')) {
const url = new URL(req.url, 'http://localhost');
const limitParam = url.searchParams.get('limit');
const limit = limitParam ? parseInt(limitParam, 10) : undefined;
if (req.method === 'GET' && req.url?.startsWith('/api/events')) {
const url = new URL(req.url, 'http://localhost');
const limitParam = url.searchParams.get('limit');
const limit = limitParam ? parseInt(limitParam, 10) : undefined;

logger.info('Handling GET /api/events', {
requestId,
limit: limit ?? 'all',
});
logger.info('Handling GET /api/events', {
requestId,
limit: limit ?? 'all',
});

const events =
limit !== undefined && !Number.isNaN(limit)
? eventRegistry.getEvents(limit)
: eventRegistry.getEvents();
const events =
limit !== undefined && !Number.isNaN(limit)
? eventRegistry.getEvents(limit)
: eventRegistry.getEvents();

res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(
JSON.stringify({
count: eventRegistry.count(),
events,
})
);

logger.info('GET /api/events complete', {
requestId,
returned: events.length,
durationMs: Date.now() - startTime,
});
return;
}

res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(
JSON.stringify({
count: eventRegistry.count(),
events,
})
);
// Schedule notification endpoint
if (req.method === 'POST' && req.url === '/api/schedule') {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
return;
}

logger.info('GET /api/events complete', {
requestId,
returned: events.length,
durationMs: Date.now() - startTime,
});
return;
}
let body = '';
req.on('data', (chunk) => {
body += chunk.toString();
});

// Schedule notification endpoint
if (req.method === 'POST' && req.url === '/api/schedule') {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
req.on('end', async () => {
try {
const data = JSON.parse(body);

// Validate required fields
if (!data.executeAt || !data.payload || !data.targetRecipient) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Missing required fields: executeAt, payload, targetRecipient' }));
return;
}

const notificationId = await options.notificationAPI!.scheduleNotification({
payload: data.payload,
notificationType: data.notificationType || NotificationType.DISCORD,
targetRecipient: data.targetRecipient,
executeAt: new Date(data.executeAt),
maxRetries: data.maxRetries,
priority: data.priority,
eventId: data.eventId,
contractAddress: data.contractAddress,
metadata: data.metadata,
});

res.writeHead(201, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ id: notificationId }));

logger.info('Notification scheduled via API', {
requestId,
notificationId,
executeAt: data.executeAt,
});
} catch (error) {
logger.error('Failed to schedule notification', { error, requestId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
}
});
return;
}

let body = '';
req.on('data', (chunk) => {
body += chunk.toString();
});

req.on('end', async () => {
try {
const data = JSON.parse(body);

// Validate required fields
if (!data.executeAt || !data.payload || !data.targetRecipient) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Missing required fields: executeAt, payload, targetRecipient' }));
return;
}
// Get scheduler statistics endpoint
if (req.method === 'GET' && req.url === '/api/schedule/stats') {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
return;
}

const notificationId = await options.notificationAPI!.scheduleNotification({
payload: data.payload,
notificationType: data.notificationType || NotificationType.DISCORD,
targetRecipient: data.targetRecipient,
executeAt: new Date(data.executeAt),
maxRetries: data.maxRetries,
priority: data.priority,
eventId: data.eventId,
contractAddress: data.contractAddress,
metadata: data.metadata,
options.notificationAPI.getStatistics()
.then((stats) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(stats));
})
.catch((error) => {
logger.error('Failed to get scheduler stats', { error, requestId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
});
return;
}

res.writeHead(201, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ id: notificationId }));
// Get specific notification endpoint
if (req.method === 'GET' && req.url?.startsWith('/api/schedule/')) {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
return;
}

logger.info('Notification scheduled via API', {
requestId,
notificationId,
executeAt: data.executeAt,
});
} catch (error) {
logger.error('Failed to schedule notification', { error, requestId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
const id = parseInt(req.url.split('/').pop() || '', 10);
if (isNaN(id)) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid notification ID' }));
return;
}
});
return;
}

// Get scheduler statistics endpoint
if (req.method === 'GET' && req.url === '/api/schedule/stats') {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
options.notificationAPI.getNotification(id)
.then((notification) => {
if (!notification) {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Notification not found' }));
return;
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(notification));
})
.catch((error) => {
logger.error('Failed to get notification', { error, requestId, id });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
});
return;
}

options.notificationAPI.getStatistics()
.then((stats) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(stats));
})
.catch((error) => {
logger.error('Failed to get scheduler stats', { error, requestId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
});
return;
}

// Get specific notification endpoint
if (req.method === 'GET' && req.url?.startsWith('/api/schedule/')) {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
return;
}
logger.warn('Unhandled request', {
requestId,
method: req.method,
url: req.url,
});

const id = parseInt(req.url.split('/').pop() || '', 10);
if (isNaN(id)) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid notification ID' }));
return;
}
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
};

options.notificationAPI.getNotification(id)
.then((notification) => {
if (!notification) {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Notification not found' }));
return;
if (rateLimiter && req.url?.startsWith('/api/')) {
rateLimiter.handle(req, res, requestId)
.then((allowed) => {
if (allowed) {
executeRoute();
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(notification));
})
.catch((error) => {
logger.error('Failed to get notification', { error, requestId, id });
.catch((err) => {
logger.error('Rate limiter execution error', { error: err, requestId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
res.end(JSON.stringify({ error: 'Internal server error' }));
});
return;
} else {
executeRoute();
}
});

logger.warn('Unhandled request', {
requestId,
method: req.method,
url: req.url,
});
if (rateLimiter) {
const originalClose = server.close.bind(server);
server.close = (callback?: (err?: Error) => void) => {
rateLimiter.destroy();
return originalClose(callback);
};
}

res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
});
return server;
}

export function startEventsServer(options: EventsServerOptions): http.Server {
Expand Down
Loading
Loading