diff --git a/docs/api.md b/docs/api.md index 33a7041..f7aeef6 100644 --- a/docs/api.md +++ b/docs/api.md @@ -267,6 +267,41 @@ Asynchronously scan all consumer state files and return their identifiers. --- +#### `eventstore.getProjection(name, [handlers], [initialState], [matcher])` + +```javascript +eventstore.getProjection(name [, handlers] [, initialState] [, matcher]) → Projection +``` + +Create a `Projection` with EventStore defaults (`typeAccessor`, storage HMAC), or restore a previously persisted one when `handlers` is omitted. + +- `handlers`: reducer function `(state, event) => state` or map `{ [eventType]: reducer }` +- `initialState`: initial projection state (default `{}`) +- `matcher`: optional object/function matcher (same shape as stream/query matchers) + + +--- + +#### `consumer.project(projection)` + +```javascript +consumer.project(projection) +``` + +Attach a projection-like object (`apply(state, event)`) as the consumer `'data'` handler. + +--- + +#### `projection.subscribe(consumer)` + +```javascript +projection.subscribe(consumer) +``` + +Attach this projection to the consumer (same wiring behavior as `consumer.project(projection)`) and persist its definition next to the consumer state file so `eventstore.getConsumer(...)` can restore and reconnect it automatically. + +--- + ### Events emitted | Event | Payload | Description | diff --git a/docs/consumers.md b/docs/consumers.md index 9b24b4b..411c669 100644 --- a/docs/consumers.md +++ b/docs/consumers.md @@ -69,6 +69,36 @@ consumer.setState({ count: 0 }); consumer.setState((state) => ({ ...state, count: state.count + 1 })); ``` +## Projections + +Use a `Projection` to define *how* events are projected into state, then connect it to a `Consumer` for durable continuous updates. + +```javascript +const projection = eventstore.getProjection('orders-total', { + OrderCreated: (state, event) => ({ ...state, total: state.total + (event.payload.amount || 0) }) +}, { total: 0 }); + +const consumer = eventstore.getConsumer('orders', 'orders-projection', projection.initialState); +projection.subscribe(consumer); +``` + +Projections are composable via `CompositeProjection`: + +```javascript +import { CompositeProjection, Projection } from 'event-storage'; + +const count = new Projection('count', { + initialState: 0, + handlers: { OrderCreated: (state) => state + 1 } +}); + +const overview = new CompositeProjection('overview', { + count, + last: { initialState: null, handlers: { OrderCreated: (state, event) => event.payload } } +}); +// overview.state -> { count: number, last: object|null } +``` + ## Resetting a Consumer Force the consumer to reprocess events from a given position: diff --git a/index.js b/index.js index b6d72de..aceb8eb 100644 --- a/index.js +++ b/index.js @@ -3,4 +3,5 @@ export { default as EventStream } from './src/EventStream.js'; export { default as Storage, StorageLockedError } from './src/Storage.js'; export { default as Index } from './src/Index.js'; export { default as Consumer } from './src/Consumer.js'; +export { default as Projection, CompositeProjection } from './src/Projection.js'; export { matches, buildRawBufferMatcher } from './src/utils/metadataUtil.js'; diff --git a/src/CompositeProjection.js b/src/CompositeProjection.js new file mode 100644 index 0000000..91ff9b9 --- /dev/null +++ b/src/CompositeProjection.js @@ -0,0 +1,129 @@ +import { assert } from './utils/util.js'; +import { buildMatcherFromMetadata, buildMetadataForMatcher } from './utils/metadataUtil.js'; + +/** + * Build the CompositeProjection class on top of the provided Projection base class. + * This avoids cyclic imports between Projection and CompositeProjection modules. + * + * @param {typeof import('./Projection.js').default} Projection + * @returns {typeof import('./Projection.js').CompositeProjection} + */ +function createCompositeProjectionClass(Projection) { + return class CompositeProjection extends Projection { + + /** + * @param {string} name + * @param {object} projections + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + */ + constructor(name, projections, options = {}) { + assert(projections && typeof projections === 'object' && !Array.isArray(projections), 'CompositeProjection requires an object map of projections.'); + const normalized = {}; + for (const [projectionName, projection] of Object.entries(projections)) { + normalized[projectionName] = projection instanceof Projection + ? projection + : new Projection(projectionName, projection, options); + } + super(name, { + initialState: Object.fromEntries( + Object.entries(normalized).map(([projectionName, projection]) => [projectionName, projection.initialState]) + ), + handlers: (state) => state, + matcher: options.matcher + }, options); + this.projections = normalized; + this.reset(); + } + + get types() { + const types = new Set(); + for (const projection of Object.values(this.projections)) { + for (const type of projection.types) { + types.add(type); + } + } + return [...types]; + } + + /** + * Apply one event across all child projections and return composed state. + * @param {object} state + * @param {object} event + * @returns {object} + */ + apply(state, event) { + if (!this.matches(event)) { + this.state = state; + return state; + } + const currentState = state || this.initialState; + const nextState = {}; + for (const [name, projection] of Object.entries(this.projections)) { + nextState[name] = projection.apply(currentState[name], event); + } + this.state = nextState; + return nextState; + } + + /** + * Reset all child projections and rebuild composed state. + * @returns {object} + */ + reset() { + for (const projection of Object.values(this.projections)) { + projection.reset(); + } + this.state = Object.fromEntries( + Object.entries(this.projections).map(([projectionName, projection]) => [projectionName, projection.state]) + ); + return this.state; + } + + /** + * Serialize composed projection metadata recursively. + * @param {function(string): string} [hmac] + * @returns {object} + */ + toMetadata(hmac = this.hmac) { + return { + kind: 'composite-projection', + name: this.name, + matcher: this.matcher ? buildMetadataForMatcher(this.matcher, hmac) : null, + projections: Object.fromEntries( + Object.entries(this.projections).map(([name, projection]) => [name, projection.toMetadata(hmac)]) + ) + }; + } + + /** + * Restore a composed projection from serialized metadata. + * @param {object} metadata + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {CompositeProjection} + */ + static fromMetadata(metadata, options = {}) { + const hmac = options.hmac; + const deserializeMatcher = (matcherMetadata) => { + if (!matcherMetadata) { + return undefined; + } + if (typeof matcherMetadata.matcher === 'string') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); + } + return buildMatcherFromMetadata(matcherMetadata, hmac); + }; + const projections = Object.fromEntries( + Object.entries(metadata.projections || {}).map(([name, projectionMetadata]) => [ + name, + Projection.fromMetadata(projectionMetadata, options) + ]) + ); + return new this(metadata.name, projections, { + ...options, + matcher: deserializeMatcher(metadata.matcher) + }); + } + }; +} + +export { createCompositeProjectionClass }; diff --git a/src/Consumer.js b/src/Consumer.js index 58411b5..18c1555 100644 --- a/src/Consumer.js +++ b/src/Consumer.js @@ -2,26 +2,11 @@ import stream from 'stream'; import fs from 'fs'; import path from 'path'; import { assert } from './utils/util.js'; -import { ensureDirectory } from './utils/fsUtil.js'; +import { ensureDirectory, isSafeRelativeName, resolvePathWithinRoot, safeUnlink, writeFileAtomic } from './utils/fsUtil.js'; import { normalizeConsumerStateArgs } from './utils/apiHelpers.js'; import Storage from './Storage/ReadableStorage.js'; const MAX_CATCHUP_BATCH = 10; -/** - * Safely unlink a file and ignore if it doesn't exist. - * @param {string} filename - */ -const safeUnlink = (filename) => { - /* c8 ignore next */ - try { - fs.unlinkSync(filename); - } catch (e) { - if (e.code !== "ENOENT") { - throw e; - } - } -}; - /** * Implements an event-driven durable Consumer that provides at-least-once delivery semantics or exactly-once processing semantics if only using setState(). */ @@ -54,11 +39,15 @@ class Consumer extends stream.Readable { * @param {string} identifier The unique name to identify this consumer. */ initializeStorage(storage, indexName, identifier) { + assert(indexName === '_all' || isSafeRelativeName(indexName), `Invalid index name "${indexName}" for consumer.`); + assert(isSafeRelativeName(identifier), `Invalid identifier "${identifier}" for consumer.`); this.storage = storage; this.index = this.storage.openIndex(indexName); this.indexName = indexName; + this.identifier = identifier; const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers'); - this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier); + this.fileName = resolvePathWithinRoot(consumerDirectory, `${this.storage.storageFile}.${indexName}.${identifier}`); + ensureDirectory(path.dirname(this.fileName)); if (ensureDirectory(consumerDirectory)) { this.cleanUpFailedWrites(); } @@ -69,11 +58,15 @@ class Consumer extends stream.Readable { * @private */ cleanUpFailedWrites() { - const consumerNamePrefix = path.basename(this.fileName) + '.'; + const consumerBaseName = path.basename(this.fileName); const consumerDirectory = path.dirname(this.fileName); const files = fs.readdirSync(consumerDirectory); for (let file of files) { - if (file.startsWith(consumerNamePrefix)) { + if (!file.startsWith(consumerBaseName + '.')) { + continue; + } + const suffix = file.slice(consumerBaseName.length + 1); + if (/^\d+$/.test(suffix)) { safeUnlink(path.join(consumerDirectory, file)); } } @@ -104,6 +97,32 @@ class Consumer extends stream.Readable { this.consuming = false; } + /** + * Register a projection as `data` event handler. + * @api + * @param {{ apply: function(object, object): object }} projection + */ + project(projection) { + assert(projection && typeof projection.apply === 'function', 'Projection must implement apply(state, event).'); + const projectionFileName = this.fileName ? `${this.fileName}.projection` : null; + const isAlreadySubscribed = this.projection === projection; + const isAlreadyPersisted = projectionFileName + && projection.fileName === projectionFileName + && fs.existsSync(projectionFileName); + if (this.projectionHandler) { + this.removeListener('data', this.projectionHandler); + } + this.projection = projection; + this.projectionHandler = (event) => { + this.setState(projection.apply(this.state, event)); + }; + this.on('data', this.projectionHandler); + if (!isAlreadySubscribed && !isAlreadyPersisted && typeof projection.persist === 'function') { + projection.persist({ fileName: projectionFileName || projection.fileName }); + } + return this; + } + /** * Update the state of this consumer transactionally with the position. * May only be called from within the document handling callback. @@ -118,6 +137,9 @@ class Consumer extends stream.Readable { if (typeof newState === 'function') { newState = newState(this.state); } + if (this.state === newState) { + return; + } this.state = Object.freeze(newState); this.doPersist = persist; } @@ -172,15 +194,7 @@ class Consumer extends stream.Readable { if (fs.existsSync(tmpFile)) { throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`); } - try { - fs.writeFileSync(tmpFile, consumerData); - // If the write fails (half-way), the consumer state file will not be corrupted - fs.renameSync(tmpFile, this.fileName); - this.emit('persisted', consumerState); - } catch (e) { - /* c8 ignore next */ - safeUnlink(tmpFile); - } + writeFileAtomic(this.fileName, consumerData, { tmpFileName: tmpFile }, () => this.emit('persisted', consumerState)); }); } diff --git a/src/EventStore.js b/src/EventStore.js index 848fc9e..13494c9 100644 --- a/src/EventStore.js +++ b/src/EventStore.js @@ -6,9 +6,10 @@ import events from 'events'; import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js'; import Index from './Index.js'; import Consumer from './Consumer.js'; +import Projection from './Projection.js'; import { assert, getPropertyAtPath } from './utils/util.js'; -import { ensureDirectory, scanForFiles } from './utils/fsUtil.js'; -import { buildTypeMatcherFn } from './utils/metadataUtil.js'; +import { ensureDirectory, isSafeRelativeName, scanForFiles } from './utils/fsUtil.js'; +import { buildTypeMatcherFn, isPlainObject } from './utils/metadataUtil.js'; import { fixCommitArgumentTypes, parseStreamFromIndexName, normalizePredicateRaw } from './utils/apiHelpers.js'; const ExpectedVersion = { @@ -20,7 +21,6 @@ const ExpectedVersion = { * Default matcher property paths mirroring the Storage default, used for index optimization. */ const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type']; -const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/; const STORAGE_HOOK_EVENTS = new Set(['preCommit', 'preRead']); class OptimisticConcurrencyError extends Error {} @@ -120,6 +120,9 @@ class EventStore extends events.EventEmitter { } } + this.projectionTypeAccessor = this.typeAccessor + ? (event) => this.typeAccessor(event?.payload || event) + : undefined; this.initialize(storeName, storageConfig); } @@ -405,7 +408,7 @@ class EventStore extends events.EventEmitter { return null; } assert(typeof type === 'string', 'typeAccessor must return a string.'); - assert(STREAM_NAME_PATTERN.test(type), `typeAccessor must return a valid stream name. Got: "${type}"`); + assert(isSafeRelativeName(type), `typeAccessor must return a valid stream name. Got: "${type}"`); return type; } @@ -777,11 +780,44 @@ class EventStore extends events.EventEmitter { existingConsumer.stop(); } const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since); + const consumerProjectionFileName = `${consumer.fileName}.projection`; + if (fs.existsSync(consumerProjectionFileName)) { + Projection.restoreFromFile(consumerProjectionFileName, { + hmac: this.storage.hmac, + typeAccessor: this.projectionTypeAccessor + }).subscribe(consumer); + } consumer.streamName = streamName; this.consumers.set(identifier, consumer); return consumer; } + /** + * Get or create a projection with EventStore defaults. + * + * @param {string} name Projection name. + * @param {function(object, object): object|object} [handlers] Projection handlers (reducer fn or reducer map). + * @param {object} [initialState={}] Projection initial state. + * @param {object|function(object): boolean} [matcher] Optional projection matcher. + * @returns {Projection} + */ + getProjection(name, handlers, initialState = {}, matcher) { + assert(typeof name === 'string' && name !== '', 'Must provide a projection name.'); + const projectionFileName = path.join(this.storage.indexDirectory, 'projections', this.storage.storageFile + '.' + name + '.projection'); + const projectionOptions = { + fileName: projectionFileName, + hmac: this.storage.hmac, + typeAccessor: this.projectionTypeAccessor + }; + if (handlers !== undefined) { + const definition = isProjectionDefinitionObject(handlers) + ? handlers + : { handlers, initialState, matcher }; + return new Projection(name, definition, projectionOptions); + } + return Projection.restore(name, projectionOptions); + } + /** * Scan the existing consumers on this EventStore and asynchronously invoke a callback with the parsed list. * @@ -822,6 +858,10 @@ class EventStore extends events.EventEmitter { } +function isProjectionDefinitionObject(value) { + return isPlainObject(value) && Object.hasOwn(value, 'handlers'); +} + EventStore.Storage = Storage; EventStore.Index = Index; diff --git a/src/Projection.js b/src/Projection.js new file mode 100644 index 0000000..a61b556 --- /dev/null +++ b/src/Projection.js @@ -0,0 +1,242 @@ +import fs from 'fs'; +import path from 'path'; +import { assert } from './utils/util.js'; +import { ensureDirectory, writeFileAtomic } from './utils/fsUtil.js'; +import { buildMatcherFromMetadata, buildMetadataForMatcher, matches } from './utils/metadataUtil.js'; +import { createCompositeProjectionClass } from './CompositeProjection.js'; + +const DEFAULT_TYPE_ACCESSOR = (event) => event?.type || event?.payload?.type; + + +class Projection { + + /** + * @param {string} name Projection name. + * @param {{ initialState?: object, handlers: function(object, object): object|object, matcher?: object|function(object): boolean }} [definition] + * @param {{ hmac?: function(string): string, typeAccessor?: function(object): string, fileName?: string }} [options] + */ + constructor(name, definition = {}, options = {}) { + assert(typeof name === 'string' && name !== '', 'Projection must have a name.'); + const { initialState = {}, handlers, matcher } = definition; + assert((typeof handlers === 'function') || (handlers && typeof handlers === 'object' && !Array.isArray(handlers)), 'Projection handlers must be a function or an object map of functions.'); + if (typeof handlers === 'object') { + for (const reducer of Object.values(handlers)) { + assert(typeof reducer === 'function', 'Projection handler maps must contain reducer functions.'); + } + } + this.name = name; + this.initialState = Object.freeze(initialState); + this.handlers = handlers; + this.matcher = matcher; + this.hmac = options.hmac || null; + this.typeAccessor = options.typeAccessor || DEFAULT_TYPE_ACCESSOR; + this.fileName = options.fileName || null; + this.state = this.initialState; + } + + get types() { + if (typeof this.handlers === 'function') { + return []; + } + return Object.keys(this.handlers); + } + + /** + * Apply one event to the provided state and return the next state. + * @param {*} state + * @param {object} event + * @returns {*} + */ + apply(state, event) { + if (!this.matches(event)) { + this.state = state; + return state; + } + let reducer = this.handlers; + if (typeof this.handlers === 'object') { + reducer = this.handlers[this.typeAccessor(event)]; + if (typeof reducer !== 'function') { + this.state = state; + return state; + } + } + const nextState = reducer(state, event); + this.state = nextState; + return nextState; + } + + /** + * Reset to initialState and project all events from the given iterable stream. + * @param {Iterable} stream + * @returns {*} + */ + handle(stream) { + this.reset(); + for (const event of stream) { + this.state = this.apply(this.state, event); + } + return this.state; + } + + /** + * Reset current projection state to its initial state. + * @returns {*} + */ + reset() { + this.state = this.initialState; + return this.state; + } + + /** + * Check whether an event matches this projection's matcher definition. + * @param {object} event + * @returns {boolean} + */ + matches(event) { + if (!this.matcher) { + return true; + } + if (typeof this.matcher === 'function') { + return this.matcher(event); + } + return matches(event, this.matcher); + } + + /** + * Subscribe this projection to a consumer and persist when needed. + * @param {{ project: function(Projection): object }} consumer + * @returns {Projection} + */ + subscribe(consumer) { + assert(consumer && typeof consumer.project === 'function', 'Projection.subscribe expects a Consumer instance.'); + consumer.project(this); + return this; + } + + /** + * Persist projection definition metadata to disk. + * @param {{ hmac?: function(string): string, fileName?: string }} [options] + * @returns {string} Persisted file name. + */ + persist(options = {}) { + const hmac = options.hmac || this.hmac; + const fileName = options.fileName || this.fileName || `${this.name}.projection`; + const metadata = this.toMetadata(hmac); + const tmpFile = fileName + '.tmp'; + ensureDirectory(path.dirname(fileName)); + writeFileAtomic(fileName, JSON.stringify(metadata), { + tmpFileName: tmpFile, + encoding: 'utf8' + }); + this.fileName = fileName; + this.hmac = hmac; + return fileName; + } + + /** + * Serialize this projection definition into trusted metadata. + * @param {function(string): string} [hmac] + * @returns {object} + */ + toMetadata(hmac = this.hmac) { + const serializeFn = (fn) => { + assert(typeof hmac === 'function', 'Must provide options.hmac for function projections.'); + return buildMetadataForMatcher(fn, hmac); + }; + const matcherMetadata = this.matcher ? ( + typeof this.matcher === 'function' ? serializeFn(this.matcher) : buildMetadataForMatcher(this.matcher, hmac) + ) : null; + const handlersMetadata = (typeof this.handlers === 'function') + ? serializeFn(this.handlers) + : Object.fromEntries( + Object.entries(this.handlers).map(([eventType, reducer]) => [eventType, serializeFn(reducer)]) + ); + return { + kind: 'projection', + name: this.name, + initialState: this.initialState, + matcher: matcherMetadata, + handlersKind: typeof this.handlers === 'function' ? 'function' : 'map', + handlers: handlersMetadata + }; + } + + /** + * Restore a projection by name from default or configured file location. + * @param {string} name + * @param {{ fileName?: string, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {Projection} + */ + static restore(name, options = {}) { + assert(typeof name === 'string' && name !== '', 'Projection.restore requires a projection name.'); + const fileName = options.fileName || `${name}.projection`; + return Projection.restoreFromFile(fileName, options); + } + + /** + * Restore a projection from an explicit metadata file path. + * @param {string} fileName + * @param {{ hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {Projection} + */ + static restoreFromFile(fileName, options = {}) { + assert(fs.existsSync(fileName), `Projection file does not exist: ${fileName}`); + const metadata = JSON.parse(fs.readFileSync(fileName, 'utf8')); + return Projection.fromMetadata(metadata, { ...options, fileName }); + } + + /** + * Recreate a projection instance from serialized metadata. + * @param {object} metadata + * @param {{ fileName?: string, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {Projection} + */ + static fromMetadata(metadata, options = {}) { + assert(metadata && typeof metadata === 'object', 'Invalid projection metadata.'); + if (metadata.kind === 'composite-projection') { + return CompositeProjection.fromMetadata(metadata, options); + } + assert(metadata.kind === 'projection', 'Invalid projection metadata kind.'); + const hmac = options.hmac; + const deserialize = (matcherMetadata) => { + if (!matcherMetadata) { + return undefined; + } + if (typeof matcherMetadata.matcher === 'string') { + assert(typeof hmac === 'function', 'Must provide options.hmac to restore function projections.'); + } + return buildMatcherFromMetadata(matcherMetadata, hmac); + }; + const handlers = metadata.handlersKind === 'function' + ? deserialize(metadata.handlers) + : Object.fromEntries( + Object.entries(metadata.handlers || {}).map(([eventType, reducerMetadata]) => [eventType, deserialize(reducerMetadata)]) + ); + const projection = new Projection(metadata.name, { + initialState: metadata.initialState, + matcher: deserialize(metadata.matcher), + handlers + }, { + ...options, + fileName: options.fileName || null + }); + projection.reset(); + return projection; + } + + /** + * Compose multiple projections into one composite projection. + * @param {string} name + * @param {object} projections + * @param {{ matcher?: object|function(object): boolean, hmac?: function(string): string, typeAccessor?: function(object): string }} [options] + * @returns {CompositeProjection} + */ + static compose(name, projections, options = {}) { + return new CompositeProjection(name, projections, options); + } +} + +const CompositeProjection = createCompositeProjectionClass(Projection); + +export default Projection; +export { CompositeProjection }; diff --git a/src/utils/fsUtil.js b/src/utils/fsUtil.js index f84bde5..a718bdb 100644 --- a/src/utils/fsUtil.js +++ b/src/utils/fsUtil.js @@ -2,6 +2,52 @@ import fs from 'fs'; import path from 'path'; import { mkdirpSync } from 'mkdirp'; +const SAFE_RELATIVE_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/; + +// Best-effort cleanup for temporary files after interrupted/failed writes. +function safeUnlink(fileName) { + try { + fs.unlinkSync(fileName); + } catch (e) { + if (e.code !== 'ENOENT') { + throw e; + } + } +} + +// Prevent partially written persistence files from replacing the last valid state. +function writeFileAtomic(fileName, data, options = {}, onSuccess = null) { + const tmpFileName = options.tmpFileName || `${fileName}.tmp`; + const writeOptions = options.encoding ? { encoding: options.encoding } : undefined; + try { + fs.writeFileSync(tmpFileName, data, writeOptions); + fs.renameSync(tmpFileName, fileName); + if (typeof onSuccess === 'function') { + onSuccess(); + } + } catch (e) { + safeUnlink(tmpFileName); + throw e; + } + return fileName; +} + +function isSafeRelativeName(name) { + return typeof name === 'string' + && name !== '' + && SAFE_RELATIVE_NAME_PATTERN.test(name); +} + +function resolvePathWithinRoot(rootDirectory, relativePath) { + const root = path.resolve(rootDirectory); + const resolvedPath = path.resolve(root, relativePath); + const rootRelativePath = path.relative(root, resolvedPath); + if (rootRelativePath.startsWith('..') || path.isAbsolute(rootRelativePath)) { + throw new Error(`Invalid relative path "${relativePath}".`); + } + return resolvedPath; +} + /** * Ensure that the given directory exists. * @param {string} dirName @@ -119,5 +165,9 @@ function scanForFiles(directory, regexPattern, onEach, onDone) { export { ensureDirectory, + safeUnlink, + writeFileAtomic, scanForFiles, + isSafeRelativeName, + resolvePathWithinRoot, }; diff --git a/src/utils/metadataUtil.js b/src/utils/metadataUtil.js index 3316bf3..558406c 100644 --- a/src/utils/metadataUtil.js +++ b/src/utils/metadataUtil.js @@ -387,6 +387,7 @@ function matchesOperatorInBuffer(buffer, startOffset, child) { export { createHmac, + isPlainObject, matches, buildMetadataHeader, buildMetadataForMatcher, diff --git a/test/Consumer.spec.js b/test/Consumer.spec.js index a5de02b..f8d3652 100644 --- a/test/Consumer.spec.js +++ b/test/Consumer.spec.js @@ -3,6 +3,8 @@ import fs from 'fs-extra'; import fsNative from 'fs'; import Storage from '../src/Storage.js'; import Consumer from '../src/Consumer.js'; +import Projection, { CompositeProjection } from '../src/Projection.js'; +import { createHmac } from '../src/utils/metadataUtil.js'; import { fileURLToPath } from 'url'; const __dirname = fileURLToPath(new URL('.', import.meta.url)); @@ -39,6 +41,14 @@ describe('Consumer', function() { expect(() => new Consumer(storage, 'foobar')).to.throwError(/identifier/); }); + it('throws for invalid consumer index names', function() { + expect(() => new Consumer(storage, '../foobar', 'consumer1')).to.throwError(/Invalid index name/); + }); + + it('throws for invalid consumer identifiers', function() { + expect(() => new Consumer(storage, 'foobar', '../consumer1')).to.throwError(/Invalid identifier/); + }); + it('creates consumer directory if not existing', function() { consumer = new Consumer(storage, 'foobar', 'consumer1'); expect(fs.existsSync(dataDirectory + '/consumers')).to.be(true); @@ -345,26 +355,28 @@ describe('Consumer', function() { }, 15); }); - it('swallows persistence write errors and removes temp files', function(done) { + it('rethrows persistence write errors and removes temp files', function() { consumer = new Consumer(storage, 'foobar', 'consumer-1'); consumer.position = 1; consumer.state = Object.freeze({ foo: 1 }); const tmpFile = consumer.fileName + '.1'; const originalWriteFileSync = fsNative.writeFileSync; + const originalSetImmediate = global.setImmediate; + global.setImmediate = (fn) => { + fn(); + return null; + }; fsNative.writeFileSync = () => { const error = new Error('disk full'); error.code = 'ENOSPC'; throw error; }; - consumer.persist(); - - setTimeout(() => { - fsNative.writeFileSync = originalWriteFileSync; - expect(fs.existsSync(tmpFile)).to.be(false); - done(); - }, 15); + expect(() => consumer.persist()).to.throwError(/disk full/); + fsNative.writeFileSync = originalWriteFileSync; + global.setImmediate = originalSetImmediate; + expect(fs.existsSync(tmpFile)).to.be(false); }); it('restores state after reopening', function(done) { @@ -519,6 +531,132 @@ describe('Consumer', function() { }); }); + it('can attach projections from a reducer function', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-projection', { count: 0 }); + new Projection('consumer-projection', { + initialState: { count: 0 }, + handlers: (state, event) => ({ ...state, count: state.count + event.id }) + }, { + hmac: createHmac('test-secret') + }).subscribe(consumer); + consumer.on('caught-up', () => { + expect(consumer.state.count).to.be(6); + done(); + }); + + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + + it('consumer.project persists a projection when attaching', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-project-method', { count: 0 }); + const projection = new Projection('consumer-project-method', { + initialState: { count: 0 }, + handlers: { + Foobar: (state, event) => ({ ...state, count: state.count + event.id }) + } + }, { hmac: createHmac('test-secret') }); + + consumer.project(projection); + expect(fs.existsSync(`${consumer.fileName}.projection`)).to.be(true); + consumer.on('caught-up', () => { + expect(consumer.state.count).to.be(6); + done(); + }); + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + + it('can attach and restore projections from event-type reducer maps', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', { count: 0 }); + const projection = new Projection('consumer-projection-map', { + initialState: { count: 0 }, + handlers: { + Foobar: (state, event) => ({ ...state, count: state.count + event.id }), + Bazinga: (state) => state + } + }, { hmac: createHmac('test-secret') }); + projection.subscribe(consumer); + + consumer.on('caught-up', () => { + consumer.stop(); + consumer = new Consumer(storage, 'foobar', 'consumer-projection-map', {}); + Projection.restoreFromFile(`${consumer.fileName}.projection`, { + hmac: createHmac('test-secret') + }).subscribe(consumer); + consumer.on('progress', () => { + if (consumer.state.count === 10) { + done(); + } + }); + storage.write({ type: 'Foobar', id: 4 }); + }); + + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + + it('throws if function projection is restored without trusted hmac', function() { + consumer = new Consumer(storage, 'foobar', 'consumer-projection-hmac'); + new Projection('consumer-projection-hmac', { + initialState: {}, + handlers: (state, event) => ({ ...state, lastId: event.id }) + }, { + hmac: createHmac('test-secret') + }).subscribe(consumer); + expect(() => Projection.restoreFromFile(`${consumer.fileName}.projection`, { + hmac: createHmac('wrong-secret') + })).to.throwError(/Invalid HMAC/); + }); + + it('can attach a projection instance and restore it on reopen', function(done) { + consumer = new Consumer(storage, 'foobar', 'consumer-projection-instance', { count: 0 }); + const projection = new Projection('consumer-projection-instance', { + initialState: { count: 0 }, + handlers: { + Foobar: (state, event) => ({ ...state, count: state.count + event.id }) + } + }, { + hmac: createHmac('test-secret') + }); + projection.subscribe(consumer); + consumer.on('caught-up', () => { + expect(consumer.state.count).to.be(6); + consumer.stop(); + consumer = new Consumer(storage, 'foobar', 'consumer-projection-instance', {}); + Projection.restoreFromFile(`${consumer.fileName}.projection`, { + hmac: createHmac('test-secret') + }).subscribe(consumer); + consumer.on('progress', () => { + if (consumer.state.count === 10) { + done(); + } + }); + storage.write({ type: 'Foobar', id: 4 }); + }); + storage.write({ type: 'Foobar', id: 1 }); + storage.write({ type: 'Foobar', id: 2 }); + storage.write({ type: 'Foobar', id: 3 }); + }); + + it('supports composite projections', function() { + const projection = new CompositeProjection('overview', { + count: { + initialState: 0, + handlers: { Foobar: (state) => state + 1 } + }, + last: { + initialState: null, + handlers: { Foobar: (state, event) => event.id || state } + } + }); + projection.handle([{ type: 'Foobar', id: 1 }, { type: 'Foobar', id: 2 }]); + expect(projection.state).to.eql({ count: 2, last: 2 }); + }); + it('can build consistency guards (aggregates)', function(done) { const guard = new Consumer(storage, 'foobar', 'unique-bar-guard'); guard.apply = function(event) { diff --git a/test/EventStore.spec.js b/test/EventStore.spec.js index 970f47e..423907c 100644 --- a/test/EventStore.spec.js +++ b/test/EventStore.spec.js @@ -4,6 +4,8 @@ import fsSync from 'fs'; import path from 'path'; import EventStore, { ExpectedVersion, OptimisticConcurrencyError, CommitCondition, LOCK_RECLAIM } from '../src/EventStore.js'; import Consumer from '../src/Consumer.js'; +import Projection from '../src/Projection.js'; +import { createHmac } from '../src/utils/metadataUtil.js'; import { fileURLToPath } from 'url'; const __dirname = fileURLToPath(new URL('.', import.meta.url)); @@ -1378,6 +1380,74 @@ describe('EventStore', function() { eventstore.commit('bar', { foo: 'baz', id: 2 }); }); + it('restores a projected consumer with eventStore typeAccessor defaults', function(done) { + eventstore = new EventStore({ + storageDirectory, + typeAccessor: 'type', + storageConfig: { + hmacSecret: 'test-secret' + } + }); + eventstore.createEventStream('user-stream', (event) => event.stream === 'user-stream'); + + const consumer = eventstore.getConsumer('user-stream', 'user-counter', { count: 0 }); + new Projection('user-counter', { + initialState: { count: 0 }, + handlers: { + UserCreated: (state) => ({ ...state, count: state.count + 1 }) + } + }, { + hmac: eventstore.storage.hmac + }).subscribe(consumer); + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 1 }]); + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 2 }]); + + consumer.on('progress', () => { + if (consumer.state.count !== 2) { + return; + } + eventstore.close(); + eventstore = new EventStore({ + storageDirectory, + typeAccessor: 'type', + storageConfig: { + hmacSecret: 'test-secret' + } + }); + const reopened = eventstore.getConsumer('user-stream', 'user-counter', { count: 0 }); + reopened.on('progress', () => { + if (reopened.state.count === 3) { + done(); + } + }); + eventstore.commit('user-stream', [{ type: 'UserCreated', id: 3 }]); + }); + }); + }); + + describe('getProjection', function() { + + it('creates and restores persisted projections with EventStore defaults', function() { + eventstore = new EventStore({ + storageDirectory, + typeAccessor: 'type', + storageConfig: { + hmacSecret: 'test-secret' + } + }); + const projection = eventstore.getProjection('user-count', { + UserCreated: (state) => state + 1 + }, 0); + projection.persist(); + + const restored = eventstore.getProjection('user-count'); + const state = restored.handle([ + { payload: { type: 'UserCreated' } }, + { payload: { type: 'UserCreated' } } + ]); + expect(state).to.be(2); + }); + it('rebinds an existing identifier to a different stream when requested', function(done) { eventstore = new EventStore({ storageDirectory diff --git a/test/util.spec.js b/test/util.spec.js index b9adc01..6124af0 100644 --- a/test/util.spec.js +++ b/test/util.spec.js @@ -2,7 +2,7 @@ import expect from 'expect.js'; import fs from 'fs-extra'; import path from 'path'; import { iterate } from '../src/utils/util.js'; -import { scanForFiles } from '../src/utils/fsUtil.js'; +import { isSafeRelativeName, resolvePathWithinRoot, scanForFiles } from '../src/utils/fsUtil.js'; import { fileURLToPath } from 'url'; const __dirname = path.dirname(fileURLToPath(import.meta.url)); @@ -90,6 +90,26 @@ describe('util', function() { }); }); + describe('file path helpers', function() { + + it('detects safe relative names', function() { + expect(isSafeRelativeName('stream-orders')).to.be(true); + expect(isSafeRelativeName('../orders')).to.be(false); + }); + + it('resolves paths within a root directory', function() { + const root = path.join(testDir, 'root'); + const resolved = resolvePathWithinRoot(root, 'a/b.file'); + expect(resolved.startsWith(path.resolve(root))).to.be(true); + }); + + it('rejects traversal paths outside of root directory', function() { + const root = path.join(testDir, 'root'); + expect(() => resolvePathWithinRoot(root, '../outside.file')).to.throwError(/Invalid relative path/); + }); + + }); + }); });