diff --git a/listener/API.md b/listener/API.md new file mode 100644 index 0000000..3e0761e --- /dev/null +++ b/listener/API.md @@ -0,0 +1,140 @@ +# Listener Service API + +Base URL: `http://localhost:8787` (configured via `EVENTS_API_PORT`) + +--- + +## Events + +### GET /api/events + +Returns all stored contract events. + +**Query Parameters** + +| Name | Type | Required | Description | +|-------|--------|----------|------------------------------------| +| limit | number | No | Maximum number of events to return | + +**Response `200`** + +```json +{ + "count": 42, + "events": [ + { + "eventId": "string", + "contractAddress": "string", + "eventName": "string | null", + "ledger": 12345, + "type": "contract", + "topic": ["TaskCreated"], + "value": "string", + "txHash": "string", + "receivedAt": 1718640000000 + } + ] +} +``` + +--- + +## User Notification Preferences + +Preferences control which notification categories are delivered per user. Categories default to **enabled** when not explicitly set. + +### GET /api/preferences/:userId + +Returns the notification preferences for a user. + +**Path Parameters** + +| Name | Description | +|--------|--------------------| +| userId | User identifier | + +**Response `200`** + +```json +{ + "userId": "alice", + "categories": { + "discord": true + }, + "updatedAt": 1718640000000 +} +``` + +--- + +### PUT /api/preferences/:userId + +Updates one or more notification category flags for a user. Unspecified categories are preserved. + +**Path Parameters** + +| Name | Description | +|--------|--------------------| +| userId | User identifier | + +**Request Body** + +```json +{ + "categories": { + "discord": false + } +} +``` + +| Field | Type | Required | Description | +|------------|-------------------------------|----------|------------------------------------------| +| categories | `Record` | Yes | Map of category name to enabled flag | + +**Response `200`** — returns the full updated preferences object. + +```json +{ + "userId": "alice", + "categories": { + "discord": false + }, + "updatedAt": 1718640100000 +} +``` + +**Response `400`** — returned when the request body is invalid JSON or the `categories` field is missing. + +```json +{ "error": "Invalid body: expected { categories: { [key]: boolean } }" } +``` + +--- + +## Notification Categories + +| Category | Description | +|-----------|------------------------------| +| `discord` | Discord webhook notifications | + +Additional categories can be added by extending the `categories` map. + +--- + +## Per-Contract User Binding + +To apply user preferences to a specific contract's events, set `userId` in the contract address config: + +```json +{ + "CONTRACT_ADDRESSES": [ + { + "address": "CCEMX6...", + "events": ["*"], + "userId": "alice" + } + ] +} +``` + +If `userId` is omitted, the `"global"` user's preferences are applied. diff --git a/listener/src/api/events-server.test.ts b/listener/src/api/events-server.test.ts index 28d5f26..489aec4 100644 --- a/listener/src/api/events-server.test.ts +++ b/listener/src/api/events-server.test.ts @@ -1,23 +1,18 @@ import http from 'http'; -import { createEventsServer, checkStellarRpc, checkDiscord } from './events-server'; -import { eventRegistry } from '../store/event-registry'; - -const mockGetHealth = jest.fn(); - -jest.mock('@stellar/stellar-sdk', () => ({ - rpc: { - Server: jest.fn().mockImplementation(() => ({ - getHealth: mockGetHealth, - })), - }, -})); +import { createEventsServer } from './events-server'; +import { preferenceStore } from '../store/preference-store'; + +jest.mock('../store/preference-store', () => { + const store = { + get: jest.fn(), + update: jest.fn(), + isCategoryEnabled: jest.fn(), + }; + return { preferenceStore: store }; +}); jest.mock('../store/event-registry', () => ({ - eventRegistry: { - getEvents: jest.fn().mockReturnValue([]), - count: jest.fn().mockReturnValue(0), - addFromInput: jest.fn(), - }, + eventRegistry: { getEvents: jest.fn(() => []), count: jest.fn(() => 0) }, })); jest.mock('../utils/logger', () => ({ @@ -25,215 +20,101 @@ jest.mock('../utils/logger', () => ({ default: { info: jest.fn(), error: jest.fn(), warn: jest.fn() }, })); -const mockFetch = jest.fn(); -global.fetch = mockFetch; +const mockStore = preferenceStore as jest.Mocked; -const BASE_OPTIONS = { - port: 0, - stellarRpcUrl: 'https://soroban-testnet.stellar.org:443', -}; - -function makeRequest( +function request( server: http.Server, - path: string + method: string, + path: string, + body?: object ): Promise<{ status: number; body: unknown }> { return new Promise((resolve, reject) => { - const addr = server.address() as { port: number }; + const port = (server.address() as { port: number }).port; + const payload = body ? JSON.stringify(body) : undefined; const req = http.request( - { host: '127.0.0.1', port: addr.port, path, method: 'GET' }, + { hostname: '127.0.0.1', port, path, method, + headers: { 'Content-Type': 'application/json', ...(payload ? { 'Content-Length': Buffer.byteLength(payload) } : {}) }, + }, (res) => { let data = ''; - res.on('data', (chunk) => (data += chunk)); + res.on('data', (chunk) => { data += chunk; }); res.on('end', () => resolve({ status: res.statusCode!, body: JSON.parse(data) })); } ); req.on('error', reject); + if (payload) req.write(payload); req.end(); }); } -function startServer(options: Parameters[0]): Promise { - return new Promise((resolve) => { - const server = createEventsServer(options); - server.listen(0, '127.0.0.1', () => resolve(server)); - }); -} - -function closeServer(server: http.Server): Promise { - return new Promise((resolve, reject) => server.close((err) => (err ? reject(err) : resolve()))); -} - -describe('GET /health', () => { +describe('Preference API endpoints', () => { let server: http.Server; - beforeEach(() => { + beforeEach((done) => { jest.clearAllMocks(); - (eventRegistry.count as jest.Mock).mockReturnValue(5); + server = createEventsServer({ port: 0 }); + server.listen(0, '127.0.0.1', done); }); - afterEach(async () => { - if (server) await closeServer(server); + afterEach((done) => { + server.close(done); }); - it('returns 200 and status ok when all services are healthy', async () => { - mockGetHealth.mockResolvedValue({ status: 'healthy' }); + describe('GET /api/preferences/:userId', () => { + it('returns preferences for the given user', async () => { + const prefs = { userId: 'alice', categories: { discord: true }, updatedAt: 1000 }; + mockStore.get.mockReturnValue(prefs); - server = await startServer(BASE_OPTIONS); - const { status, body } = await makeRequest(server, '/health'); + const res = await request(server, 'GET', '/api/preferences/alice'); - expect(status).toBe(200); - expect((body as any).status).toBe('ok'); - expect((body as any).services.stellarRpc.status).toBe('ok'); - expect((body as any).services.discord.status).toBe('not_configured'); - expect((body as any).services.eventRegistry).toEqual({ status: 'ok', eventCount: 5 }); - expect((body as any).timestamp).toBeDefined(); + expect(res.status).toBe(200); + expect(res.body).toEqual(prefs); + expect(mockStore.get).toHaveBeenCalledWith('alice'); + }); }); - it('returns 503 and status error when Stellar RPC is unreachable', async () => { - mockGetHealth.mockRejectedValue(new Error('connection refused')); + describe('PUT /api/preferences/:userId', () => { + it('updates and returns preferences', async () => { + const updated = { userId: 'alice', categories: { discord: false }, updatedAt: 2000 }; + mockStore.update.mockReturnValue(updated); - server = await startServer(BASE_OPTIONS); - const { status, body } = await makeRequest(server, '/health'); + const res = await request(server, 'PUT', '/api/preferences/alice', { + categories: { discord: false }, + }); - expect(status).toBe(503); - expect((body as any).status).toBe('error'); - expect((body as any).services.stellarRpc.status).toBe('error'); - expect((body as any).services.stellarRpc.detail).toBe('connection refused'); - }); - - it('returns 200 and status degraded when Discord webhook is down', async () => { - mockGetHealth.mockResolvedValue({ status: 'healthy' }); - mockFetch.mockResolvedValue({ ok: false, status: 503 }); - - server = await startServer({ - ...BASE_OPTIONS, - discordWebhookUrl: 'https://discord.com/api/webhooks/123/abc', + expect(res.status).toBe(200); + expect(res.body).toEqual(updated); + expect(mockStore.update).toHaveBeenCalledWith('alice', { categories: { discord: false } }); }); - const { status, body } = await makeRequest(server, '/health'); - expect(status).toBe(200); - expect((body as any).status).toBe('degraded'); - expect((body as any).services.discord.status).toBe('error'); - expect((body as any).services.discord.detail).toBe('HTTP 503'); - }); - - it('returns 200 and status ok when Discord webhook is reachable', async () => { - mockGetHealth.mockResolvedValue({ status: 'healthy' }); - mockFetch.mockResolvedValue({ ok: true, status: 200 }); - - server = await startServer({ - ...BASE_OPTIONS, - discordWebhookUrl: 'https://discord.com/api/webhooks/123/abc', + it('returns 400 for invalid JSON body', async () => { + const port = (server.address() as { port: number }).port; + const res = await new Promise<{ status: number }>((resolve, reject) => { + const req = http.request( + { hostname: '127.0.0.1', port, path: '/api/preferences/alice', method: 'PUT', + headers: { 'Content-Type': 'application/json', 'Content-Length': 8 } }, + (r) => { + r.resume(); + r.on('end', () => resolve({ status: r.statusCode! })); + } + ); + req.on('error', reject); + req.write('not-json'); + req.end(); + }); + expect(res.status).toBe(400); }); - const { status, body } = await makeRequest(server, '/health'); - - expect(status).toBe(200); - expect((body as any).status).toBe('ok'); - expect((body as any).services.discord.status).toBe('ok'); - }); - it('includes latencyMs for each checked service', async () => { - mockGetHealth.mockResolvedValue({ status: 'healthy' }); - mockFetch.mockResolvedValue({ ok: true, status: 200 }); - - server = await startServer({ - ...BASE_OPTIONS, - discordWebhookUrl: 'https://discord.com/api/webhooks/123/abc', + it('returns 400 when categories field is missing', async () => { + const res = await request(server, 'PUT', '/api/preferences/alice', { foo: 'bar' }); + expect(res.status).toBe(400); }); - const { body } = await makeRequest(server, '/health'); - - expect(typeof (body as any).services.stellarRpc.latencyMs).toBe('number'); - expect(typeof (body as any).services.discord.latencyMs).toBe('number'); - }); - - it('reports discord as not_configured when no webhook url is provided', async () => { - mockGetHealth.mockResolvedValue({ status: 'healthy' }); - - server = await startServer(BASE_OPTIONS); - const { body } = await makeRequest(server, '/health'); - - expect((body as any).services.discord).toEqual({ status: 'not_configured' }); - }); -}); - -describe('checkStellarRpc', () => { - beforeEach(() => jest.clearAllMocks()); - - it('returns ok when getHealth resolves', async () => { - mockGetHealth.mockResolvedValue({ status: 'healthy' }); - const result = await checkStellarRpc('https://soroban-testnet.stellar.org:443'); - expect(result.status).toBe('ok'); - expect(result.latencyMs).toBeGreaterThanOrEqual(0); }); - it('returns error when getHealth rejects', async () => { - mockGetHealth.mockRejectedValue(new Error('timeout')); - const result = await checkStellarRpc('https://soroban-testnet.stellar.org:443'); - expect(result.status).toBe('error'); - expect(result.detail).toBe('timeout'); - }); -}); - -describe('checkDiscord', () => { - beforeEach(() => jest.clearAllMocks()); - - it('returns ok for a 200 response', async () => { - mockFetch.mockResolvedValue({ ok: true, status: 200 }); - const result = await checkDiscord('https://discord.com/api/webhooks/123/abc'); - expect(result.status).toBe('ok'); - }); - - it('returns error for a non-ok response', async () => { - mockFetch.mockResolvedValue({ ok: false, status: 401 }); - const result = await checkDiscord('https://discord.com/api/webhooks/123/abc'); - expect(result.status).toBe('error'); - expect(result.detail).toBe('HTTP 401'); - }); - - it('returns error when fetch throws', async () => { - mockFetch.mockRejectedValue(new Error('network error')); - const result = await checkDiscord('https://discord.com/api/webhooks/123/abc'); - expect(result.status).toBe('error'); - expect(result.detail).toBe('network error'); - }); -}); - -describe('GET /api/events', () => { - let server: http.Server; - - beforeEach(() => { - jest.clearAllMocks(); - mockGetHealth.mockResolvedValue({ status: 'healthy' }); - (eventRegistry.getEvents as jest.Mock).mockReturnValue([{ id: '1' }]); - (eventRegistry.count as jest.Mock).mockReturnValue(1); - }); - - afterEach(async () => { - if (server) await closeServer(server); - }); - - it('returns events list', async () => { - server = await startServer(BASE_OPTIONS); - const { status, body } = await makeRequest(server, '/api/events'); - - expect(status).toBe(200); - expect((body as any).count).toBe(1); - expect((body as any).events).toHaveLength(1); - }); -}); - -describe('unknown routes', () => { - let server: http.Server; - - afterEach(async () => { - if (server) await closeServer(server); - }); - - it('returns 404 for unknown path', async () => { - server = await startServer(BASE_OPTIONS); - const { status, body } = await makeRequest(server, '/unknown'); - - expect(status).toBe(404); - expect((body as any).error).toBe('Not found'); + describe('unknown routes', () => { + it('returns 404 for unrecognised paths', async () => { + const res = await request(server, 'GET', '/api/unknown'); + expect(res.status).toBe(404); + }); }); }); diff --git a/listener/src/api/events-server.ts b/listener/src/api/events-server.ts index 5695de6..754d460 100644 --- a/listener/src/api/events-server.ts +++ b/listener/src/api/events-server.ts @@ -1,8 +1,8 @@ import http from 'http'; import * as StellarSDK from '@stellar/stellar-sdk'; import { eventRegistry } from '../store/event-registry'; -import { NotificationAPI } from '../services/notification-api'; -import { NotificationType } from '../types/scheduled-notification'; +import { preferenceStore } from '../store/preference-store'; +import { PreferencesUpdateInput } from '../types/preferences'; import logger from '../utils/logger'; import { generateRequestId } from '../utils/request-id'; import { RateLimitConfig } from '../types'; @@ -127,9 +127,8 @@ export function createEventsServer(options: EventsServerOptions): http.Server { const startTime = Date.now(); res.setHeader('Access-Control-Allow-Origin', corsOrigin); - res.setHeader('Access-Control-Allow-Methods', 'GET, POST, OPTIONS'); - res.setHeader('Access-Control-Allow-Headers', 'Content-Type, X-API-Key, Authorization'); - res.setHeader('X-Request-Id', requestId); + res.setHeader('Access-Control-Allow-Methods', 'GET, PUT, OPTIONS'); + res.setHeader('Access-Control-Allow-Headers', 'Content-Type'); if (req.method === 'OPTIONS') { res.writeHead(204); @@ -137,182 +136,55 @@ export function createEventsServer(options: EventsServerOptions): http.Server { return; } - const executeRoute = () => { - if (req.method === 'GET' && req.url === '/health') { - buildHealthResponse(options).then((health) => { - const httpStatus = health.status === 'error' ? 503 : 200; - res.writeHead(httpStatus, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify(health)); - }).catch((err) => { - logger.error('Health check failed unexpectedly', { error: err }); - res.writeHead(500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ status: 'error', detail: 'Internal health check failure' })); - }); - return; - } + const url = new URL(req.url ?? '/', 'http://localhost'); - if (req.method === 'GET' && req.url?.startsWith('/api/events')) { - const url = new URL(req.url, 'http://localhost'); - const limitParam = url.searchParams.get('limit'); - const limit = limitParam ? parseInt(limitParam, 10) : undefined; + // GET /api/events + if (req.method === 'GET' && url.pathname.startsWith('/api/events')) { + const limitParam = url.searchParams.get('limit'); + const limit = limitParam ? parseInt(limitParam, 10) : undefined; + const events = + limit !== undefined && !Number.isNaN(limit) + ? eventRegistry.getEvents(limit) + : eventRegistry.getEvents(); - logger.info('Handling GET /api/events', { - requestId, - limit: limit ?? 'all', - }); - - const events = - limit !== undefined && !Number.isNaN(limit) - ? eventRegistry.getEvents(limit) - : eventRegistry.getEvents(); - - res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end( - JSON.stringify({ - count: eventRegistry.count(), - events, - }) - ); - - logger.info('GET /api/events complete', { - requestId, - returned: events.length, - durationMs: Date.now() - startTime, - }); - return; - } - - // Schedule notification endpoint - if (req.method === 'POST' && req.url === '/api/schedule') { - if (!options.notificationAPI) { - res.writeHead(503, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Scheduler not enabled' })); - return; - } - - let body = ''; - req.on('data', (chunk) => { - body += chunk.toString(); - }); - - req.on('end', async () => { - try { - const data = JSON.parse(body); - - // Validate required fields - if (!data.executeAt || !data.payload || !data.targetRecipient) { - res.writeHead(400, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Missing required fields: executeAt, payload, targetRecipient' })); - return; - } - - const notificationId = await options.notificationAPI!.scheduleNotification({ - payload: data.payload, - notificationType: data.notificationType || NotificationType.DISCORD, - targetRecipient: data.targetRecipient, - executeAt: new Date(data.executeAt), - maxRetries: data.maxRetries, - priority: data.priority, - eventId: data.eventId, - contractAddress: data.contractAddress, - metadata: data.metadata, - }); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ count: eventRegistry.count(), events })); + return; + } - res.writeHead(201, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ id: notificationId })); + // GET /api/preferences/:userId + const getPrefsMatch = url.pathname.match(/^\/api\/preferences\/([^/]+)$/); + if (req.method === 'GET' && getPrefsMatch) { + const userId = decodeURIComponent(getPrefsMatch[1]); + const prefs = preferenceStore.get(userId); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(prefs)); + return; + } - logger.info('Notification scheduled via API', { - requestId, - notificationId, - executeAt: data.executeAt, - }); - } catch (error) { - logger.error('Failed to schedule notification', { error, requestId }); - res.writeHead(500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: (error as Error).message })); + // PUT /api/preferences/:userId + const putPrefsMatch = url.pathname.match(/^\/api\/preferences\/([^/]+)$/); + if (req.method === 'PUT' && putPrefsMatch) { + const userId = decodeURIComponent(putPrefsMatch[1]); + let body = ''; + req.on('data', (chunk) => { body += chunk; }); + req.on('end', () => { + try { + const input: PreferencesUpdateInput = JSON.parse(body); + if (!input || typeof input.categories !== 'object') { + res.writeHead(400, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify({ error: 'Invalid body: expected { categories: { [key]: boolean } }' })); + return; } - }); - return; - } - - // Get scheduler statistics endpoint - if (req.method === 'GET' && req.url === '/api/schedule/stats') { - if (!options.notificationAPI) { - res.writeHead(503, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Scheduler not enabled' })); - return; - } - - options.notificationAPI.getStatistics() - .then((stats) => { - res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify(stats)); - }) - .catch((error) => { - logger.error('Failed to get scheduler stats', { error, requestId }); - res.writeHead(500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: (error as Error).message })); - }); - return; - } - - // Get specific notification endpoint - if (req.method === 'GET' && req.url?.startsWith('/api/schedule/')) { - if (!options.notificationAPI) { - res.writeHead(503, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Scheduler not enabled' })); - return; - } - - const id = parseInt(req.url.split('/').pop() || '', 10); - if (isNaN(id)) { + const updated = preferenceStore.update(userId, input); + res.writeHead(200, { 'Content-Type': 'application/json' }); + res.end(JSON.stringify(updated)); + } catch { res.writeHead(400, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Invalid notification ID' })); - return; + res.end(JSON.stringify({ error: 'Invalid JSON' })); } - - options.notificationAPI.getNotification(id) - .then((notification) => { - if (!notification) { - res.writeHead(404, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Notification not found' })); - return; - } - res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify(notification)); - }) - .catch((error) => { - logger.error('Failed to get notification', { error, requestId, id }); - res.writeHead(500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: (error as Error).message })); - }); - return; - } - - logger.warn('Unhandled request', { - requestId, - method: req.method, - url: req.url, }); - - res.writeHead(404, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Not found' })); - }; - - if (rateLimiter && req.url?.startsWith('/api/')) { - rateLimiter.handle(req, res, requestId) - .then((allowed) => { - if (allowed) { - executeRoute(); - } - }) - .catch((err) => { - logger.error('Rate limiter execution error', { error: err, requestId }); - res.writeHead(500, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ error: 'Internal server error' })); - }); - } else { - executeRoute(); + return; } }); diff --git a/listener/src/services/event-subscriber.test.ts b/listener/src/services/event-subscriber.test.ts index c757598..a5b36c6 100644 --- a/listener/src/services/event-subscriber.test.ts +++ b/listener/src/services/event-subscriber.test.ts @@ -35,6 +35,12 @@ jest.mock('./discord-notification', () => ({ DiscordNotificationService: jest.fn().mockImplementation(() => mockDiscordService), })); +jest.mock('../store/preference-store', () => ({ + preferenceStore: { + isCategoryEnabled: jest.fn().mockReturnValue(true), + }, +})); + const mockLogger = logger as jest.Mocked; const contractConfig: ContractConfig = { @@ -557,4 +563,74 @@ describe('EventSubscriber', () => { ); }); }); + + describe('notification preferences gate', () => { + const discordConfig = { + webhookUrl: 'https://discord.com/api/webhooks/test/webhook', + webhookId: 'test', + }; + const configWithDiscord: Config = { ...testConfig, discord: discordConfig }; + + beforeEach(() => { + const { DiscordNotificationService } = jest.requireMock('./discord-notification'); + DiscordNotificationService.mockImplementation(() => mockDiscordService); + mockDiscordService.sendEventNotification.mockResolvedValue(true); + mockGetEvents.mockResolvedValue({ + events: [createMockEvent({ id: 'pref-event' })], + cursor: 'cursor-pref', + }); + }); + + it('skips Discord notification when discord category is disabled for the user', async () => { + const { preferenceStore } = jest.requireMock('../store/preference-store'); + preferenceStore.isCategoryEnabled.mockReturnValue(false); + + const subscriber = new EventSubscriber(configWithDiscord); + await (subscriber as any).checkForEvents(); + + expect(mockDiscordService.sendEventNotification).not.toHaveBeenCalled(); + expect(mockLogger.info).toHaveBeenCalledWith( + 'Skipping Discord notification: category disabled by user preferences', + expect.objectContaining({ eventId: 'pref-event' }) + ); + }); + + it('sends Discord notification when discord category is enabled', async () => { + const { preferenceStore } = jest.requireMock('../store/preference-store'); + preferenceStore.isCategoryEnabled.mockReturnValue(true); + mockDiscordService.sendEventNotification.mockResolvedValue(true); + + const subscriber = new EventSubscriber(configWithDiscord); + await (subscriber as any).checkForEvents(); + + expect(mockDiscordService.sendEventNotification).toHaveBeenCalled(); + }); + + it('uses contractConfig.userId when present', async () => { + const { preferenceStore } = jest.requireMock('../store/preference-store'); + preferenceStore.isCategoryEnabled.mockReturnValue(true); + mockDiscordService.sendEventNotification.mockResolvedValue(true); + + const configWithUserId: Config = { + ...configWithDiscord, + contractAddresses: [{ ...contractConfig, userId: 'alice' }], + }; + + const subscriber = new EventSubscriber(configWithUserId); + await (subscriber as any).checkForEvents(); + + expect(preferenceStore.isCategoryEnabled).toHaveBeenCalledWith('alice', 'discord'); + }); + + it('defaults to "global" userId when contractConfig.userId is absent', async () => { + const { preferenceStore } = jest.requireMock('../store/preference-store'); + preferenceStore.isCategoryEnabled.mockReturnValue(true); + mockDiscordService.sendEventNotification.mockResolvedValue(true); + + const subscriber = new EventSubscriber(configWithDiscord); + await (subscriber as any).checkForEvents(); + + expect(preferenceStore.isCategoryEnabled).toHaveBeenCalledWith('global', 'discord'); + }); + }); }); diff --git a/listener/src/services/event-subscriber.ts b/listener/src/services/event-subscriber.ts index 2056163..d52194e 100644 --- a/listener/src/services/event-subscriber.ts +++ b/listener/src/services/event-subscriber.ts @@ -1,6 +1,7 @@ import * as StellarSDK from '@stellar/stellar-sdk'; import { Config, ContractConfig } from '../types'; import { eventRegistry } from '../store/event-registry'; +import { preferenceStore } from '../store/preference-store'; import logger from '../utils/logger'; import { generateRequestId } from '../utils/request-id'; import { @@ -205,6 +206,15 @@ export class EventSubscriber { }); if (this.discordService) { + const userId = contractConfig.userId ?? 'global'; + if (!preferenceStore.isCategoryEnabled(userId, 'discord')) { + logger.info('Skipping Discord notification: category disabled by user preferences', { + eventId: event.id, + userId, + }); + return; + } + const success = await this.discordService.sendEventNotification( event, contractConfig, diff --git a/listener/src/store/preference-store.test.ts b/listener/src/store/preference-store.test.ts new file mode 100644 index 0000000..a64d31a --- /dev/null +++ b/listener/src/store/preference-store.test.ts @@ -0,0 +1,77 @@ +import { PreferenceStore } from './preference-store'; + +describe('PreferenceStore', () => { + let store: PreferenceStore; + + beforeEach(() => { + store = new PreferenceStore(); + }); + + describe('get', () => { + it('returns default preferences for a new user', () => { + const prefs = store.get('user-1'); + expect(prefs.userId).toBe('user-1'); + expect(prefs.categories.discord).toBe(true); + expect(typeof prefs.updatedAt).toBe('number'); + }); + + it('returns a copy so mutations do not affect stored state', () => { + const prefs = store.get('user-1'); + prefs.categories.discord = false; + expect(store.get('user-1').categories.discord).toBe(true); + }); + }); + + describe('update', () => { + it('disables a notification category', () => { + store.update('user-1', { categories: { discord: false } }); + expect(store.get('user-1').categories.discord).toBe(false); + }); + + it('re-enables a disabled category', () => { + store.update('user-1', { categories: { discord: false } }); + store.update('user-1', { categories: { discord: true } }); + expect(store.get('user-1').categories.discord).toBe(true); + }); + + it('merges categories without removing unrelated ones', () => { + store.update('user-1', { categories: { discord: true, email: true } }); + store.update('user-1', { categories: { discord: false } }); + expect(store.get('user-1').categories.email).toBe(true); + expect(store.get('user-1').categories.discord).toBe(false); + }); + + it('updates updatedAt timestamp', async () => { + const before = store.get('user-1').updatedAt; + await new Promise((r) => setTimeout(r, 5)); + store.update('user-1', { categories: { discord: false } }); + expect(store.get('user-1').updatedAt).toBeGreaterThan(before); + }); + + it('persists changes across get calls', () => { + store.update('user-2', { categories: { discord: false } }); + expect(store.get('user-2').categories.discord).toBe(false); + expect(store.get('user-2').categories.discord).toBe(false); + }); + }); + + describe('isCategoryEnabled', () => { + it('returns true for the default discord category', () => { + expect(store.isCategoryEnabled('user-1', 'discord')).toBe(true); + }); + + it('returns false after disabling the discord category', () => { + store.update('user-1', { categories: { discord: false } }); + expect(store.isCategoryEnabled('user-1', 'discord')).toBe(false); + }); + + it('returns true for an unknown category (default enabled)', () => { + expect(store.isCategoryEnabled('user-1', 'unknown-channel')).toBe(true); + }); + + it('isolates preferences between users', () => { + store.update('user-a', { categories: { discord: false } }); + expect(store.isCategoryEnabled('user-b', 'discord')).toBe(true); + }); + }); +}); diff --git a/listener/src/store/preference-store.ts b/listener/src/store/preference-store.ts new file mode 100644 index 0000000..cec143d --- /dev/null +++ b/listener/src/store/preference-store.ts @@ -0,0 +1,40 @@ +import { UserPreferences, PreferencesUpdateInput } from '../types/preferences'; + +export class PreferenceStore { + private store = new Map(); + + /** Returns preferences for userId, creating defaults if absent */ + get(userId: string): UserPreferences { + if (!this.store.has(userId)) { + const defaults: UserPreferences = { + userId, + categories: { discord: true }, + updatedAt: Date.now(), + }; + this.store.set(userId, defaults); + } + const stored = this.store.get(userId)!; + return { ...stored, categories: { ...stored.categories } }; + } + + /** Merges category updates, returns updated preferences */ + update(userId: string, input: PreferencesUpdateInput): UserPreferences { + const existing = this.get(userId); + const updated: UserPreferences = { + ...existing, + categories: { ...existing.categories, ...input.categories }, + updatedAt: Date.now(), + }; + this.store.set(userId, updated); + return { ...updated }; + } + + /** Returns true if the given category is enabled for userId */ + isCategoryEnabled(userId: string, category: string): boolean { + const prefs = this.get(userId); + // If the category has never been set, default to enabled + return prefs.categories[category] !== false; + } +} + +export const preferenceStore = new PreferenceStore(); diff --git a/listener/src/types/index.ts b/listener/src/types/index.ts index ed4849b..bd0ef3e 100644 --- a/listener/src/types/index.ts +++ b/listener/src/types/index.ts @@ -1,6 +1,8 @@ export interface ContractConfig { address: string; events: string[]; + /** Optional user ID for per-user notification preference gating */ + userId?: string; } export interface DiscordConfig { diff --git a/listener/src/types/preferences.ts b/listener/src/types/preferences.ts new file mode 100644 index 0000000..482a272 --- /dev/null +++ b/listener/src/types/preferences.ts @@ -0,0 +1,12 @@ +export type NotificationCategory = 'discord' | string; + +export interface UserPreferences { + userId: string; + /** Map of notification category → enabled flag */ + categories: Record; + updatedAt: number; +} + +export interface PreferencesUpdateInput { + categories: Record; +}