Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 150 additions & 85 deletions listener/src/api/events-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ import * as StellarSDK from '@stellar/stellar-sdk';
import { eventRegistry } from '../store/event-registry';
import { preferenceStore } from '../store/preference-store';
import { PreferencesUpdateInput } from '../types/preferences';
import { NotificationAPI } from '../services/notification-api';
import { NotificationType } from '../types/scheduled-notification';
import logger from '../utils/logger';
import { generateRequestId } from '../utils/request-id';
import { generateRequestId, resolveCorrelationId } from '../utils/request-id';
import {
verifySignature,
extractSignature,
extractKeyId,
getSecretForKey,
collectRawBody,
} from '../services/webhook-verifier';
import { WebhookSecret } from '../types';
import { RateLimitConfig } from '../types';
import { WebhookSecret, RateLimitConfig } from '../types';
import { RateLimiter } from './rate-limiter';

export interface EventsServerOptions {
Expand Down Expand Up @@ -133,11 +134,14 @@ export function createEventsServer(options: EventsServerOptions): http.Server {

const server = http.createServer((req, res) => {
const requestId = generateRequestId();
const correlationId = resolveCorrelationId(req.headers['x-correlation-id']);
const startTime = Date.now();

res.setHeader('Access-Control-Allow-Origin', corsOrigin);
res.setHeader('Access-Control-Allow-Methods', 'GET, PUT, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type');
res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, OPTIONS');
res.setHeader('Access-Control-Allow-Headers', 'Content-Type, X-API-Key, Authorization, X-Correlation-Id');
res.setHeader('X-Request-Id', requestId);
res.setHeader('X-Correlation-Id', correlationId);

if (req.method === 'OPTIONS') {
res.writeHead(204);
Expand All @@ -147,6 +151,20 @@ export function createEventsServer(options: EventsServerOptions): http.Server {

const url = new URL(req.url ?? '/', 'http://localhost');

// GET /health
if (req.method === 'GET' && url.pathname === '/health') {
buildHealthResponse(options).then((health) => {
const httpStatus = health.status === 'error' ? 503 : 200;
res.writeHead(httpStatus, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(health));
}).catch((err) => {
logger.error('Health check failed unexpectedly', { error: err, requestId, correlationId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'error', detail: 'Internal health check failure' }));
});
return;
}

// GET /api/events
if (req.method === 'GET' && url.pathname.startsWith('/api/events')) {
const limitParam = url.searchParams.get('limit');
Expand All @@ -156,59 +174,35 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
? eventRegistry.getEvents(limit)
: eventRegistry.getEvents();

logger.info('Handling GET /api/events', {
requestId,
limit: limit ?? 'all',
});

const events =
limit !== undefined && !Number.isNaN(limit)
? eventRegistry.getEvents(limit)
: eventRegistry.getEvents();

res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(
JSON.stringify({
count: eventRegistry.count(),
events,
})
);

logger.info('GET /api/events complete', {
requestId,
returned: events.length,
durationMs: Date.now() - startTime,
});
return;
}
logger.info('Handling GET /api/events', { requestId, correlationId, limit: limit ?? 'all' });

// Schedule notification endpoint
if (req.method === 'POST' && req.url === '/api/schedule') {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
return;
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ count: eventRegistry.count(), events }));

let body = '';
req.on('data', (chunk) => {
body += chunk.toString();
});
logger.info('GET /api/events complete', {
requestId,
correlationId,
returned: events.length,
durationMs: Date.now() - startTime,
});
return;
}

if (req.method === 'POST' && req.url === '/api/webhooks') {
// POST /api/webhooks
if (req.method === 'POST' && url.pathname === '/api/webhooks') {
collectRawBody(req).then((rawBody) => {
const signatureHeader = extractSignature(req.headers);
const keyId = extractKeyId(req.headers);

if (!signatureHeader) {
logger.warn('Webhook missing signature header', { requestId });
logger.warn('Webhook missing signature header', { requestId, correlationId });
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Missing signature header' }));
return;
}

if (!keyId) {
logger.warn('Webhook missing key-id header', { requestId });
logger.warn('Webhook missing key-id header', { requestId, correlationId });
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Missing key-id header' }));
return;
Expand All @@ -218,66 +212,133 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
const secret = getSecretForKey(secrets, keyId);

if (!secret) {
logger.warn('Webhook unknown key-id', { requestId, keyId });
logger.warn('Webhook unknown key-id', { requestId, correlationId, keyId });
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Unknown key-id' }));
return;
}

const isValid = verifySignature(rawBody, signatureHeader, secret);

if (!isValid) {
logger.warn('Webhook invalid signature', { requestId, keyId });
if (!verifySignature(rawBody, signatureHeader, secret)) {
logger.warn('Webhook invalid signature', { requestId, correlationId, keyId });
res.writeHead(401, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid signature' }));
return;
}

logger.info('Webhook received and verified', { requestId, keyId });

logger.info('Webhook received and verified', { requestId, correlationId, keyId });
res.writeHead(202, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'accepted' }));
}).catch((err) => {
logger.error('Failed to read webhook body', { requestId, error: err });
logger.error('Failed to read webhook body', { requestId, correlationId, error: err });
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Failed to read request body' }));
});
// Schedule notification endpoint
if (req.method === 'POST' && req.url === '/api/schedule') {
return;
}

// POST /api/schedule
if (req.method === 'POST' && url.pathname === '/api/schedule') {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
req.on('end', async () => {
try {
const data = JSON.parse(body);

// Validate required fields
if (!data.executeAt || !data.payload || !data.targetRecipient) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Missing required fields: executeAt, payload, targetRecipient' }));
return;
}

const executeAt = new Date(data.executeAt);
if (isNaN(executeAt.getTime())) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'executeAt is not a valid date' }));
return;
}

const notificationId = await options.notificationAPI!.scheduleNotification({
payload: data.payload,
notificationType: data.notificationType || NotificationType.DISCORD,
targetRecipient: data.targetRecipient,
executeAt,
maxRetries: data.maxRetries,
priority: data.priority,
eventId: data.eventId,
contractAddress: data.contractAddress,
metadata: data.metadata,
});
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ count: eventRegistry.count(), events }));
return;
}

let body = '';
req.on('data', (chunk) => { body += chunk.toString(); });
req.on('end', async () => {
try {
const data = JSON.parse(body);

if (!data.executeAt || !data.payload || !data.targetRecipient) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Missing required fields: executeAt, payload, targetRecipient' }));
return;
}

const executeAt = new Date(data.executeAt);
if (isNaN(executeAt.getTime())) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'executeAt is not a valid date' }));
return;
}

const notificationId = await options.notificationAPI!.scheduleNotification({
payload: data.payload,
notificationType: data.notificationType || NotificationType.DISCORD,
targetRecipient: data.targetRecipient,
executeAt,
maxRetries: data.maxRetries,
priority: data.priority,
eventId: data.eventId,
contractAddress: data.contractAddress,
metadata: data.metadata,
});

res.writeHead(201, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ id: notificationId }));

logger.info('Notification scheduled via API', { requestId, correlationId, notificationId, executeAt: data.executeAt });
} catch (error) {
logger.error('Failed to schedule notification', { error, requestId, correlationId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
}
});
return;
}

// GET /api/schedule/stats
if (req.method === 'GET' && url.pathname === '/api/schedule/stats') {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
return;
}

options.notificationAPI.getStatistics()
.then((stats) => {
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(stats));
})
.catch((error) => {
logger.error('Failed to get scheduler stats', { error, requestId, correlationId });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
});
return;
}

// GET /api/schedule/:id
if (req.method === 'GET' && url.pathname.startsWith('/api/schedule/')) {
if (!options.notificationAPI) {
res.writeHead(503, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Scheduler not enabled' }));
return;
}

const id = parseInt(url.pathname.split('/').pop() || '', 10);
if (isNaN(id)) {
res.writeHead(400, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Invalid notification ID' }));
return;
}

options.notificationAPI.getNotification(id)
.then((notification) => {
if (!notification) {
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Notification not found' }));
return;
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(notification));
})
.catch((error) => {
logger.error('Failed to get notification', { error, requestId, correlationId, id });
res.writeHead(500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: (error as Error).message }));
});
return;
}

Expand Down Expand Up @@ -315,6 +376,10 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
});
return;
}

logger.warn('Unhandled request', { requestId, correlationId, method: req.method, url: req.url });
res.writeHead(404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: 'Not found' }));
});

if (rateLimiter) {
Expand Down
66 changes: 66 additions & 0 deletions listener/src/tests/events-server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import http from 'http';
import { createEventsServer } from '../api/events-server';

const TEST_PORT = 19876;

function makeRequest(
path: string,
options: { headers?: Record<string, string>; method?: string } = {}
): Promise<{ status: number; headers: http.IncomingHttpHeaders }> {
return new Promise((resolve, reject) => {
const req = http.request(
{ hostname: '127.0.0.1', port: TEST_PORT, path, method: options.method ?? 'GET', headers: options.headers },
(res) => {
res.resume(); // drain body
resolve({ status: res.statusCode ?? 0, headers: res.headers });
}
);
req.on('error', reject);
req.end();
});
}

describe('correlation ID propagation', () => {
let server: http.Server;

beforeAll((done) => {
server = createEventsServer({
port: TEST_PORT,
stellarRpcUrl: 'http://localhost:8000',
});
server.listen(TEST_PORT, done);
});

afterAll((done) => {
server.close(done);
});

test('generates a correlation ID when none is provided', async () => {
const { headers } = await makeRequest('/api/events');
expect(headers['x-correlation-id']).toBeTruthy();
});

test('echoes back the caller-supplied correlation ID', async () => {
const myId = 'my-trace-abc123';
const { headers } = await makeRequest('/api/events', {
headers: { 'x-correlation-id': myId },
});
expect(headers['x-correlation-id']).toBe(myId);
});

test('always includes x-request-id alongside correlation ID', async () => {
const { headers } = await makeRequest('/api/events');
expect(headers['x-request-id']).toBeTruthy();
expect(headers['x-correlation-id']).toBeTruthy();
expect(headers['x-request-id']).not.toBe(headers['x-correlation-id']);
});

test('correlation ID flows through on 404 responses', async () => {
const myId = 'trace-404-test';
const { status, headers } = await makeRequest('/no-such-route', {
headers: { 'x-correlation-id': myId },
});
expect(status).toBe(404);
expect(headers['x-correlation-id']).toBe(myId);
});
});
Loading
Loading