From da912a33ced78c98181f7235de09682541eae488 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 17:19:17 +0000 Subject: [PATCH 1/8] Add consumer projection persistence support --- docs/api.md | 21 +++++++++++ docs/consumers.md | 26 +++++++++++++ src/Consumer.js | 86 ++++++++++++++++++++++++++++++++++++++++++- test/Consumer.spec.js | 42 +++++++++++++++++++++ 4 files changed, 173 insertions(+), 2 deletions(-) diff --git a/docs/api.md b/docs/api.md index 7791dc1a..26c80309 100644 --- a/docs/api.md +++ b/docs/api.md @@ -260,6 +260,27 @@ Asynchronously scan all consumer state files and return their identifiers. --- +#### `consumer.createProjection(projectionFn, [options])` + +```javascript +consumer.createProjection(projectionFn [, options]) +``` + +Register a reducer-style projection as the consumer's `'data'` handler and persist it so it is auto-restored when reopening the same consumer. + +`projectionFn` can be either: + +- a reducer function: `(state, event) => state` +- an object map: `{ [eventType]: (state, event) => state }` + +Options: + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `options.hmac` | `function(string): string` | storage HMAC | HMAC function used to sign/verify serialized function projections. Required for trusted function projection restore. | + +--- + ### Events emitted | Event | Payload | Description | diff --git a/docs/consumers.md b/docs/consumers.md index 9b24b4b7..e27ea5d1 100644 --- a/docs/consumers.md +++ b/docs/consumers.md @@ -69,6 +69,32 @@ consumer.setState({ count: 0 }); consumer.setState((state) => ({ ...state, count: state.count + 1 })); ``` +## Projections + +Instead of registering `'data'` manually, you can register a projection reducer via `createProjection`. +The reducer is persisted and automatically reloaded when the same consumer is reopened. + +```javascript +import crypto from 'crypto'; + +const hmac = (code) => crypto.createHmac('sha256', 'your-private-secret').update(code).digest('hex'); + +const consumer = eventstore.getConsumer('orders', 'orders-projection', { total: 0 }); +consumer.createProjection( + (state, event) => ({ ...state, total: state.total + (event.amount || 0) }), + { hmac } +); +``` + +You can also pass per-event reducers: + +```javascript +consumer.createProjection({ + OrderCreated: (state, event) => ({ ...state, orders: [...state.orders, event] }), + OrderCancelled: (state, event) => ({ ...state, cancelled: state.cancelled + 1 }) +}, { hmac }); +``` + ## Resetting a Consumer Force the consumer to reprocess events from a given position: diff --git a/src/Consumer.js b/src/Consumer.js index 9b2e813d..faefe7b8 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -3,6 +3,7 @@ import fs from 'fs'; import path from 'path'; import { assert } from './utils/util.js'; import { ensureDirectory } from './utils/fsUtil.js'; +import { buildMetadataForMatcher, buildMatcherFromMetadata } from './utils/metadataUtil.js'; import Storage from './Storage/ReadableStorage.js'; const MAX_CATCHUP_BATCH = 10; @@ -33,7 +34,7 @@ class Consumer extends stream.Readable { * @param {object} [initialState={}] The initial state of the consumer. * @param {number} [startFrom=0] The revision to start from within the index to consume. */ - constructor(storage, indexName, identifier, initialState = {}, startFrom = 0) { + constructor(storage, indexName, identifier, initialState = {}, startFrom = 0, options = {}) { super({ objectMode: true }); assert(storage instanceof Storage, 'Must provide a storage for the consumer.'); @@ -42,6 +43,7 @@ class Consumer extends stream.Readable { this.initializeStorage(storage, indexName, identifier); this.restoreState(initialState, startFrom); + this.restoreProjection(options); this.handler = this.handleNewDocument.bind(this); this.on('error', () => (this.handleDocument = false)); } @@ -58,6 +60,7 @@ class Consumer extends stream.Readable { this.indexName = indexName; const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers'); this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier); + this.projectionFileName = this.fileName + '.projection'; if (ensureDirectory(consumerDirectory)) { this.cleanUpFailedWrites(); } @@ -72,7 +75,8 @@ class Consumer extends stream.Readable { const consumerDirectory = path.dirname(this.fileName); const files = fs.readdirSync(consumerDirectory); for (let file of files) { - if (file.startsWith(consumerNamePrefix)) { + const suffix = file.slice(consumerNamePrefix.length); + if (file.startsWith(consumerNamePrefix) && /^\d+$/.test(suffix)) { safeUnlink(path.join(consumerDirectory, file)); } } @@ -106,6 +110,84 @@ class Consumer extends stream.Readable { this.consuming = false; } + /** + * Restore a persisted projection and register it as data handler. + * @private + * @param {object} [options={}] + * @param {function(string): string} [options.hmac] + */ + restoreProjection(options = {}) { + this.projection = null; + this.projectionHandler = null; + if (!this.projectionFileName || !fs.existsSync(this.projectionFileName)) { + return; + } + const projectionMetadata = JSON.parse(fs.readFileSync(this.projectionFileName, 'utf8')); + const hmac = options.hmac || this.storage.hmac; + if (typeof projectionMetadata.matcher === 'string') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore a function projection.'); + } + const projection = buildMatcherFromMetadata(projectionMetadata, hmac); + this.registerProjection(projection); + } + + /** + * Register a projection function as `data` event handler. + * @private + * @param {function|object} projection + */ + registerProjection(projection) { + if (this.projectionHandler) { + this.removeListener('data', this.projectionHandler); + } + this.projection = projection; + this.projectionHandler = (event) => { + let reducer = projection; + if (typeof projection === 'object') { + const type = event?.type || event?.payload?.type; + reducer = projection[type]; + if (typeof reducer !== 'function') { + return; + } + } + this.setState(reducer(this.state, event)); + }; + this.on('data', this.projectionHandler); + } + + /** + * Create and persist a projection reducer and register it as data handler. + * + * @api + * @param {function(object, object): object|object} projectionFn + * @param {object} [options] + * @param {function(string): string} [options.hmac] Required for function projections. + */ + createProjection(projectionFn, options = {}) { + assert((typeof projectionFn === 'function') || (projectionFn && typeof projectionFn === 'object' && !Array.isArray(projectionFn)), 'Projection must be a reducer function or an object map of reducer functions.'); + if (typeof projectionFn === 'object') { + for (const reducer of Object.values(projectionFn)) { + assert(typeof reducer === 'function', 'Projection object values must be reducer functions.'); + } + } + const hmac = options.hmac || this.storage.hmac; + if (typeof projectionFn === 'function') { + assert(typeof hmac === 'function', 'Must provide options.hmac for function projections.'); + } + + const projectionMetadata = buildMetadataForMatcher(projectionFn, hmac); + const tmpProjectionFileName = this.projectionFileName + '.tmp'; + try { + fs.writeFileSync(tmpProjectionFileName, JSON.stringify(projectionMetadata), 'utf8'); + fs.renameSync(tmpProjectionFileName, this.projectionFileName); + } catch (e) { + safeUnlink(tmpProjectionFileName); + throw e; + } + + this.registerProjection(projectionFn); + } + /** * Update the state of this consumer transactionally with the position. * May only be called from within the document handling callback. diff --git a/test/Consumer.spec.js b/test/Consumer.spec.js index a5de02b7..b3f10e6e 100644 --- a/test/Consumer.spec.js +++ b/test/Consumer.spec.js @@ -3,6 +3,7 @@ import fs from 'fs-extra'; import fsNative from 'fs'; import Storage from '../src/Storage.js'; import Consumer from '../src/Consumer.js'; +import { createHmac } from '../src/utils/metadataUtil.js'; import { fileURLToPath } from 'url'; const __dirname = fileURLToPath(new URL('.', import.meta.url)); @@ -519,6 +520,47 @@ describe('Consumer', function() { }); }); + it('can create projections from a reducer function', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-projection', { count: 0 }); + consumer.createProjection((state, event) => ({ ...state, count: state.count + event.id }), { hmac: createHmac('test-secret') }); + consumer.on('caught-up', () => { + expect(consumer.state.count).to.be(6); + done(); + }); + + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + + it('can create projections from event-type reducer maps and restore them on reopen', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', { count: 0 }); + consumer.createProjection({ + Foobar: (state, event) => ({ ...state, count: state.count + event.id }), + Bazinga: (state) => state + }, { hmac: createHmac('test-secret') }); + + consumer.on('caught-up', () => { + consumer.stop(); + consumer = new Consumer(storage, 'foobar', 'consumer-projection-map'); + consumer.on('caught-up', () => { + expect(consumer.state.count).to.be(10); + done(); + }); + storage.write({ type: 'Foobar', id: 4 }); + }); + + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + + it('throws if function projection is restored without trusted hmac', function() { + consumer = new Consumer(storage, 'foobar', 'consumer-projection-hmac'); + consumer.createProjection((state, event) => ({ ...state, lastId: event.id }), { hmac: createHmac('test-secret') }); + expect(() => new Consumer(storage, 'foobar', 'consumer-projection-hmac', {}, 0, { hmac: createHmac('wrong-secret') })).to.throwError(/Invalid HMAC/); + }); + it('can build consistency guards (aggregates)', function(done) { const guard = new Consumer(storage, 'foobar', 'unique-bar-guard'); guard.apply = function(event) { From e8095680483bc743b365b3e0a4f3ba084c0c393a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 29 May 2026 17:25:45 +0000 Subject: [PATCH 2/8] Implement durable consumer projections with HMAC validation --- docs/api.md | 2 +- src/Consumer.js | 54 +++++++++++++++++++++++++++++++++---------- test/Consumer.spec.js | 9 ++++---- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/docs/api.md b/docs/api.md index 26c80309..9e12183d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -277,7 +277,7 @@ Options: | Parameter | Type | Default | Description | |-----------|------|---------|-------------| -| `options.hmac` | `function(string): string` | storage HMAC | HMAC function used to sign/verify serialized function projections. Required for trusted function projection restore. | +| `options.hmac` | `function(string): string` | storage HMAC | HMAC function used to sign/verify serialized projections. Required when no storage-level HMAC is configured. | --- diff --git a/src/Consumer.js b/src/Consumer.js index faefe7b8..51092eb3 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -71,12 +71,17 @@ class Consumer extends stream.Readable { * @private */ cleanUpFailedWrites() { - const consumerNamePrefix = path.basename(this.fileName) + '.'; + const consumerBaseName = path.basename(this.fileName); + const projectionBaseName = consumerBaseName + '.projection'; + const escapedConsumerBaseName = consumerBaseName.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); + const failedStateFilePattern = new RegExp(`^${escapedConsumerBaseName}\\.\\d+$`); const consumerDirectory = path.dirname(this.fileName); const files = fs.readdirSync(consumerDirectory); for (let file of files) { - const suffix = file.slice(consumerNamePrefix.length); - if (file.startsWith(consumerNamePrefix) && /^\d+$/.test(suffix)) { + if (file === projectionBaseName) { + continue; + } + if (file === projectionBaseName + '.tmp' || failedStateFilePattern.test(file)) { safeUnlink(path.join(consumerDirectory, file)); } } @@ -124,13 +129,33 @@ class Consumer extends stream.Readable { } const projectionMetadata = JSON.parse(fs.readFileSync(this.projectionFileName, 'utf8')); const hmac = options.hmac || this.storage.hmac; - if (typeof projectionMetadata.matcher === 'string') { - assert(typeof hmac === 'function', 'Must provide options.hmac to restore a function projection.'); - } - const projection = buildMatcherFromMetadata(projectionMetadata, hmac); + const projection = this.deserializeProjectionMetadata(projectionMetadata, hmac); this.registerProjection(projection); } + /** + * @private + * @param {{kind?: string, projection?: object, matcher?: string|object, hmac?: string}} metadata + * @param {function(string): string} hmac + * @returns {function|object} + */ + deserializeProjectionMetadata(metadata, hmac) { + assert(metadata && typeof metadata === 'object', 'Invalid projection metadata.'); + if (metadata.kind === 'map') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore mapped function projections.'); + const projectionMap = {}; + for (const [eventType, reducerMetadata] of Object.entries(metadata.projection || {})) { + projectionMap[eventType] = buildMatcherFromMetadata(reducerMetadata, hmac); + } + return projectionMap; + } + assert(metadata.kind === 'function', 'Invalid projection metadata kind.'); + if (typeof metadata.projection?.matcher === 'string') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); + } + return buildMatcherFromMetadata(metadata.projection, hmac); + } + /** * Register a projection function as `data` event handler. * @private @@ -144,6 +169,7 @@ class Consumer extends stream.Readable { this.projectionHandler = (event) => { let reducer = projection; if (typeof projection === 'object') { + // Direct Storage consumers emit `{ type }`, EventStore consumers emit `{ payload: { type } }`. const type = event?.type || event?.payload?.type; reducer = projection[type]; if (typeof reducer !== 'function') { @@ -171,11 +197,15 @@ class Consumer extends stream.Readable { } } const hmac = options.hmac || this.storage.hmac; - if (typeof projectionFn === 'function') { - assert(typeof hmac === 'function', 'Must provide options.hmac for function projections.'); - } - - const projectionMetadata = buildMetadataForMatcher(projectionFn, hmac); + assert(typeof hmac === 'function', 'Must provide options.hmac for projections.'); + const projectionMetadata = (typeof projectionFn === 'function') + ? { kind: 'function', projection: buildMetadataForMatcher(projectionFn, hmac) } + : { + kind: 'map', + projection: Object.fromEntries( + Object.entries(projectionFn).map(([eventType, reducer]) => [eventType, buildMetadataForMatcher(reducer, hmac)]) + ) + }; const tmpProjectionFileName = this.projectionFileName + '.tmp'; try { fs.writeFileSync(tmpProjectionFileName, JSON.stringify(projectionMetadata), 'utf8'); diff --git a/test/Consumer.spec.js b/test/Consumer.spec.js index b3f10e6e..db669117 100644 --- a/test/Consumer.spec.js +++ b/test/Consumer.spec.js @@ -542,10 +542,11 @@ describe('Consumer', function() { consumer.on('caught-up', () => { consumer.stop(); - consumer = new Consumer(storage, 'foobar', 'consumer-projection-map'); - consumer.on('caught-up', () => { - expect(consumer.state.count).to.be(10); - done(); + consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', {}, 0, { hmac: createHmac('test-secret') }); + consumer.on('progress', () => { + if (consumer.state.count === 10) { + done(); + } }); storage.write({ type: 'Foobar', id: 4 }); }); From 2e6f8498752454fc47cf64ecb1608ac765eff617 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 30 May 2026 20:01:39 +0000 Subject: [PATCH 3/8] Refactor projections into Projection class and integrate Consumer/EventStore APIs --- docs/api.md | 40 ++++++ docs/consumers.md | 30 +++-- index.js | 1 + src/Consumer.js | 99 +++++--------- src/EventStore.js | 35 ++++- src/Projection.js | 276 ++++++++++++++++++++++++++++++++++++++++ test/Consumer.spec.js | 43 +++++++ test/EventStore.spec.js | 71 +++++++++++ 8 files changed, 520 insertions(+), 75 deletions(-) create mode 100644 src/Projection.js diff --git a/docs/api.md b/docs/api.md index 9e12183d..4db01f3d 100644 --- a/docs/api.md +++ b/docs/api.md @@ -281,6 +281,46 @@ Options: --- +#### `eventstore.getProjection(name, [definition], [options])` + +```javascript +eventstore.getProjection(name [, definition [, options]]) → Projection +``` + +Create a `Projection` with EventStore defaults (`typeAccessor`, storage HMAC), or restore a previously persisted one when `definition` is omitted. + +`definition` shape: + +```javascript +{ + initialState: any, + handlers: (state, event) => state | { [eventType]: (state, event) => state }, + matcher: object|function // optional +} +``` + +--- + +#### `consumer.project(projection, [options])` + +```javascript +consumer.project(projection [, options]) +``` + +Attach a `Projection` instance to a durable consumer and persist its definition for automatic restore when reopening the same consumer. + +--- + +#### `projection.subscribe(consumer, [options])` + +```javascript +projection.subscribe(consumer [, options]) +``` + +Alias for `consumer.project(projection, options)`. + +--- + ### Events emitted | Event | Payload | Description | diff --git a/docs/consumers.md b/docs/consumers.md index e27ea5d1..a6ced869 100644 --- a/docs/consumers.md +++ b/docs/consumers.md @@ -71,22 +71,25 @@ consumer.setState((state) => ({ ...state, count: state.count + 1 })); ## Projections -Instead of registering `'data'` manually, you can register a projection reducer via `createProjection`. -The reducer is persisted and automatically reloaded when the same consumer is reopened. +Use a `Projection` to define *how* events are projected into state, then connect it to a `Consumer` for durable continuous updates. ```javascript import crypto from 'crypto'; const hmac = (code) => crypto.createHmac('sha256', 'your-private-secret').update(code).digest('hex'); -const consumer = eventstore.getConsumer('orders', 'orders-projection', { total: 0 }); -consumer.createProjection( - (state, event) => ({ ...state, total: state.total + (event.amount || 0) }), - { hmac } -); +const projection = eventstore.getProjection('orders-total', { + initialState: { total: 0 }, + handlers: { + OrderCreated: (state, event) => ({ ...state, total: state.total + (event.payload.amount || 0) }) + } +}, { hmac }); + +const consumer = eventstore.getConsumer('orders', 'orders-projection', projection.initialState); +consumer.project(projection); ``` -You can also pass per-event reducers: +You can still use `consumer.createProjection(...)` for the shorter in-place API: ```javascript consumer.createProjection({ @@ -95,6 +98,17 @@ consumer.createProjection({ }, { hmac }); ``` +Projections are composable via `CompositeProjection`: + +```javascript +import { CompositeProjection } from 'event-storage'; + +const overview = new CompositeProjection('overview', { + count: { initialState: 0, handlers: { OrderCreated: (state) => state + 1 } }, + last: { initialState: null, handlers: { OrderCreated: (state, event) => event.payload } } +}); +``` + ## Resetting a Consumer Force the consumer to reprocess events from a given position: diff --git a/index.js b/index.js index 4cfbb2de..0253e0ab 100644 --- a/index.js +++ b/index.js @@ -3,3 +3,4 @@ export { default as EventStream } from './src/EventStream.js'; export { default as Storage, StorageLockedError } from './src/Storage.js'; export { default as Index } from './src/Index.js'; export { default as Consumer } from './src/Consumer.js'; +export { default as Projection, CompositeProjection } from './src/Projection.js'; diff --git a/src/Consumer.js b/src/Consumer.js index 51092eb3..948f0bad 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -3,7 +3,7 @@ import fs from 'fs'; import path from 'path'; import { assert } from './utils/util.js'; import { ensureDirectory } from './utils/fsUtil.js'; -import { buildMetadataForMatcher, buildMatcherFromMetadata } from './utils/metadataUtil.js'; +import Projection from './Projection.js'; import Storage from './Storage/ReadableStorage.js'; const MAX_CATCHUP_BATCH = 10; @@ -58,6 +58,7 @@ class Consumer extends stream.Readable { this.storage = storage; this.index = this.storage.openIndex(indexName); this.indexName = indexName; + this.identifier = identifier; const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers'); this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier); this.projectionFileName = this.fileName + '.projection'; @@ -127,58 +128,39 @@ class Consumer extends stream.Readable { if (!this.projectionFileName || !fs.existsSync(this.projectionFileName)) { return; } - const projectionMetadata = JSON.parse(fs.readFileSync(this.projectionFileName, 'utf8')); - const hmac = options.hmac || this.storage.hmac; - const projection = this.deserializeProjectionMetadata(projectionMetadata, hmac); - this.registerProjection(projection); - } - - /** - * @private - * @param {{kind?: string, projection?: object, matcher?: string|object, hmac?: string}} metadata - * @param {function(string): string} hmac - * @returns {function|object} - */ - deserializeProjectionMetadata(metadata, hmac) { - assert(metadata && typeof metadata === 'object', 'Invalid projection metadata.'); - if (metadata.kind === 'map') { - assert(typeof hmac === 'function', 'Must provide options.hmac to restore mapped function projections.'); - const projectionMap = {}; - for (const [eventType, reducerMetadata] of Object.entries(metadata.projection || {})) { - projectionMap[eventType] = buildMatcherFromMetadata(reducerMetadata, hmac); - } - return projectionMap; - } - assert(metadata.kind === 'function', 'Invalid projection metadata kind.'); - if (typeof metadata.projection?.matcher === 'string') { - assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); - } - return buildMatcherFromMetadata(metadata.projection, hmac); + const projection = Projection.restoreFromFile(this.projectionFileName, { + hmac: options.hmac || this.storage.hmac, + typeAccessor: options.typeAccessor + }); + this.project(projection, { persist: false }); } /** * Register a projection function as `data` event handler. * @private - * @param {function|object} projection + * @param {Projection} projection + * @param {object} [options] + * @param {boolean} [options.persist=true] + * @param {function(string): string} [options.hmac] */ - registerProjection(projection) { + project(projection, options = {}) { + assert(projection instanceof Projection, 'Projection must be an instance of Projection.'); + const { persist = true, hmac } = options; if (this.projectionHandler) { this.removeListener('data', this.projectionHandler); } this.projection = projection; this.projectionHandler = (event) => { - let reducer = projection; - if (typeof projection === 'object') { - // Direct Storage consumers emit `{ type }`, EventStore consumers emit `{ payload: { type } }`. - const type = event?.type || event?.payload?.type; - reducer = projection[type]; - if (typeof reducer !== 'function') { - return; - } - } - this.setState(reducer(this.state, event)); + this.setState(projection.apply(this.state, event)); }; this.on('data', this.projectionHandler); + if (persist) { + projection.persist({ + fileName: this.projectionFileName, + hmac: hmac || projection.hmac || this.storage.hmac + }); + } + return projection; } /** @@ -190,32 +172,17 @@ class Consumer extends stream.Readable { * @param {function(string): string} [options.hmac] Required for function projections. */ createProjection(projectionFn, options = {}) { - assert((typeof projectionFn === 'function') || (projectionFn && typeof projectionFn === 'object' && !Array.isArray(projectionFn)), 'Projection must be a reducer function or an object map of reducer functions.'); - if (typeof projectionFn === 'object') { - for (const reducer of Object.values(projectionFn)) { - assert(typeof reducer === 'function', 'Projection object values must be reducer functions.'); - } - } - const hmac = options.hmac || this.storage.hmac; - assert(typeof hmac === 'function', 'Must provide options.hmac for projections.'); - const projectionMetadata = (typeof projectionFn === 'function') - ? { kind: 'function', projection: buildMetadataForMatcher(projectionFn, hmac) } - : { - kind: 'map', - projection: Object.fromEntries( - Object.entries(projectionFn).map(([eventType, reducer]) => [eventType, buildMetadataForMatcher(reducer, hmac)]) - ) - }; - const tmpProjectionFileName = this.projectionFileName + '.tmp'; - try { - fs.writeFileSync(tmpProjectionFileName, JSON.stringify(projectionMetadata), 'utf8'); - fs.renameSync(tmpProjectionFileName, this.projectionFileName); - } catch (e) { - safeUnlink(tmpProjectionFileName); - throw e; - } - - this.registerProjection(projectionFn); + const projection = projectionFn instanceof Projection + ? projectionFn + : new Projection(options.name || this.identifier, { + initialState: this.state, + matcher: options.matcher, + handlers: projectionFn + }, { + hmac: options.hmac || this.storage.hmac, + typeAccessor: options.typeAccessor + }); + return this.project(projection, options); } /** diff --git a/src/EventStore.js b/src/EventStore.js index 80ae74ec..035feb25 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -6,6 +6,7 @@ import events from 'events'; import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js'; import Index from './Index.js'; import Consumer from './Consumer.js'; +import Projection from './Projection.js'; import { assert, getPropertyAtPath } from './utils/util.js'; import { ensureDirectory, scanForFiles } from './utils/fsUtil.js'; import { buildTypeMatcherFn } from './utils/metadataUtil.js'; @@ -784,12 +785,44 @@ class EventStore extends events.EventEmitter { if (this.consumers.has(identifier)) { return this.consumers.get(identifier); } - const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since); + const projectionTypeAccessor = this.typeAccessor + ? (event) => this.typeAccessor(event?.payload || event) + : undefined; + const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since, { + hmac: this.storage.hmac, + typeAccessor: projectionTypeAccessor + }); consumer.streamName = streamName; this.consumers.set(identifier, consumer); return consumer; } + /** + * Get or create a projection with EventStore defaults. + * + * @param {string} name Projection name. + * @param {object} [definition] Projection definition. + * @param {object} [options] Projection options. + * @returns {Projection} + */ + getProjection(name, definition, options = {}) { + assert(typeof name === 'string' && name !== '', 'Must provide a projection name.'); + const projectionTypeAccessor = this.typeAccessor + ? (event) => this.typeAccessor(event?.payload || event) + : options.typeAccessor; + const projectionFileName = path.join(this.storage.indexDirectory, 'projections', this.storage.storageFile + '.' + name + '.projection'); + const projectionOptions = { + ...options, + fileName: options.fileName || projectionFileName, + hmac: options.hmac || this.storage.hmac, + typeAccessor: projectionTypeAccessor + }; + if (definition) { + return new Projection(name, definition, projectionOptions); + } + return Projection.restore(name, projectionOptions); + } + /** * Scan the existing consumers on this EventStore and asynchronously invoke a callback with the parsed list. * diff --git a/src/Projection.js b/src/Projection.js new file mode 100644 index 00000000..7b580b65 --- /dev/null +++ b/src/Projection.js @@ -0,0 +1,276 @@ +import fs from 'fs'; +import path from 'path'; +import { assert } from './utils/util.js'; +import { ensureDirectory } from './utils/fsUtil.js'; +import { buildMatcherFromMetadata, buildMetadataForMatcher, matches } from './utils/metadataUtil.js'; + +const DEFAULT_TYPE_ACCESSOR = (event) => event?.type || event?.payload?.type; + +const safeUnlink = (filename) => { + try { + fs.unlinkSync(filename); + } catch (e) { + if (e.code !== "ENOENT") { + throw e; + } + } +}; + +class Projection { + + constructor(name, definition = {}, options = {}) { + assert(typeof name === 'string' && name !== '', 'Projection must have a name.'); + const { initialState = {}, handlers, matcher } = definition; + assert((typeof handlers === 'function') || (handlers && typeof handlers === 'object' && !Array.isArray(handlers)), 'Projection handlers must be a function or an object map of functions.'); + if (typeof handlers === 'object') { + for (const reducer of Object.values(handlers)) { + assert(typeof reducer === 'function', 'Projection handler maps must contain reducer functions.'); + } + } + this.name = name; + this.initialState = Object.freeze(initialState); + this.handlers = handlers; + this.matcher = matcher; + this.hmac = options.hmac || null; + this.typeAccessor = options.typeAccessor || DEFAULT_TYPE_ACCESSOR; + this.fileName = options.fileName || null; + this.state = this.initialState; + } + + get types() { + if (typeof this.handlers === 'function') { + return []; + } + return Object.keys(this.handlers); + } + + apply(state, event) { + if (!this.matches(event)) { + this.state = state; + return state; + } + let reducer = this.handlers; + if (typeof this.handlers === 'object') { + reducer = this.handlers[this.typeAccessor(event)]; + if (typeof reducer !== 'function') { + this.state = state; + return state; + } + } + const nextState = reducer(state, event); + this.state = nextState; + return nextState; + } + + handle(stream) { + this.reset(); + for (const event of stream) { + this.state = this.apply(this.state, event); + } + return this.state; + } + + reset() { + this.state = this.initialState; + return this.state; + } + + matches(event) { + if (!this.matcher) { + return true; + } + if (typeof this.matcher === 'function') { + return this.matcher(event); + } + return matches(event, this.matcher); + } + + subscribe(consumer, options = {}) { + assert(consumer && typeof consumer.project === 'function', 'Projection.subscribe expects a Consumer instance.'); + consumer.project(this, options); + return this; + } + + persist(options = {}) { + const hmac = options.hmac || this.hmac; + const fileName = options.fileName || this.fileName || `${this.name}.projection`; + const metadata = this.toMetadata(hmac); + const tmpFile = fileName + '.tmp'; + ensureDirectory(path.dirname(fileName)); + try { + fs.writeFileSync(tmpFile, JSON.stringify(metadata), 'utf8'); + fs.renameSync(tmpFile, fileName); + } catch (e) { + safeUnlink(tmpFile); + throw e; + } + this.fileName = fileName; + this.hmac = hmac; + return fileName; + } + + toMetadata(hmac = this.hmac) { + const serializeFn = (fn) => { + assert(typeof hmac === 'function', 'Must provide options.hmac for function projections.'); + return buildMetadataForMatcher(fn, hmac); + }; + const matcherMetadata = this.matcher ? ( + typeof this.matcher === 'function' ? serializeFn(this.matcher) : buildMetadataForMatcher(this.matcher, hmac) + ) : null; + const handlersMetadata = (typeof this.handlers === 'function') + ? serializeFn(this.handlers) + : Object.fromEntries( + Object.entries(this.handlers).map(([eventType, reducer]) => [eventType, serializeFn(reducer)]) + ); + return { + kind: 'projection', + name: this.name, + initialState: this.initialState, + matcher: matcherMetadata, + handlersKind: typeof this.handlers === 'function' ? 'function' : 'map', + handlers: handlersMetadata + }; + } + + static restore(name, options = {}) { + assert(typeof name === 'string' && name !== '', 'Projection.restore requires a projection name.'); + const fileName = options.fileName || `${name}.projection`; + return Projection.restoreFromFile(fileName, options); + } + + static restoreFromFile(fileName, options = {}) { + assert(fs.existsSync(fileName), `Projection file does not exist: ${fileName}`); + const metadata = JSON.parse(fs.readFileSync(fileName, 'utf8')); + return Projection.fromMetadata(metadata, { ...options, fileName }); + } + + static fromMetadata(metadata, options = {}) { + assert(metadata && typeof metadata === 'object', 'Invalid projection metadata.'); + if (metadata.kind === 'composite-projection') { + return CompositeProjection.fromMetadata(metadata, options); + } + assert(metadata.kind === 'projection', 'Invalid projection metadata kind.'); + const hmac = options.hmac; + const deserialize = (matcherMetadata) => { + if (!matcherMetadata) { + return undefined; + } + if (typeof matcherMetadata.matcher === 'string') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); + } + return buildMatcherFromMetadata(matcherMetadata, hmac); + }; + const handlers = metadata.handlersKind === 'function' + ? deserialize(metadata.handlers) + : Object.fromEntries( + Object.entries(metadata.handlers || {}).map(([eventType, reducerMetadata]) => [eventType, deserialize(reducerMetadata)]) + ); + const projection = new Projection(metadata.name, { + initialState: metadata.initialState, + matcher: deserialize(metadata.matcher), + handlers + }, { + ...options, + fileName: options.fileName || null + }); + projection.reset(); + return projection; + } + + static compose(name, projections, options = {}) { + return new CompositeProjection(name, projections, options); + } +} + +class CompositeProjection extends Projection { + + constructor(name, projections, options = {}) { + assert(projections && typeof projections === 'object' && !Array.isArray(projections), 'CompositeProjection requires an object map of projections.'); + const normalized = {}; + for (const [projectionName, projection] of Object.entries(projections)) { + normalized[projectionName] = projection instanceof Projection + ? projection + : new Projection(projectionName, projection, options); + } + super(name, { + initialState: Object.fromEntries( + Object.entries(normalized).map(([projectionName, projection]) => [projectionName, projection.initialState]) + ), + handlers: (state) => state, + matcher: options.matcher + }, options); + this.projections = normalized; + this.reset(); + } + + get types() { + const types = new Set(); + for (const projection of Object.values(this.projections)) { + for (const type of projection.types) { + types.add(type); + } + } + return [...types]; + } + + apply(state, event) { + if (!this.matches(event)) { + this.state = state; + return state; + } + const currentState = state || this.initialState; + const nextState = {}; + for (const [name, projection] of Object.entries(this.projections)) { + nextState[name] = projection.apply(currentState[name], event); + } + this.state = nextState; + return nextState; + } + + reset() { + for (const projection of Object.values(this.projections)) { + projection.reset(); + } + this.state = Object.fromEntries( + Object.entries(this.projections).map(([projectionName, projection]) => [projectionName, projection.state]) + ); + return this.state; + } + + toMetadata(hmac = this.hmac) { + return { + kind: 'composite-projection', + name: this.name, + matcher: this.matcher ? buildMetadataForMatcher(this.matcher, hmac) : null, + projections: Object.fromEntries( + Object.entries(this.projections).map(([name, projection]) => [name, projection.toMetadata(hmac)]) + ) + }; + } + + static fromMetadata(metadata, options = {}) { + const hmac = options.hmac; + const deserializeMatcher = (matcherMetadata) => { + if (!matcherMetadata) { + return undefined; + } + if (typeof matcherMetadata.matcher === 'string') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); + } + return buildMatcherFromMetadata(matcherMetadata, hmac); + }; + const projections = Object.fromEntries( + Object.entries(metadata.projections || {}).map(([name, projectionMetadata]) => [ + name, + Projection.fromMetadata(projectionMetadata, options) + ]) + ); + return new CompositeProjection(metadata.name, projections, { + ...options, + matcher: deserializeMatcher(metadata.matcher) + }); + } +} + +export default Projection; +export { CompositeProjection }; diff --git a/test/Consumer.spec.js b/test/Consumer.spec.js index db669117..47cf53b0 100644 --- a/test/Consumer.spec.js +++ b/test/Consumer.spec.js @@ -3,6 +3,7 @@ import fs from 'fs-extra'; import fsNative from 'fs'; import Storage from '../src/Storage.js'; import Consumer from '../src/Consumer.js'; +import Projection, { CompositeProjection } from '../src/Projection.js'; import { createHmac } from '../src/utils/metadataUtil.js'; import { fileURLToPath } from 'url'; @@ -562,6 +563,48 @@ describe('Consumer', function() { expect(() => new Consumer(storage, 'foobar', 'consumer-projection-hmac', {}, 0, { hmac: createHmac('wrong-secret') })).to.throwError(/Invalid HMAC/); }); + it('can attach a projection instance and restore it on reopen', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-projection-instance', { count: 0 }); + const projection = new Projection('consumer-projection-instance', { + initialState: { count: 0 }, + handlers: { + Foobar: (state, event) => ({ ...state, count: state.count + event.id }) + } + }, { + hmac: createHmac('test-secret') + }); + consumer.project(projection); + consumer.on('caught-up', () => { + expect(consumer.state.count).to.be(6); + consumer.stop(); + consumer = new Consumer(storage, 'foobar', 'consumer-projection-instance', {}, 0, { hmac: createHmac('test-secret') }); + consumer.on('progress', () => { + if (consumer.state.count === 10) { + done(); + } + }); + storage.write({ type: 'Foobar', id: 4 }); + }); + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + + it('supports composite projections', function() { + const projection = new CompositeProjection('overview', { + count: { + initialState: 0, + handlers: { Foobar: (state) => state + 1 } + }, + last: { + initialState: null, + handlers: { Foobar: (state, event) => event.id || state } + } + }); + projection.handle([{ type: 'Foobar', id: 1 }, { type: 'Foobar', id: 2 }]); + expect(projection.state).to.eql({ count: 2, last: 2 }); + }); + it('can build consistency guards (aggregates)', function(done) { const guard = new Consumer(storage, 'foobar', 'unique-bar-guard'); guard.apply = function(event) { diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index 8e56f02d..5ef7e282 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -4,6 +4,8 @@ import fsSync from 'fs'; import path from 'path'; import EventStore, { ExpectedVersion, OptimisticConcurrencyError, CommitCondition, LOCK_RECLAIM } from '../src/EventStore.js'; import Consumer from '../src/Consumer.js'; +import Projection from '../src/Projection.js'; +import { createHmac } from '../src/utils/metadataUtil.js'; import { fileURLToPath } from 'url'; const __dirname = fileURLToPath(new URL('.', import.meta.url)); @@ -1377,6 +1379,75 @@ describe('EventStore', function() { eventstore.commit('foo', { foo: 'bar', id: 1 }); eventstore.commit('bar', { foo: 'baz', id: 2 }); }); + + it('restores a projected consumer with eventStore typeAccessor defaults', function(done) { + eventstore = new EventStore({ + storageDirectory, + typeAccessor: 'type', + storageConfig: { + hmac: createHmac('test-secret') + } + }); + eventstore.createEventStream('user-stream', (event) => event.stream === 'user-stream'); + + const consumer = eventstore.getConsumer('user-stream', 'user-counter', { count: 0 }); + consumer.project(new Projection('user-counter', { + initialState: { count: 0 }, + handlers: { + UserCreated: (state) => ({ ...state, count: state.count + 1 }) + } + })); + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 1 }]); + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 2 }]); + + consumer.on('progress', () => { + if (consumer.state.count !== 2) { + return; + } + eventstore.close(); + eventstore = new EventStore({ + storageDirectory, + typeAccessor: 'type', + storageConfig: { + hmac: createHmac('test-secret') + } + }); + const reopened = eventstore.getConsumer('user-stream', 'user-counter', { count: 0 }); + reopened.on('progress', () => { + if (reopened.state.count === 3) { + done(); + } + }); + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 3 }]); + }); + }); + }); + + describe('getProjection', function() { + + it('creates and restores persisted projections with EventStore defaults', function() { + eventstore = new EventStore({ + storageDirectory, + typeAccessor: 'type', + storageConfig: { + hmac: createHmac('test-secret') + } + }); + const projection = eventstore.getProjection('user-count', { + initialState: 0, + handlers: { + UserCreated: (state) => state + 1 + } + }); + projection.persist(); + + const restored = eventstore.getProjection('user-count'); + const state = restored.handle([ + { payload: { type: 'UserCreated' } }, + { payload: { type: 'UserCreated' } } + ]); + expect(state).to.be(2); + }); }); describe('scanConsumers', function() { From 2f46addf2ee5c9a25ec8efad991f23d844b37c6b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 31 May 2026 11:06:30 +0000 Subject: [PATCH 4/8] Address projection API review feedback --- docs/api.md | 37 +++++---------------- docs/consumers.md | 17 ++-------- src/Consumer.js | 74 +++++------------------------------------ src/EventStore.js | 25 ++++++++------ src/Projection.js | 12 +++++-- test/Consumer.spec.js | 44 ++++++++++++++++++------ test/EventStore.spec.js | 6 ++-- 7 files changed, 80 insertions(+), 135 deletions(-) diff --git a/docs/api.md b/docs/api.md index 4db01f3d..fa6ac740 100644 --- a/docs/api.md +++ b/docs/api.md @@ -260,31 +260,10 @@ Asynchronously scan all consumer state files and return their identifiers. --- -#### `consumer.createProjection(projectionFn, [options])` +#### `eventstore.getProjection(name, [definition])` ```javascript -consumer.createProjection(projectionFn [, options]) -``` - -Register a reducer-style projection as the consumer's `'data'` handler and persist it so it is auto-restored when reopening the same consumer. - -`projectionFn` can be either: - -- a reducer function: `(state, event) => state` -- an object map: `{ [eventType]: (state, event) => state }` - -Options: - -| Parameter | Type | Default | Description | -|-----------|------|---------|-------------| -| `options.hmac` | `function(string): string` | storage HMAC | HMAC function used to sign/verify serialized projections. Required when no storage-level HMAC is configured. | - ---- - -#### `eventstore.getProjection(name, [definition], [options])` - -```javascript -eventstore.getProjection(name [, definition [, options]]) → Projection +eventstore.getProjection(name [, definition]) → Projection ``` Create a `Projection` with EventStore defaults (`typeAccessor`, storage HMAC), or restore a previously persisted one when `definition` is omitted. @@ -301,23 +280,23 @@ Create a `Projection` with EventStore defaults (`typeAccessor`, storage HMAC), o --- -#### `consumer.project(projection, [options])` +#### `consumer.project(projection)` ```javascript -consumer.project(projection [, options]) +consumer.project(projection) ``` -Attach a `Projection` instance to a durable consumer and persist its definition for automatic restore when reopening the same consumer. +Attach a projection-like object (`apply(state, event)`) as the consumer `'data'` handler. --- -#### `projection.subscribe(consumer, [options])` +#### `projection.subscribe(consumer)` ```javascript -projection.subscribe(consumer [, options]) +projection.subscribe(consumer) ``` -Alias for `consumer.project(projection, options)`. +Attach this projection to the consumer and persist its definition next to the consumer state file so `eventstore.getConsumer(...)` can restore and reconnect it automatically. --- diff --git a/docs/consumers.md b/docs/consumers.md index a6ced869..9e34348c 100644 --- a/docs/consumers.md +++ b/docs/consumers.md @@ -74,28 +74,15 @@ consumer.setState((state) => ({ ...state, count: state.count + 1 })); Use a `Projection` to define *how* events are projected into state, then connect it to a `Consumer` for durable continuous updates. ```javascript -import crypto from 'crypto'; - -const hmac = (code) => crypto.createHmac('sha256', 'your-private-secret').update(code).digest('hex'); - const projection = eventstore.getProjection('orders-total', { initialState: { total: 0 }, handlers: { OrderCreated: (state, event) => ({ ...state, total: state.total + (event.payload.amount || 0) }) } -}, { hmac }); +}); const consumer = eventstore.getConsumer('orders', 'orders-projection', projection.initialState); -consumer.project(projection); -``` - -You can still use `consumer.createProjection(...)` for the shorter in-place API: - -```javascript -consumer.createProjection({ - OrderCreated: (state, event) => ({ ...state, orders: [...state.orders, event] }), - OrderCancelled: (state, event) => ({ ...state, cancelled: state.cancelled + 1 }) -}, { hmac }); +projection.subscribe(consumer); ``` Projections are composable via `CompositeProjection`: diff --git a/src/Consumer.js b/src/Consumer.js index 948f0bad..2609651e 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -3,7 +3,6 @@ import fs from 'fs'; import path from 'path'; import { assert } from './utils/util.js'; import { ensureDirectory } from './utils/fsUtil.js'; -import Projection from './Projection.js'; import Storage from './Storage/ReadableStorage.js'; const MAX_CATCHUP_BATCH = 10; @@ -34,7 +33,7 @@ class Consumer extends stream.Readable { * @param {object} [initialState={}] The initial state of the consumer. * @param {number} [startFrom=0] The revision to start from within the index to consume. */ - constructor(storage, indexName, identifier, initialState = {}, startFrom = 0, options = {}) { + constructor(storage, indexName, identifier, initialState = {}, startFrom = 0) { super({ objectMode: true }); assert(storage instanceof Storage, 'Must provide a storage for the consumer.'); @@ -43,7 +42,6 @@ class Consumer extends stream.Readable { this.initializeStorage(storage, indexName, identifier); this.restoreState(initialState, startFrom); - this.restoreProjection(options); this.handler = this.handleNewDocument.bind(this); this.on('error', () => (this.handleDocument = false)); } @@ -61,7 +59,6 @@ class Consumer extends stream.Readable { this.identifier = identifier; const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers'); this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier); - this.projectionFileName = this.fileName + '.projection'; if (ensureDirectory(consumerDirectory)) { this.cleanUpFailedWrites(); } @@ -73,16 +70,12 @@ class Consumer extends stream.Readable { */ cleanUpFailedWrites() { const consumerBaseName = path.basename(this.fileName); - const projectionBaseName = consumerBaseName + '.projection'; const escapedConsumerBaseName = consumerBaseName.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); const failedStateFilePattern = new RegExp(`^${escapedConsumerBaseName}\\.\\d+$`); const consumerDirectory = path.dirname(this.fileName); const files = fs.readdirSync(consumerDirectory); for (let file of files) { - if (file === projectionBaseName) { - continue; - } - if (file === projectionBaseName + '.tmp' || failedStateFilePattern.test(file)) { + if (failedStateFilePattern.test(file)) { safeUnlink(path.join(consumerDirectory, file)); } } @@ -117,35 +110,12 @@ class Consumer extends stream.Readable { } /** - * Restore a persisted projection and register it as data handler. - * @private - * @param {object} [options={}] - * @param {function(string): string} [options.hmac] - */ - restoreProjection(options = {}) { - this.projection = null; - this.projectionHandler = null; - if (!this.projectionFileName || !fs.existsSync(this.projectionFileName)) { - return; - } - const projection = Projection.restoreFromFile(this.projectionFileName, { - hmac: options.hmac || this.storage.hmac, - typeAccessor: options.typeAccessor - }); - this.project(projection, { persist: false }); - } - - /** - * Register a projection function as `data` event handler. - * @private - * @param {Projection} projection - * @param {object} [options] - * @param {boolean} [options.persist=true] - * @param {function(string): string} [options.hmac] + * Register a projection as `data` event handler. + * @api + * @param {{ apply: function(object, object): object }} projection */ - project(projection, options = {}) { - assert(projection instanceof Projection, 'Projection must be an instance of Projection.'); - const { persist = true, hmac } = options; + project(projection) { + assert(projection && typeof projection.apply === 'function', 'Projection must implement apply(state, event).'); if (this.projectionHandler) { this.removeListener('data', this.projectionHandler); } @@ -154,35 +124,7 @@ class Consumer extends stream.Readable { this.setState(projection.apply(this.state, event)); }; this.on('data', this.projectionHandler); - if (persist) { - projection.persist({ - fileName: this.projectionFileName, - hmac: hmac || projection.hmac || this.storage.hmac - }); - } - return projection; - } - - /** - * Create and persist a projection reducer and register it as data handler. - * - * @api - * @param {function(object, object): object|object} projectionFn - * @param {object} [options] - * @param {function(string): string} [options.hmac] Required for function projections. - */ - createProjection(projectionFn, options = {}) { - const projection = projectionFn instanceof Projection - ? projectionFn - : new Projection(options.name || this.identifier, { - initialState: this.state, - matcher: options.matcher, - handlers: projectionFn - }, { - hmac: options.hmac || this.storage.hmac, - typeAccessor: options.typeAccessor - }); - return this.project(projection, options); + return this; } /** diff --git a/src/EventStore.js b/src/EventStore.js index 035feb25..e82d4212 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -121,6 +121,9 @@ class EventStore extends events.EventEmitter { } this.initialize(storeName, storageConfig); + this.projectionHmac = typeof storageConfig.hmac === 'function' + ? storageConfig.hmac + : this.storage.hmac; } /** @@ -788,10 +791,14 @@ class EventStore extends events.EventEmitter { const projectionTypeAccessor = this.typeAccessor ? (event) => this.typeAccessor(event?.payload || event) : undefined; - const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since, { - hmac: this.storage.hmac, - typeAccessor: projectionTypeAccessor - }); + const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since); + const consumerProjectionFileName = `${consumer.fileName}.projection`; + if (fs.existsSync(consumerProjectionFileName)) { + Projection.restoreFromFile(consumerProjectionFileName, { + hmac: this.projectionHmac, + typeAccessor: projectionTypeAccessor + }).subscribe(consumer); + } consumer.streamName = streamName; this.consumers.set(identifier, consumer); return consumer; @@ -802,19 +809,17 @@ class EventStore extends events.EventEmitter { * * @param {string} name Projection name. * @param {object} [definition] Projection definition. - * @param {object} [options] Projection options. * @returns {Projection} */ - getProjection(name, definition, options = {}) { + getProjection(name, definition) { assert(typeof name === 'string' && name !== '', 'Must provide a projection name.'); const projectionTypeAccessor = this.typeAccessor ? (event) => this.typeAccessor(event?.payload || event) - : options.typeAccessor; + : undefined; const projectionFileName = path.join(this.storage.indexDirectory, 'projections', this.storage.storageFile + '.' + name + '.projection'); const projectionOptions = { - ...options, - fileName: options.fileName || projectionFileName, - hmac: options.hmac || this.storage.hmac, + fileName: projectionFileName, + hmac: this.projectionHmac, typeAccessor: projectionTypeAccessor }; if (definition) { diff --git a/src/Projection.js b/src/Projection.js index 7b580b65..09c7e5b2 100644 --- a/src/Projection.js +++ b/src/Projection.js @@ -85,9 +85,17 @@ class Projection { return matches(event, this.matcher); } - subscribe(consumer, options = {}) { + subscribe(consumer) { assert(consumer && typeof consumer.project === 'function', 'Projection.subscribe expects a Consumer instance.'); - consumer.project(this, options); + const projectionFileName = consumer.fileName ? `${consumer.fileName}.projection` : null; + const isAlreadySubscribed = consumer.projection === this; + const isAlreadyPersisted = projectionFileName && this.fileName === projectionFileName && fs.existsSync(projectionFileName); + consumer.project(this); + if (!isAlreadySubscribed && !isAlreadyPersisted) { + this.persist({ + fileName: projectionFileName || this.fileName + }); + } return this; } diff --git a/test/Consumer.spec.js b/test/Consumer.spec.js index 47cf53b0..760315f3 100644 --- a/test/Consumer.spec.js +++ b/test/Consumer.spec.js @@ -521,9 +521,14 @@ describe('Consumer', function() { }); }); - it('can create projections from a reducer function', function(done) { + it('can attach projections from a reducer function', function(done) { consumer = new Consumer(storage, 'foobar', 'consumer-projection', { count: 0 }); - consumer.createProjection((state, event) => ({ ...state, count: state.count + event.id }), { hmac: createHmac('test-secret') }); + new Projection('consumer-projection', { + initialState: { count: 0 }, + handlers: (state, event) => ({ ...state, count: state.count + event.id }) + }, { + hmac: createHmac('test-secret') + }).subscribe(consumer); consumer.on('caught-up', () => { expect(consumer.state.count).to.be(6); done(); @@ -534,16 +539,23 @@ describe('Consumer', function() { storage.write({ type: 'Foobar', id: 3 }); }); - it('can create projections from event-type reducer maps and restore them on reopen', function(done) { + it('can attach and restore projections from event-type reducer maps', function(done) { consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', { count: 0 }); - consumer.createProjection({ - Foobar: (state, event) => ({ ...state, count: state.count + event.id }), - Bazinga: (state) => state + const projection = new Projection('consumer-projection-map', { + initialState: { count: 0 }, + handlers: { + Foobar: (state, event) => ({ ...state, count: state.count + event.id }), + Bazinga: (state) => state + } }, { hmac: createHmac('test-secret') }); + projection.subscribe(consumer); consumer.on('caught-up', () => { consumer.stop(); - consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', {}, 0, { hmac: createHmac('test-secret') }); + consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', {}); + Projection.restoreFromFile(`${consumer.fileName}.projection`, { + hmac: createHmac('test-secret') + }).subscribe(consumer); consumer.on('progress', () => { if (consumer.state.count === 10) { done(); @@ -559,8 +571,15 @@ describe('Consumer', function() { it('throws if function projection is restored without trusted hmac', function() { consumer = new Consumer(storage, 'foobar', 'consumer-projection-hmac'); - consumer.createProjection((state, event) => ({ ...state, lastId: event.id }), { hmac: createHmac('test-secret') }); - expect(() => new Consumer(storage, 'foobar', 'consumer-projection-hmac', {}, 0, { hmac: createHmac('wrong-secret') })).to.throwError(/Invalid HMAC/); + new Projection('consumer-projection-hmac', { + initialState: {}, + handlers: (state, event) => ({ ...state, lastId: event.id }) + }, { + hmac: createHmac('test-secret') + }).subscribe(consumer); + expect(() => Projection.restoreFromFile(`${consumer.fileName}.projection`, { + hmac: createHmac('wrong-secret') + })).to.throwError(/Invalid HMAC/); }); it('can attach a projection instance and restore it on reopen', function(done) { @@ -573,11 +592,14 @@ describe('Consumer', function() { }, { hmac: createHmac('test-secret') }); - consumer.project(projection); + projection.subscribe(consumer); consumer.on('caught-up', () => { expect(consumer.state.count).to.be(6); consumer.stop(); - consumer = new Consumer(storage, 'foobar', 'consumer-projection-instance', {}, 0, { hmac: createHmac('test-secret') }); + consumer = new Consumer(storage, 'foobar', 'consumer-projection-instance', {}); + Projection.restoreFromFile(`${consumer.fileName}.projection`, { + hmac: createHmac('test-secret') + }).subscribe(consumer); consumer.on('progress', () => { if (consumer.state.count === 10) { done(); diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index 5ef7e282..58a12eda 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -1391,12 +1391,14 @@ describe('EventStore', function() { eventstore.createEventStream('user-stream', (event) => event.stream === 'user-stream'); const consumer = eventstore.getConsumer('user-stream', 'user-counter', { count: 0 }); - consumer.project(new Projection('user-counter', { + new Projection('user-counter', { initialState: { count: 0 }, handlers: { UserCreated: (state) => ({ ...state, count: state.count + 1 }) } - })); + }, { + hmac: createHmac('test-secret') + }).subscribe(consumer); eventstore.commit('user-stream', [{ type: 'UserCreated', id: 1 }]); eventstore.commit('user-stream', [{ type: 'UserCreated', id: 2 }]); From a222f4ce4ac8d9d79af686d7dcc54a2c07855926 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:06:07 +0000 Subject: [PATCH 5/8] Address projection API and utility review feedback --- docs/api.md | 19 +++++++------------ docs/consumers.md | 16 +++++++++------- src/Consumer.js | 19 ++----------------- src/EventStore.js | 31 +++++++++++++++---------------- src/Projection.js | 22 +++++----------------- src/utils/fsUtil.js | 39 +++++++++++++++++++++++++++++++++++++++ test/EventStore.spec.js | 15 ++++++--------- 7 files changed, 83 insertions(+), 78 deletions(-) diff --git a/docs/api.md b/docs/api.md index fa6ac740..e4885bf1 100644 --- a/docs/api.md +++ b/docs/api.md @@ -260,23 +260,18 @@ Asynchronously scan all consumer state files and return their identifiers. --- -#### `eventstore.getProjection(name, [definition])` +#### `eventstore.getProjection(name, [handlers], [initialState], [matcher])` ```javascript -eventstore.getProjection(name [, definition]) → Projection +eventstore.getProjection(name [, handlers] [, initialState] [, matcher]) → Projection ``` -Create a `Projection` with EventStore defaults (`typeAccessor`, storage HMAC), or restore a previously persisted one when `definition` is omitted. +Create a `Projection` with EventStore defaults (`typeAccessor`, storage HMAC), or restore a previously persisted one when `handlers` is omitted. -`definition` shape: +- `handlers`: reducer function `(state, event) => state` or map `{ [eventType]: reducer }` +- `initialState`: initial projection state (default `{}`) +- `matcher`: optional object/function matcher (same shape as stream/query matchers) -```javascript -{ - initialState: any, - handlers: (state, event) => state | { [eventType]: (state, event) => state }, - matcher: object|function // optional -} -``` --- @@ -296,7 +291,7 @@ Attach a projection-like object (`apply(state, event)`) as the consumer `'data'` projection.subscribe(consumer) ``` -Attach this projection to the consumer and persist its definition next to the consumer state file so `eventstore.getConsumer(...)` can restore and reconnect it automatically. +Attach this projection to the consumer (same wiring behavior as `consumer.project(projection)`) and persist its definition next to the consumer state file so `eventstore.getConsumer(...)` can restore and reconnect it automatically. --- diff --git a/docs/consumers.md b/docs/consumers.md index 9e34348c..75436a92 100644 --- a/docs/consumers.md +++ b/docs/consumers.md @@ -75,11 +75,8 @@ Use a `Projection` to define *how* events are projected into state, then connect ```javascript const projection = eventstore.getProjection('orders-total', { - initialState: { total: 0 }, - handlers: { - OrderCreated: (state, event) => ({ ...state, total: state.total + (event.payload.amount || 0) }) - } -}); + OrderCreated: (state, event) => ({ ...state, total: state.total + (event.payload.amount || 0) }) +}, { total: 0 }); const consumer = eventstore.getConsumer('orders', 'orders-projection', projection.initialState); projection.subscribe(consumer); @@ -88,10 +85,15 @@ projection.subscribe(consumer); Projections are composable via `CompositeProjection`: ```javascript -import { CompositeProjection } from 'event-storage'; +import { CompositeProjection, Projection } from 'event-storage'; + +const count = new Projection('count', { + initialState: 0, + handlers: { OrderCreated: (state) => state + 1 } +}); const overview = new CompositeProjection('overview', { - count: { initialState: 0, handlers: { OrderCreated: (state) => state + 1 } }, + count, last: { initialState: null, handlers: { OrderCreated: (state, event) => event.payload } } }); ``` diff --git a/src/Consumer.js b/src/Consumer.js index 2609651e..4dc90110 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -2,24 +2,10 @@ import stream from 'stream'; import fs from 'fs'; import path from 'path'; import { assert } from './utils/util.js'; -import { ensureDirectory } from './utils/fsUtil.js'; +import { ensureDirectory, safeUnlink, writeFileAtomic } from './utils/fsUtil.js'; import Storage from './Storage/ReadableStorage.js'; const MAX_CATCHUP_BATCH = 10; -/** - * Safely unlink a file and ignore if it doesn't exist. - * @param {string} filename - */ -const safeUnlink = (filename) => { - /* istanbul ignore next */ - try { - fs.unlinkSync(filename); - } catch (e) { - if (e.code !== "ENOENT") { - throw e; - } - } -}; /** * Implements an event-driven durable Consumer that provides at-least-once delivery semantics or exactly-once processing semantics if only using setState(). @@ -197,9 +183,8 @@ class Consumer extends stream.Readable { throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`); } try { - fs.writeFileSync(tmpFile, consumerData); // If the write fails (half-way), the consumer state file will not be corrupted - fs.renameSync(tmpFile, this.fileName); + writeFileAtomic(this.fileName, consumerData, { tmpFileName: tmpFile }); this.emit('persisted', consumerState); } catch (e) { /* istanbul ignore next */ diff --git a/src/EventStore.js b/src/EventStore.js index e82d4212..ce6d7e77 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -120,10 +120,10 @@ class EventStore extends events.EventEmitter { } } + this.projectionTypeAccessor = this.typeAccessor + ? (event) => this.typeAccessor(event?.payload || event) + : undefined; this.initialize(storeName, storageConfig); - this.projectionHmac = typeof storageConfig.hmac === 'function' - ? storageConfig.hmac - : this.storage.hmac; } /** @@ -788,15 +788,12 @@ class EventStore extends events.EventEmitter { if (this.consumers.has(identifier)) { return this.consumers.get(identifier); } - const projectionTypeAccessor = this.typeAccessor - ? (event) => this.typeAccessor(event?.payload || event) - : undefined; const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since); const consumerProjectionFileName = `${consumer.fileName}.projection`; if (fs.existsSync(consumerProjectionFileName)) { Projection.restoreFromFile(consumerProjectionFileName, { - hmac: this.projectionHmac, - typeAccessor: projectionTypeAccessor + hmac: this.storage.hmac, + typeAccessor: this.projectionTypeAccessor }).subscribe(consumer); } consumer.streamName = streamName; @@ -808,21 +805,23 @@ class EventStore extends events.EventEmitter { * Get or create a projection with EventStore defaults. * * @param {string} name Projection name. - * @param {object} [definition] Projection definition. + * @param {function(object, object): object|object} [handlers] Projection handlers (reducer fn or reducer map). + * @param {object} [initialState={}] Projection initial state. + * @param {object|function(object): boolean} [matcher] Optional projection matcher. * @returns {Projection} */ - getProjection(name, definition) { + getProjection(name, handlers, initialState = {}, matcher) { assert(typeof name === 'string' && name !== '', 'Must provide a projection name.'); - const projectionTypeAccessor = this.typeAccessor - ? (event) => this.typeAccessor(event?.payload || event) - : undefined; const projectionFileName = path.join(this.storage.indexDirectory, 'projections', this.storage.storageFile + '.' + name + '.projection'); const projectionOptions = { fileName: projectionFileName, - hmac: this.projectionHmac, - typeAccessor: projectionTypeAccessor + hmac: this.storage.hmac, + typeAccessor: this.projectionTypeAccessor }; - if (definition) { + if (handlers !== undefined) { + const definition = (handlers && typeof handlers === 'object' && !Array.isArray(handlers) && Object.prototype.hasOwnProperty.call(handlers, 'handlers')) + ? handlers + : { handlers, initialState, matcher }; return new Projection(name, definition, projectionOptions); } return Projection.restore(name, projectionOptions); diff --git a/src/Projection.js b/src/Projection.js index 09c7e5b2..742c2efc 100644 --- a/src/Projection.js +++ b/src/Projection.js @@ -1,20 +1,11 @@ import fs from 'fs'; import path from 'path'; import { assert } from './utils/util.js'; -import { ensureDirectory } from './utils/fsUtil.js'; +import { ensureDirectory, writeFileAtomic } from './utils/fsUtil.js'; import { buildMatcherFromMetadata, buildMetadataForMatcher, matches } from './utils/metadataUtil.js'; const DEFAULT_TYPE_ACCESSOR = (event) => event?.type || event?.payload?.type; -const safeUnlink = (filename) => { - try { - fs.unlinkSync(filename); - } catch (e) { - if (e.code !== "ENOENT") { - throw e; - } - } -}; class Projection { @@ -105,13 +96,10 @@ class Projection { const metadata = this.toMetadata(hmac); const tmpFile = fileName + '.tmp'; ensureDirectory(path.dirname(fileName)); - try { - fs.writeFileSync(tmpFile, JSON.stringify(metadata), 'utf8'); - fs.renameSync(tmpFile, fileName); - } catch (e) { - safeUnlink(tmpFile); - throw e; - } + writeFileAtomic(fileName, JSON.stringify(metadata), { + tmpFileName: tmpFile, + encoding: 'utf8' + }); this.fileName = fileName; this.hmac = hmac; return fileName; diff --git a/src/utils/fsUtil.js b/src/utils/fsUtil.js index 1dd2086f..37cafbe2 100644 --- a/src/utils/fsUtil.js +++ b/src/utils/fsUtil.js @@ -2,6 +2,43 @@ import fs from 'fs'; import path from 'path'; import { mkdirpSync } from 'mkdirp'; +/** + * Safely unlink a file and ignore ENOENT. + * @param {string} fileName + */ +function safeUnlink(fileName) { + try { + fs.unlinkSync(fileName); + } catch (e) { + if (e.code !== 'ENOENT') { + throw e; + } + } +} + +/** + * Atomically write a file by writing to a temporary file first and then renaming it. + * @param {string} fileName + * @param {string|Buffer} data + * @param {{ tmpFileName?: string, encoding?: BufferEncoding }} [options] + * @returns {string} + */ +function writeFileAtomic(fileName, data, options = {}) { + const tmpFileName = options.tmpFileName || `${fileName}.tmp`; + try { + if (options.encoding) { + fs.writeFileSync(tmpFileName, data, options.encoding); + } else { + fs.writeFileSync(tmpFileName, data); + } + fs.renameSync(tmpFileName, fileName); + } catch (e) { + safeUnlink(tmpFileName); + throw e; + } + return fileName; +} + /** * Ensure that the given directory exists. * @param {string} dirName @@ -119,5 +156,7 @@ function scanForFiles(directory, regexPattern, onEach, onDone) { export { ensureDirectory, + safeUnlink, + writeFileAtomic, scanForFiles, }; diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index 58a12eda..1a7ac4a5 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -1385,7 +1385,7 @@ describe('EventStore', function() { storageDirectory, typeAccessor: 'type', storageConfig: { - hmac: createHmac('test-secret') + hmacSecret: 'test-secret' } }); eventstore.createEventStream('user-stream', (event) => event.stream === 'user-stream'); @@ -1397,7 +1397,7 @@ describe('EventStore', function() { UserCreated: (state) => ({ ...state, count: state.count + 1 }) } }, { - hmac: createHmac('test-secret') + hmac: eventstore.storage.hmac }).subscribe(consumer); eventstore.commit('user-stream', [{ type: 'UserCreated', id: 1 }]); eventstore.commit('user-stream', [{ type: 'UserCreated', id: 2 }]); @@ -1411,7 +1411,7 @@ describe('EventStore', function() { storageDirectory, typeAccessor: 'type', storageConfig: { - hmac: createHmac('test-secret') + hmacSecret: 'test-secret' } }); const reopened = eventstore.getConsumer('user-stream', 'user-counter', { count: 0 }); @@ -1432,15 +1432,12 @@ describe('EventStore', function() { storageDirectory, typeAccessor: 'type', storageConfig: { - hmac: createHmac('test-secret') + hmacSecret: 'test-secret' } }); const projection = eventstore.getProjection('user-count', { - initialState: 0, - handlers: { - UserCreated: (state) => state + 1 - } - }); + UserCreated: (state) => state + 1 + }, 0); projection.persist(); const restored = eventstore.getProjection('user-count'); From f9e7fdd711878afe8ccb420c72c17d74ec0878e1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Jun 2026 17:09:08 +0000 Subject: [PATCH 6/8] Refine projection API docs and atomic fs helpers --- src/EventStore.js | 9 ++++++++- src/utils/fsUtil.js | 20 ++++---------------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/EventStore.js b/src/EventStore.js index ce6d7e77..a56d5e37 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -819,7 +819,7 @@ class EventStore extends events.EventEmitter { typeAccessor: this.projectionTypeAccessor }; if (handlers !== undefined) { - const definition = (handlers && typeof handlers === 'object' && !Array.isArray(handlers) && Object.prototype.hasOwnProperty.call(handlers, 'handlers')) + const definition = isProjectionDefinitionObject(handlers) ? handlers : { handlers, initialState, matcher }; return new Projection(name, definition, projectionOptions); @@ -883,6 +883,13 @@ function normalizePredicateRaw(predicate, raw) { return { predicate, raw }; } +function isProjectionDefinitionObject(value) { + return value + && typeof value === 'object' + && !Array.isArray(value) + && Object.hasOwn(value, 'handlers'); +} + EventStore.Storage = Storage; EventStore.Index = Index; diff --git a/src/utils/fsUtil.js b/src/utils/fsUtil.js index 37cafbe2..08ea7f86 100644 --- a/src/utils/fsUtil.js +++ b/src/utils/fsUtil.js @@ -2,10 +2,7 @@ import fs from 'fs'; import path from 'path'; import { mkdirpSync } from 'mkdirp'; -/** - * Safely unlink a file and ignore ENOENT. - * @param {string} fileName - */ +// Best-effort cleanup for temporary files after interrupted/failed writes. function safeUnlink(fileName) { try { fs.unlinkSync(fileName); @@ -16,21 +13,12 @@ function safeUnlink(fileName) { } } -/** - * Atomically write a file by writing to a temporary file first and then renaming it. - * @param {string} fileName - * @param {string|Buffer} data - * @param {{ tmpFileName?: string, encoding?: BufferEncoding }} [options] - * @returns {string} - */ +// Prevent partially written persistence files from replacing the last valid state. function writeFileAtomic(fileName, data, options = {}) { const tmpFileName = options.tmpFileName || `${fileName}.tmp`; + const writeOptions = options.encoding ? { encoding: options.encoding } : undefined; try { - if (options.encoding) { - fs.writeFileSync(tmpFileName, data, options.encoding); - } else { - fs.writeFileSync(tmpFileName, data); - } + fs.writeFileSync(tmpFileName, data, writeOptions); fs.renameSync(tmpFileName, fileName); } catch (e) { safeUnlink(tmpFileName); From 30a73dbc5b14ef4af5b34a132d379ad91aa56820 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 4 Jun 2026 20:16:01 +0000 Subject: [PATCH 7/8] Address remaining projection and consumer review feedback --- docs/consumers.md | 1 + src/Consumer.js | 30 ++++++++---- src/EventStore.js | 12 ++--- src/Projection.js | 99 +++++++++++++++++++++++++++++++++++---- src/utils/fsUtil.js | 25 +++++++++- src/utils/metadataUtil.js | 1 + test/Consumer.spec.js | 28 +++++++++++ test/util.spec.js | 22 ++++++++- 8 files changed, 192 insertions(+), 26 deletions(-) diff --git a/docs/consumers.md b/docs/consumers.md index 75436a92..411c6694 100644 --- a/docs/consumers.md +++ b/docs/consumers.md @@ -96,6 +96,7 @@ const overview = new CompositeProjection('overview', { count, last: { initialState: null, handlers: { OrderCreated: (state, event) => event.payload } } }); +// overview.state -> { count: number, last: object|null } ``` ## Resetting a Consumer diff --git a/src/Consumer.js b/src/Consumer.js index 4dc90110..778980c7 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -2,7 +2,7 @@ import stream from 'stream'; import fs from 'fs'; import path from 'path'; import { assert } from './utils/util.js'; -import { ensureDirectory, safeUnlink, writeFileAtomic } from './utils/fsUtil.js'; +import { ensureDirectory, isSafeRelativeName, resolvePathWithinRoot, safeUnlink, writeFileAtomic } from './utils/fsUtil.js'; import Storage from './Storage/ReadableStorage.js'; const MAX_CATCHUP_BATCH = 10; @@ -39,12 +39,15 @@ class Consumer extends stream.Readable { * @param {string} identifier The unique name to identify this consumer. */ initializeStorage(storage, indexName, identifier) { + assert(indexName === '_all' || isSafeRelativeName(indexName), `Invalid index name "${indexName}" for consumer.`); + assert(isSafeRelativeName(identifier), `Invalid identifier "${identifier}" for consumer.`); this.storage = storage; this.index = this.storage.openIndex(indexName); this.indexName = indexName; this.identifier = identifier; const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers'); - this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier); + this.fileName = resolvePathWithinRoot(consumerDirectory, `${this.storage.storageFile}.${indexName}.${identifier}`); + ensureDirectory(path.dirname(this.fileName)); if (ensureDirectory(consumerDirectory)) { this.cleanUpFailedWrites(); } @@ -56,12 +59,14 @@ class Consumer extends stream.Readable { */ cleanUpFailedWrites() { const consumerBaseName = path.basename(this.fileName); - const escapedConsumerBaseName = consumerBaseName.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'); - const failedStateFilePattern = new RegExp(`^${escapedConsumerBaseName}\\.\\d+$`); const consumerDirectory = path.dirname(this.fileName); const files = fs.readdirSync(consumerDirectory); for (let file of files) { - if (failedStateFilePattern.test(file)) { + if (!file.startsWith(consumerBaseName + '.')) { + continue; + } + const suffix = file.slice(consumerBaseName.length + 1); + if (/^\d+$/.test(suffix)) { safeUnlink(path.join(consumerDirectory, file)); } } @@ -102,6 +107,11 @@ class Consumer extends stream.Readable { */ project(projection) { assert(projection && typeof projection.apply === 'function', 'Projection must implement apply(state, event).'); + const projectionFileName = this.fileName ? `${this.fileName}.projection` : null; + const isAlreadySubscribed = this.projection === projection; + const isAlreadyPersisted = projectionFileName + && projection.fileName === projectionFileName + && fs.existsSync(projectionFileName); if (this.projectionHandler) { this.removeListener('data', this.projectionHandler); } @@ -110,6 +120,9 @@ class Consumer extends stream.Readable { this.setState(projection.apply(this.state, event)); }; this.on('data', this.projectionHandler); + if (!isAlreadySubscribed && !isAlreadyPersisted && typeof projection.persist === 'function') { + projection.persist({ fileName: projectionFileName || projection.fileName }); + } return this; } @@ -127,6 +140,9 @@ class Consumer extends stream.Readable { if (typeof newState === 'function') { newState = newState(this.state); } + if (this.state === newState) { + return; + } this.state = Object.freeze(newState); this.doPersist = persist; } @@ -183,9 +199,7 @@ class Consumer extends stream.Readable { throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`); } try { - // If the write fails (half-way), the consumer state file will not be corrupted - writeFileAtomic(this.fileName, consumerData, { tmpFileName: tmpFile }); - this.emit('persisted', consumerState); + writeFileAtomic(this.fileName, consumerData, { tmpFileName: tmpFile }, () => this.emit('persisted', consumerState)); } catch (e) { /* istanbul ignore next */ safeUnlink(tmpFile); diff --git a/src/EventStore.js b/src/EventStore.js index a56d5e37..28462339 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -8,8 +8,8 @@ import Index from './Index.js'; import Consumer from './Consumer.js'; import Projection from './Projection.js'; import { assert, getPropertyAtPath } from './utils/util.js'; -import { ensureDirectory, scanForFiles } from './utils/fsUtil.js'; -import { buildTypeMatcherFn } from './utils/metadataUtil.js'; +import { ensureDirectory, isSafeRelativeName, scanForFiles } from './utils/fsUtil.js'; +import { buildTypeMatcherFn, isPlainObject } from './utils/metadataUtil.js'; const ExpectedVersion = { Any: -1, @@ -20,7 +20,6 @@ const ExpectedVersion = { * Default matcher property paths mirroring the Storage default, used for index optimization. */ const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type']; -const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/; const STORAGE_HOOK_EVENTS = new Set(['preCommit', 'preRead']); class OptimisticConcurrencyError extends Error {} @@ -435,7 +434,7 @@ class EventStore extends events.EventEmitter { return null; } assert(typeof type === 'string', 'typeAccessor must return a string.'); - assert(STREAM_NAME_PATTERN.test(type), `typeAccessor must return a valid stream name. Got: "${type}"`); + assert(isSafeRelativeName(type), `typeAccessor must return a valid stream name. Got: "${type}"`); return type; } @@ -884,10 +883,7 @@ function normalizePredicateRaw(predicate, raw) { } function isProjectionDefinitionObject(value) { - return value - && typeof value === 'object' - && !Array.isArray(value) - && Object.hasOwn(value, 'handlers'); + return isPlainObject(value) && Object.hasOwn(value, 'handlers'); } EventStore.Storage = Storage; diff --git a/src/Projection.js b/src/Projection.js index 742c2efc..5425efe7 100644 --- a/src/Projection.js +++ b/src/Projection.js @@ -9,6 +9,11 @@ const DEFAULT_TYPE_ACCESSOR = (event) => event?.type || event?.payload?.type; class Projection { + /** + * @param {string} name Projection name. + * @param {{ initialState?: object, handlers: function(object, object): object|object, matcher?: object|function(object): boolean }} [definition] + * @param {{ hmac?: function(string): string, typeAccessor?: function(object): string, fileName?: string }} [options] + */ constructor(name, definition = {}, options = {}) { assert(typeof name === 'string' && name !== '', 'Projection must have a name.'); const { initialState = {}, handlers, matcher } = definition; @@ -35,6 +40,12 @@ class Projection { return Object.keys(this.handlers); } + /** + * Apply one event to the provided state and return the next state. + * @param {*} state + * @param {object} event + * @returns {*} + */ apply(state, event) { if (!this.matches(event)) { this.state = state; @@ -53,6 +64,11 @@ class Projection { return nextState; } + /** + * Reset to initialState and project all events from the given iterable stream. + * @param {Iterable} stream + * @returns {*} + */ handle(stream) { this.reset(); for (const event of stream) { @@ -61,11 +77,20 @@ class Projection { return this.state; } + /** + * Reset current projection state to its initial state. + * @returns {*} + */ reset() { this.state = this.initialState; return this.state; } + /** + * Check whether an event matches this projection's matcher definition. + * @param {object} event + * @returns {boolean} + */ matches(event) { if (!this.matcher) { return true; @@ -76,20 +101,22 @@ class Projection { return matches(event, this.matcher); } + /** + * Subscribe this projection to a consumer and persist when needed. + * @param {{ project: function(Projection): object }} consumer + * @returns {Projection} + */ subscribe(consumer) { assert(consumer && typeof consumer.project === 'function', 'Projection.subscribe expects a Consumer instance.'); - const projectionFileName = consumer.fileName ? `${consumer.fileName}.projection` : null; - const isAlreadySubscribed = consumer.projection === this; - const isAlreadyPersisted = projectionFileName && this.fileName === projectionFileName && fs.existsSync(projectionFileName); consumer.project(this); - if (!isAlreadySubscribed && !isAlreadyPersisted) { - this.persist({ - fileName: projectionFileName || this.fileName - }); - } return this; } + /** + * Persist projection definition metadata to disk. + * @param {{ hmac?: function(string): string, fileName?: string }} [options] + * @returns {string} Persisted file name. + */ persist(options = {}) { const hmac = options.hmac || this.hmac; const fileName = options.fileName || this.fileName || `${this.name}.projection`; @@ -105,6 +132,11 @@ class Projection { return fileName; } + /** + * Serialize this projection definition into trusted metadata. + * @param {function(string): string} [hmac] + * @returns {object} + */ toMetadata(hmac = this.hmac) { const serializeFn = (fn) => { assert(typeof hmac === 'function', 'Must provide options.hmac for function projections.'); @@ -128,18 +160,36 @@ class Projection { }; } + /** + * Restore a projection by name from default or configured file location. + * @param {string} name + * @param {{ fileName?: string, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {Projection} + */ static restore(name, options = {}) { assert(typeof name === 'string' && name !== '', 'Projection.restore requires a projection name.'); const fileName = options.fileName || `${name}.projection`; return Projection.restoreFromFile(fileName, options); } + /** + * Restore a projection from an explicit metadata file path. + * @param {string} fileName + * @param {{ hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {Projection} + */ static restoreFromFile(fileName, options = {}) { assert(fs.existsSync(fileName), `Projection file does not exist: ${fileName}`); const metadata = JSON.parse(fs.readFileSync(fileName, 'utf8')); return Projection.fromMetadata(metadata, { ...options, fileName }); } + /** + * Recreate a projection instance from serialized metadata. + * @param {object} metadata + * @param {{ fileName?: string, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {Projection} + */ static fromMetadata(metadata, options = {}) { assert(metadata && typeof metadata === 'object', 'Invalid projection metadata.'); if (metadata.kind === 'composite-projection') { @@ -173,6 +223,13 @@ class Projection { return projection; } + /** + * Compose multiple projections into one composite projection. + * @param {string} name + * @param {object} projections + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {CompositeProjection} + */ static compose(name, projections, options = {}) { return new CompositeProjection(name, projections, options); } @@ -180,6 +237,11 @@ class Projection { class CompositeProjection extends Projection { + /** + * @param {string} name + * @param {object} projections + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + */ constructor(name, projections, options = {}) { assert(projections && typeof projections === 'object' && !Array.isArray(projections), 'CompositeProjection requires an object map of projections.'); const normalized = {}; @@ -209,6 +271,12 @@ class CompositeProjection extends Projection { return [...types]; } + /** + * Apply one event across all child projections and return composed state. + * @param {object} state + * @param {object} event + * @returns {object} + */ apply(state, event) { if (!this.matches(event)) { this.state = state; @@ -223,6 +291,10 @@ class CompositeProjection extends Projection { return nextState; } + /** + * Reset all child projections and rebuild composed state. + * @returns {object} + */ reset() { for (const projection of Object.values(this.projections)) { projection.reset(); @@ -233,6 +305,11 @@ class CompositeProjection extends Projection { return this.state; } + /** + * Serialize composed projection metadata recursively. + * @param {function(string): string} [hmac] + * @returns {object} + */ toMetadata(hmac = this.hmac) { return { kind: 'composite-projection', @@ -244,6 +321,12 @@ class CompositeProjection extends Projection { }; } + /** + * Restore a composed projection from serialized metadata. + * @param {object} metadata + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {CompositeProjection} + */ static fromMetadata(metadata, options = {}) { const hmac = options.hmac; const deserializeMatcher = (matcherMetadata) => { diff --git a/src/utils/fsUtil.js b/src/utils/fsUtil.js index 08ea7f86..17e38fce 100644 --- a/src/utils/fsUtil.js +++ b/src/utils/fsUtil.js @@ -2,6 +2,8 @@ import fs from 'fs'; import path from 'path'; import { mkdirpSync } from 'mkdirp'; +const SAFE_RELATIVE_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/; + // Best-effort cleanup for temporary files after interrupted/failed writes. function safeUnlink(fileName) { try { @@ -14,12 +16,15 @@ function safeUnlink(fileName) { } // Prevent partially written persistence files from replacing the last valid state. -function writeFileAtomic(fileName, data, options = {}) { +function writeFileAtomic(fileName, data, options = {}, onSuccess = null) { const tmpFileName = options.tmpFileName || `${fileName}.tmp`; const writeOptions = options.encoding ? { encoding: options.encoding } : undefined; try { fs.writeFileSync(tmpFileName, data, writeOptions); fs.renameSync(tmpFileName, fileName); + if (typeof onSuccess === 'function') { + onSuccess(); + } } catch (e) { safeUnlink(tmpFileName); throw e; @@ -27,6 +32,22 @@ function writeFileAtomic(fileName, data, options = {}) { return fileName; } +function isSafeRelativeName(name) { + return typeof name === 'string' + && name !== '' + && SAFE_RELATIVE_NAME_PATTERN.test(name); +} + +function resolvePathWithinRoot(rootDirectory, relativePath) { + const root = path.resolve(rootDirectory); + const resolvedPath = path.resolve(root, relativePath); + const rootRelativePath = path.relative(root, resolvedPath); + if (rootRelativePath.startsWith('..') || path.isAbsolute(rootRelativePath)) { + throw new Error(`Invalid relative path "${relativePath}".`); + } + return resolvedPath; +} + /** * Ensure that the given directory exists. * @param {string} dirName @@ -147,4 +168,6 @@ export { safeUnlink, writeFileAtomic, scanForFiles, + isSafeRelativeName, + resolvePathWithinRoot, }; diff --git a/src/utils/metadataUtil.js b/src/utils/metadataUtil.js index fa089fca..19764532 100644 --- a/src/utils/metadataUtil.js +++ b/src/utils/metadataUtil.js @@ -238,6 +238,7 @@ function matchesNode(buffer, startOffset, node) { export { createHmac, + isPlainObject, matches, buildMetadataHeader, buildMetadataForMatcher, diff --git a/test/Consumer.spec.js b/test/Consumer.spec.js index 760315f3..e53d8ddd 100644 --- a/test/Consumer.spec.js +++ b/test/Consumer.spec.js @@ -41,6 +41,14 @@ describe('Consumer', function() { expect(() => new Consumer(storage, 'foobar')).to.throwError(/identifier/); }); + it('throws for invalid consumer index names', function() { + expect(() => new Consumer(storage, '../foobar', 'consumer1')).to.throwError(/Invalid index name/); + }); + + it('throws for invalid consumer identifiers', function() { + expect(() => new Consumer(storage, 'foobar', '../consumer1')).to.throwError(/Invalid identifier/); + }); + it('creates consumer directory if not existing', function() { consumer = new Consumer(storage, 'foobar', 'consumer1'); expect(fs.existsSync(dataDirectory + '/consumers')).to.be(true); @@ -539,6 +547,26 @@ describe('Consumer', function() { storage.write({ type: 'Foobar', id: 3 }); }); + it('consumer.project persists a projection when attaching', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-project-method', { count: 0 }); + const projection = new Projection('consumer-project-method', { + initialState: { count: 0 }, + handlers: { + Foobar: (state, event) => ({ ...state, count: state.count + event.id }) + } + }, { hmac: createHmac('test-secret') }); + + consumer.project(projection); + expect(fs.existsSync(`${consumer.fileName}.projection`)).to.be(true); + consumer.on('caught-up', () => { + expect(consumer.state.count).to.be(6); + done(); + }); + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + it('can attach and restore projections from event-type reducer maps', function(done) { consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', { count: 0 }); const projection = new Projection('consumer-projection-map', { diff --git a/test/util.spec.js b/test/util.spec.js index b9adc01e..6124af08 100644 --- a/test/util.spec.js +++ b/test/util.spec.js @@ -2,7 +2,7 @@ import expect from 'expect.js'; import fs from 'fs-extra'; import path from 'path'; import { iterate } from '../src/utils/util.js'; -import { scanForFiles } from '../src/utils/fsUtil.js'; +import { isSafeRelativeName, resolvePathWithinRoot, scanForFiles } from '../src/utils/fsUtil.js'; import { fileURLToPath } from 'url'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); @@ -90,6 +90,26 @@ describe('util', function() { }); }); + describe('file path helpers', function() { + + it('detects safe relative names', function() { + expect(isSafeRelativeName('stream-orders')).to.be(true); + expect(isSafeRelativeName('../orders')).to.be(false); + }); + + it('resolves paths within a root directory', function() { + const root = path.join(testDir, 'root'); + const resolved = resolvePathWithinRoot(root, 'a/b.file'); + expect(resolved.startsWith(path.resolve(root))).to.be(true); + }); + + it('rejects traversal paths outside of root directory', function() { + const root = path.join(testDir, 'root'); + expect(() => resolvePathWithinRoot(root, '../outside.file')).to.throwError(/Invalid relative path/); + }); + + }); + }); }); From ecc828265c9db48979faacf770e32788b211c5d1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Jun 2026 19:23:07 +0000 Subject: [PATCH 8/8] refactor projection composition and simplify consumer persist --- src/CompositeProjection.js | 129 +++++++++++++++++++++++++++++++++++++ src/Consumer.js | 7 +- src/Projection.js | 117 +-------------------------------- test/Consumer.spec.js | 18 +++--- 4 files changed, 142 insertions(+), 129 deletions(-) create mode 100644 src/CompositeProjection.js diff --git a/src/CompositeProjection.js b/src/CompositeProjection.js new file mode 100644 index 00000000..91ff9b9f --- /dev/null +++ b/src/CompositeProjection.js @@ -0,0 +1,129 @@ +import { assert } from './utils/util.js'; +import { buildMatcherFromMetadata, buildMetadataForMatcher } from './utils/metadataUtil.js'; + +/** + * Build the CompositeProjection class on top of the provided Projection base class. + * This avoids cyclic imports between Projection and CompositeProjection modules. + * + * @param {typeof import('./Projection.js').default} Projection + * @returns {typeof import('./Projection.js').CompositeProjection} + */ +function createCompositeProjectionClass(Projection) { + return class CompositeProjection extends Projection { + + /** + * @param {string} name + * @param {object} projections + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + */ + constructor(name, projections, options = {}) { + assert(projections && typeof projections === 'object' && !Array.isArray(projections), 'CompositeProjection requires an object map of projections.'); + const normalized = {}; + for (const [projectionName, projection] of Object.entries(projections)) { + normalized[projectionName] = projection instanceof Projection + ? projection + : new Projection(projectionName, projection, options); + } + super(name, { + initialState: Object.fromEntries( + Object.entries(normalized).map(([projectionName, projection]) => [projectionName, projection.initialState]) + ), + handlers: (state) => state, + matcher: options.matcher + }, options); + this.projections = normalized; + this.reset(); + } + + get types() { + const types = new Set(); + for (const projection of Object.values(this.projections)) { + for (const type of projection.types) { + types.add(type); + } + } + return [...types]; + } + + /** + * Apply one event across all child projections and return composed state. + * @param {object} state + * @param {object} event + * @returns {object} + */ + apply(state, event) { + if (!this.matches(event)) { + this.state = state; + return state; + } + const currentState = state || this.initialState; + const nextState = {}; + for (const [name, projection] of Object.entries(this.projections)) { + nextState[name] = projection.apply(currentState[name], event); + } + this.state = nextState; + return nextState; + } + + /** + * Reset all child projections and rebuild composed state. + * @returns {object} + */ + reset() { + for (const projection of Object.values(this.projections)) { + projection.reset(); + } + this.state = Object.fromEntries( + Object.entries(this.projections).map(([projectionName, projection]) => [projectionName, projection.state]) + ); + return this.state; + } + + /** + * Serialize composed projection metadata recursively. + * @param {function(string): string} [hmac] + * @returns {object} + */ + toMetadata(hmac = this.hmac) { + return { + kind: 'composite-projection', + name: this.name, + matcher: this.matcher ? buildMetadataForMatcher(this.matcher, hmac) : null, + projections: Object.fromEntries( + Object.entries(this.projections).map(([name, projection]) => [name, projection.toMetadata(hmac)]) + ) + }; + } + + /** + * Restore a composed projection from serialized metadata. + * @param {object} metadata + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {CompositeProjection} + */ + static fromMetadata(metadata, options = {}) { + const hmac = options.hmac; + const deserializeMatcher = (matcherMetadata) => { + if (!matcherMetadata) { + return undefined; + } + if (typeof matcherMetadata.matcher === 'string') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); + } + return buildMatcherFromMetadata(matcherMetadata, hmac); + }; + const projections = Object.fromEntries( + Object.entries(metadata.projections || {}).map(([name, projectionMetadata]) => [ + name, + Projection.fromMetadata(projectionMetadata, options) + ]) + ); + return new this(metadata.name, projections, { + ...options, + matcher: deserializeMatcher(metadata.matcher) + }); + } + }; +} + +export { createCompositeProjectionClass }; diff --git a/src/Consumer.js b/src/Consumer.js index 7d379942..18c15550 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -194,12 +194,7 @@ class Consumer extends stream.Readable { if (fs.existsSync(tmpFile)) { throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`); } - try { - writeFileAtomic(this.fileName, consumerData, { tmpFileName: tmpFile }, () => this.emit('persisted', consumerState)); - } catch (e) { - /* c8 ignore next */ - safeUnlink(tmpFile); - } + writeFileAtomic(this.fileName, consumerData, { tmpFileName: tmpFile }, () => this.emit('persisted', consumerState)); }); } diff --git a/src/Projection.js b/src/Projection.js index 5425efe7..a61b5569 100644 --- a/src/Projection.js +++ b/src/Projection.js @@ -3,6 +3,7 @@ import path from 'path'; import { assert } from './utils/util.js'; import { ensureDirectory, writeFileAtomic } from './utils/fsUtil.js'; import { buildMatcherFromMetadata, buildMetadataForMatcher, matches } from './utils/metadataUtil.js'; +import { createCompositeProjectionClass } from './CompositeProjection.js'; const DEFAULT_TYPE_ACCESSOR = (event) => event?.type || event?.payload?.type; @@ -235,121 +236,7 @@ class Projection { } } -class CompositeProjection extends Projection { - - /** - * @param {string} name - * @param {object} projections - * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] - */ - constructor(name, projections, options = {}) { - assert(projections && typeof projections === 'object' && !Array.isArray(projections), 'CompositeProjection requires an object map of projections.'); - const normalized = {}; - for (const [projectionName, projection] of Object.entries(projections)) { - normalized[projectionName] = projection instanceof Projection - ? projection - : new Projection(projectionName, projection, options); - } - super(name, { - initialState: Object.fromEntries( - Object.entries(normalized).map(([projectionName, projection]) => [projectionName, projection.initialState]) - ), - handlers: (state) => state, - matcher: options.matcher - }, options); - this.projections = normalized; - this.reset(); - } - - get types() { - const types = new Set(); - for (const projection of Object.values(this.projections)) { - for (const type of projection.types) { - types.add(type); - } - } - return [...types]; - } - - /** - * Apply one event across all child projections and return composed state. - * @param {object} state - * @param {object} event - * @returns {object} - */ - apply(state, event) { - if (!this.matches(event)) { - this.state = state; - return state; - } - const currentState = state || this.initialState; - const nextState = {}; - for (const [name, projection] of Object.entries(this.projections)) { - nextState[name] = projection.apply(currentState[name], event); - } - this.state = nextState; - return nextState; - } - - /** - * Reset all child projections and rebuild composed state. - * @returns {object} - */ - reset() { - for (const projection of Object.values(this.projections)) { - projection.reset(); - } - this.state = Object.fromEntries( - Object.entries(this.projections).map(([projectionName, projection]) => [projectionName, projection.state]) - ); - return this.state; - } - - /** - * Serialize composed projection metadata recursively. - * @param {function(string): string} [hmac] - * @returns {object} - */ - toMetadata(hmac = this.hmac) { - return { - kind: 'composite-projection', - name: this.name, - matcher: this.matcher ? buildMetadataForMatcher(this.matcher, hmac) : null, - projections: Object.fromEntries( - Object.entries(this.projections).map(([name, projection]) => [name, projection.toMetadata(hmac)]) - ) - }; - } - - /** - * Restore a composed projection from serialized metadata. - * @param {object} metadata - * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] - * @returns {CompositeProjection} - */ - static fromMetadata(metadata, options = {}) { - const hmac = options.hmac; - const deserializeMatcher = (matcherMetadata) => { - if (!matcherMetadata) { - return undefined; - } - if (typeof matcherMetadata.matcher === 'string') { - assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); - } - return buildMatcherFromMetadata(matcherMetadata, hmac); - }; - const projections = Object.fromEntries( - Object.entries(metadata.projections || {}).map(([name, projectionMetadata]) => [ - name, - Projection.fromMetadata(projectionMetadata, options) - ]) - ); - return new CompositeProjection(metadata.name, projections, { - ...options, - matcher: deserializeMatcher(metadata.matcher) - }); - } -} +const CompositeProjection = createCompositeProjectionClass(Projection); export default Projection; export { CompositeProjection }; diff --git a/test/Consumer.spec.js b/test/Consumer.spec.js index e53d8ddd..f8d36528 100644 --- a/test/Consumer.spec.js +++ b/test/Consumer.spec.js @@ -355,26 +355,28 @@ describe('Consumer', function() { }, 15); }); - it('swallows persistence write errors and removes temp files', function(done) { + it('rethrows persistence write errors and removes temp files', function() { consumer = new Consumer(storage, 'foobar', 'consumer-1'); consumer.position = 1; consumer.state = Object.freeze({ foo: 1 }); const tmpFile = consumer.fileName + '.1'; const originalWriteFileSync = fsNative.writeFileSync; + const originalSetImmediate = global.setImmediate; + global.setImmediate = (fn) => { + fn(); + return null; + }; fsNative.writeFileSync = () => { const error = new Error('disk full'); error.code = 'ENOSPC'; throw error; }; - consumer.persist(); - - setTimeout(() => { - fsNative.writeFileSync = originalWriteFileSync; - expect(fs.existsSync(tmpFile)).to.be(false); - done(); - }, 15); + expect(() => consumer.persist()).to.throwError(/disk full/); + fsNative.writeFileSync = originalWriteFileSync; + global.setImmediate = originalSetImmediate; + expect(fs.existsSync(tmpFile)).to.be(false); }); it('restores state after reopening', function(done) {