Skip to content
35 changes: 35 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,41 @@ Asynchronously scan all consumer state files and return their identifiers.

---

#### `eventstore.getProjection(name, [handlers], [initialState], [matcher])`

```javascript
eventstore.getProjection(name [, handlers] [, initialState] [, matcher]) → Projection
```

Create a `Projection` with EventStore defaults (`typeAccessor`, storage HMAC), or restore a previously persisted one when `handlers` is omitted.

- `handlers`: reducer function `(state, event) => state` or map `{ [eventType]: reducer }`
- `initialState`: initial projection state (default `{}`)
- `matcher`: optional object/function matcher (same shape as stream/query matchers)


---

#### `consumer.project(projection)`

```javascript
consumer.project(projection)
```

Attach a projection-like object (`apply(state, event)`) as the consumer `'data'` handler.

---

#### `projection.subscribe(consumer)`

```javascript
projection.subscribe(consumer)
```

Attach this projection to the consumer (same wiring behavior as `consumer.project(projection)`) and persist its definition next to the consumer state file so `eventstore.getConsumer(...)` can restore and reconnect it automatically.

---

### Events emitted

| Event | Payload | Description |
Expand Down
30 changes: 30 additions & 0 deletions docs/consumers.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@ consumer.setState({ count: 0 });
consumer.setState((state) => ({ ...state, count: state.count + 1 }));
```

## Projections

Use a `Projection` to define *how* events are projected into state, then connect it to a `Consumer` for durable continuous updates.

```javascript
const projection = eventstore.getProjection('orders-total', {
OrderCreated: (state, event) => ({ ...state, total: state.total + (event.payload.amount || 0) })
}, { total: 0 });

const consumer = eventstore.getConsumer('orders', 'orders-projection', projection.initialState);
projection.subscribe(consumer);
```

Projections are composable via `CompositeProjection`:

```javascript
import { CompositeProjection, Projection } from 'event-storage';

const count = new Projection('count', {
initialState: 0,
handlers: { OrderCreated: (state) => state + 1 }
});

const overview = new CompositeProjection('overview', {
Comment thread
albe marked this conversation as resolved.
count,
last: { initialState: null, handlers: { OrderCreated: (state, event) => event.payload } }
});
// overview.state -> { count: number, last: object|null }
```
Comment thread
albe marked this conversation as resolved.

## Resetting a Consumer

Force the consumer to reprocess events from a given position:
Expand Down
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export { default as EventStream } from './src/EventStream.js';
export { default as Storage, StorageLockedError } from './src/Storage.js';
export { default as Index } from './src/Index.js';
export { default as Consumer } from './src/Consumer.js';
export { default as Projection, CompositeProjection } from './src/Projection.js';
export { matches, buildRawBufferMatcher } from './src/utils/metadataUtil.js';
65 changes: 42 additions & 23 deletions src/Consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,11 @@ import stream from 'stream';
import fs from 'fs';
import path from 'path';
import { assert } from './utils/util.js';
import { ensureDirectory } from './utils/fsUtil.js';
import { ensureDirectory, isSafeRelativeName, resolvePathWithinRoot, safeUnlink, writeFileAtomic } from './utils/fsUtil.js';
import { normalizeConsumerStateArgs } from './utils/apiHelpers.js';
import Storage from './Storage/ReadableStorage.js';
const MAX_CATCHUP_BATCH = 10;

/**
* Safely unlink a file and ignore if it doesn't exist.
* @param {string} filename
*/
const safeUnlink = (filename) => {
/* c8 ignore next */
try {
fs.unlinkSync(filename);
} catch (e) {
if (e.code !== "ENOENT") {
throw e;
}
}
};

/**
* Implements an event-driven durable Consumer that provides at-least-once delivery semantics or exactly-once processing semantics if only using setState().
*/
Expand Down Expand Up @@ -54,11 +39,15 @@ class Consumer extends stream.Readable {
* @param {string} identifier The unique name to identify this consumer.
*/
initializeStorage(storage, indexName, identifier) {
assert(indexName === '_all' || isSafeRelativeName(indexName), `Invalid index name "${indexName}" for consumer.`);
assert(isSafeRelativeName(identifier), `Invalid identifier "${identifier}" for consumer.`);
this.storage = storage;
this.index = this.storage.openIndex(indexName);
this.indexName = indexName;
this.identifier = identifier;
const consumerDirectory = path.join(this.storage.indexDirectory, 'consumers');
this.fileName = path.join(consumerDirectory, this.storage.storageFile + '.' + indexName + '.' + identifier);
this.fileName = resolvePathWithinRoot(consumerDirectory, `${this.storage.storageFile}.${indexName}.${identifier}`);
ensureDirectory(path.dirname(this.fileName));
if (ensureDirectory(consumerDirectory)) {
this.cleanUpFailedWrites();
}
Expand All @@ -69,11 +58,15 @@ class Consumer extends stream.Readable {
* @private
*/
cleanUpFailedWrites() {
const consumerNamePrefix = path.basename(this.fileName) + '.';
const consumerBaseName = path.basename(this.fileName);
Comment thread
albe marked this conversation as resolved.
const consumerDirectory = path.dirname(this.fileName);
const files = fs.readdirSync(consumerDirectory);
for (let file of files) {
if (file.startsWith(consumerNamePrefix)) {
if (!file.startsWith(consumerBaseName + '.')) {
continue;
}
const suffix = file.slice(consumerBaseName.length + 1);
if (/^\d+$/.test(suffix)) {
safeUnlink(path.join(consumerDirectory, file));
}
}
Expand Down Expand Up @@ -104,6 +97,32 @@ class Consumer extends stream.Readable {
this.consuming = false;
}

/**
* Register a projection as `data` event handler.
* @api
* @param {{ apply: function(object, object): object }} projection
*/
project(projection) {
assert(projection && typeof projection.apply === 'function', 'Projection must implement apply(state, event).');
const projectionFileName = this.fileName ? `${this.fileName}.projection` : null;
const isAlreadySubscribed = this.projection === projection;
const isAlreadyPersisted = projectionFileName
&& projection.fileName === projectionFileName
&& fs.existsSync(projectionFileName);
if (this.projectionHandler) {
this.removeListener('data', this.projectionHandler);
}
this.projection = projection;
this.projectionHandler = (event) => {
this.setState(projection.apply(this.state, event));
};
this.on('data', this.projectionHandler);
if (!isAlreadySubscribed && !isAlreadyPersisted && typeof projection.persist === 'function') {
projection.persist({ fileName: projectionFileName || projection.fileName });
}
return this;
Comment thread
albe marked this conversation as resolved.
}

/**
* Update the state of this consumer transactionally with the position.
* May only be called from within the document handling callback.
Expand All @@ -118,6 +137,9 @@ class Consumer extends stream.Readable {
if (typeof newState === 'function') {
newState = newState(this.state);
}
if (this.state === newState) {
return;
}
this.state = Object.freeze(newState);
this.doPersist = persist;
}
Expand Down Expand Up @@ -173,10 +195,7 @@ class Consumer extends stream.Readable {
throw new Error(`Trying to update consumer ${this.name} concurrently. Keep each single consumer within a single process.`);
}
try {
fs.writeFileSync(tmpFile, consumerData);
// If the write fails (half-way), the consumer state file will not be corrupted
fs.renameSync(tmpFile, this.fileName);
this.emit('persisted', consumerState);
writeFileAtomic(this.fileName, consumerData, { tmpFileName: tmpFile }, () => this.emit('persisted', consumerState));
} catch (e) {
/* c8 ignore next */
safeUnlink(tmpFile);
Expand Down
48 changes: 44 additions & 4 deletions src/EventStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import events from 'events';
import Storage, { ReadOnly as ReadOnlyStorage, LOCK_THROW, LOCK_RECLAIM } from './Storage.js';
import Index from './Index.js';
import Consumer from './Consumer.js';
import Projection from './Projection.js';
import { assert, getPropertyAtPath } from './utils/util.js';
import { ensureDirectory, scanForFiles } from './utils/fsUtil.js';
import { buildTypeMatcherFn } from './utils/metadataUtil.js';
import { ensureDirectory, isSafeRelativeName, scanForFiles } from './utils/fsUtil.js';
import { buildTypeMatcherFn, isPlainObject } from './utils/metadataUtil.js';
import { fixCommitArgumentTypes, parseStreamFromIndexName, normalizePredicateRaw } from './utils/apiHelpers.js';

const ExpectedVersion = {
Expand All @@ -20,7 +21,6 @@ const ExpectedVersion = {
* Default matcher property paths mirroring the Storage default, used for index optimization.
*/
const DEFAULT_MATCHER_PROPERTIES = ['stream', 'payload.type'];
const STREAM_NAME_PATTERN = /^[A-Za-z0-9][A-Za-z0-9_]*(?:[\/:@~+=\-#.][A-Za-z0-9_]+)*$/;
const STORAGE_HOOK_EVENTS = new Set(['preCommit', 'preRead']);

class OptimisticConcurrencyError extends Error {}
Expand Down Expand Up @@ -120,6 +120,9 @@ class EventStore extends events.EventEmitter {
}
}

this.projectionTypeAccessor = this.typeAccessor
? (event) => this.typeAccessor(event?.payload || event)
: undefined;
this.initialize(storeName, storageConfig);
}

Expand Down Expand Up @@ -405,7 +408,7 @@ class EventStore extends events.EventEmitter {
return null;
}
assert(typeof type === 'string', 'typeAccessor must return a string.');
assert(STREAM_NAME_PATTERN.test(type), `typeAccessor must return a valid stream name. Got: "${type}"`);
assert(isSafeRelativeName(type), `typeAccessor must return a valid stream name. Got: "${type}"`);
return type;
}

Expand Down Expand Up @@ -777,11 +780,44 @@ class EventStore extends events.EventEmitter {
existingConsumer.stop();
}
const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since);
const consumerProjectionFileName = `${consumer.fileName}.projection`;
if (fs.existsSync(consumerProjectionFileName)) {
Projection.restoreFromFile(consumerProjectionFileName, {
hmac: this.storage.hmac,
typeAccessor: this.projectionTypeAccessor
}).subscribe(consumer);
}
consumer.streamName = streamName;
this.consumers.set(identifier, consumer);
return consumer;
}

/**
* Get or create a projection with EventStore defaults.
*
* @param {string} name Projection name.
* @param {function(object, object): object|object} [handlers] Projection handlers (reducer fn or reducer map).
* @param {object} [initialState={}] Projection initial state.
* @param {object|function(object): boolean} [matcher] Optional projection matcher.
* @returns {Projection}
*/
getProjection(name, handlers, initialState = {}, matcher) {
assert(typeof name === 'string' && name !== '', 'Must provide a projection name.');
const projectionFileName = path.join(this.storage.indexDirectory, 'projections', this.storage.storageFile + '.' + name + '.projection');
const projectionOptions = {
fileName: projectionFileName,
hmac: this.storage.hmac,
typeAccessor: this.projectionTypeAccessor
};
if (handlers !== undefined) {
const definition = isProjectionDefinitionObject(handlers)
? handlers
: { handlers, initialState, matcher };
return new Projection(name, definition, projectionOptions);
}
return Projection.restore(name, projectionOptions);
}

/**
* Scan the existing consumers on this EventStore and asynchronously invoke a callback with the parsed list.
*
Expand Down Expand Up @@ -822,6 +858,10 @@ class EventStore extends events.EventEmitter {
}


function isProjectionDefinitionObject(value) {
return isPlainObject(value) && Object.hasOwn(value, 'handlers');
}

EventStore.Storage = Storage;
EventStore.Index = Index;

Expand Down
Loading