Extract durable projections into Projection class with minimal Consumer/EventStore wiring#320
Extract durable projections into Projection class with minimal Consumer/EventStore wiring#320Copilot wants to merge 4 commits into
Conversation
|
@copilot I'd rather like to extract the Projection concept into an own class that should only encapsulate the execution, and optional persisting and restoring of the projection definition. It holds the concept of how events are projected into state, as opposed to the consumer that holds the concept of what events should be consumed (through stream and persistent position). Or put differently, a projection is an in-memory model for interpretation of events, while the consumer is a persistent iterator over events. Combining both gives a full read model that can be defined at runtime but stays persistent. Alone, the projection can be used for ad-hoc decision models. Also, a Projection bridges the gap between Consumer and DCB, where not a single stream is the relevant point of Here's some examples of how the usage should look (taking inspiration from https://dcb.events/topics/projections/#basic-functionality) const userCount = new Projection('userCount', {
initialState: 0,
handlers: {
UserCreated: (state, event) => state + 1
UserDeleted: (state, event) => state - 1
},
matcher: {} // an optional stream matcher predicate like in query() and getEventStream()
}, {
typeAccessor: (event) => event.payload?.type
});
// OR
const userCount = eventStore.getProjection('userCount', {
initialState: 0,
handlers: {
UserCreated: (state, event) => state + 1
UserDeleted: (state, event) => state - 1
},
matcher: {} // an optional stream matcher predicate like in query() and getEventStream()
}); // Passes the typeAccessor from event store config
const newStateAfterCreate = userCount.apply(currentState, new UserCreated(...)); // apply a single event
// Ad-hoc usage
const { stream, condition } = eventStore.query(userCount.types, userCount.matcher);
const count = userCount.handle(stream); // reset state to initialState and handle all events in a stream, calling projection.state = projection.apply(projection.state, event) for each
userCount.state; // 4 === count
// t.b.d. what if the userCount has projected some events before and we have the same events here again. There's two options: handle = reset to initialState before processing, making a Projection handle call a real ad-hoc one shot path
// OR Projection remembers last seen global event position and user needs to do eventStore.query(userCount.types, userCount.matcher, userCount.position)
if (count === X) {
eventStore.commit('user-monitoring', new UserLimitReached(), condition);
}
// Connect a projection to a durable consumer to keep a continuous model
const userCountConsumer = eventStore.getConsumer('userCount');
userCountConsumer.project(userCount); // Initializes Projection with initialState and registers the apply method as the consumer 'data' handler like this: on('data', event => consumer.setState(state => projection.apply(state, event)))
userCountConsumer.state; // 4
// Next time the Consumer is fetched via getConsumer('userCount'), the projection is restored and registered automatically
// Alternatively, the other way around:
userCount.subscribe(userCountConsumer); // Will make this projection durable and continue from where the consumer was last persisted
userCount.state; // 4
userCount.persist(hmac); // stores the projection definition, using the name as file name and buildMetadataForMatcher to safely store the handler functions and definition. All handler functions are toString()'d before saving, so the definition object can directly be JSON.stringify'd
// restores the projection definition
const userCount = Projection.restore('userCount', hmac, {
typeAccessor: (event) => event.payload?.type
});
// Projection composition
const userOverview = new CompsiteProjection('userOverview', {
// composition of existing projections
userCount,
lastUser,
...,
// inline projection, maybe not needed
userLoss: {
initialState: {},
handlers: {
UserDeleted: (state, event) => (period) => ({ [period]: (state[period] || 0) + 1 })(YearMonth(event.payload.createdAt))
}
}
}, {
typeAccessor: (event) => event.payload?.type
});
userOverview.handle(stream); // delegates to the respective sub-projection handlers or inline projection handlers
userOverview.types; // combination of all projections' event types
userOverview.state; // { userCount: 4, lastUser: { name: 'John Doe', createdAt: '2026-05-29T14:35:13' }, userLoss: { '2026-05': 1 } } |
Implemented this in commit Projection logic is now extracted into |
| } | ||
| const consumer = new Consumer(this.storage, streamName === '_all' ? '_all' : 'stream-' + streamName, identifier, initialState, since); | ||
| const projectionTypeAccessor = this.typeAccessor | ||
| ? (event) => this.typeAccessor(event?.payload || event) |
There was a problem hiding this comment.
I see we have an ambiguity here between EventStore typeAccessor, which receives the "domain event" (because we need it before persisting a domain event to ensure existence of a Type stream), while the Projection receives "stored events" containing payload and metadata. I think this needs to be cleaned up a bit, because we also have the matcher in the query and EventStream, which acts on pure domain events but receives the storage event metadata as additional parameter. At the consumer, which acts upon the Storage layer, we see the "stored event", which implies some knowledge about the storage schema, that is suboptimal. But since the 'data' event on the Consumer is supposed to be compatible with the nodejs Stream.Readable interface, it receives only one parameter. So the Consumer 'data' event needs to stay low level, but maybe there is a second layer missing, which destructures this into an (payload, metadata) event and then Projection in turn could subscribe to that? It would bypass the autostart of Stream.Readable though.
So for now we stay with this.
This change introduces a dedicated
Projectionconcept and keeps durable projection restore with HMAC verification, while cleanly separating projection logic (how events are interpreted) from consumer iteration/persistence (what is consumed and where).New
ProjectionAPIProjectionclass for projection execution and optional persistence/restore of projection definitions.(state, event) => state{ [eventType]: (state, event) => state }matcher,typeAccessor, andhmac.projection.apply(...),projection.handle(stream),projection.persist(...), and restore helpers.Composable projections
CompositeProjectionto combine projections into a single composed read model.typesand nested composedstate.Consumer ↔ Projection integration
consumer.project(projection)now provides minimal runtime wiring only (attachesprojection.applyas'data'handler).projection.subscribe(consumer)is the durable integration path:EventStore integration
eventstore.getProjection(name[, definition]):typeAccessor, store-level HMAC)definitionis omitted.eventstore.getConsumer(...)to restore and reconnect an existing consumer-side projection sidecar automatically.Projection persistence + trusted restore
buildMetadataForMatcher(...).Consumer file handling hardening
Projection.Docs and behavioral coverage
EventStore -> getProjection(name) -> projection.subscribe(consumer)(orconsumer.project(projection)for non-durable wiring)consumer.createProjection(...)usage and docs.