
Event Sourcing from the Ground Up, Part 3: CQRS and Projections

Splitting reads from writes: projections as folds over the log, checkpoints and rebuilds, living with eventual consistency, and integration events.

#event-sourcing #kotlin #architecture #distributed-systems

Part 2 built the write side: an event store, a decider, optimistic concurrency. But it left a dangling question — how do you ask “what’s the total balance across all accounts?” without folding every stream on every request? This part answers it with CQRS and projections, continuing the same Kotlin code. Part 4 covers what happens after year one.


The query problem

Our event store from Part 2 is superb at exactly one read: “give me the history of stream X, in order.” That’s all the write side ever needs — load one account, check one invariant, append. But real applications ask different questions entirely: every account whose balance is above a threshold, the ten most active accounts this week, total deposits this month. Against a log, every one of those is a full scan and a re-fold.
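
To make that cost concrete, here is a rough sketch of the "accounts above a threshold" query answered straight from the log. `LoggedDeposit` and `accountsAbove` are hypothetical stand-ins invented for this illustration (the real log holds Part 2's stored events), and only deposits are modeled to keep it short.

```kotlin
// Hypothetical, simplified log entry; not the StoredEvent type from Part 2.
data class LoggedDeposit(val streamId: String, val amountCents: Long)

/**
 * Answers "which accounts are above the threshold?" directly from the log.
 * Every call groups the entire log by stream and re-folds each stream,
 * so the cost grows with total history, not with the size of the answer.
 */
fun accountsAbove(log: List<LoggedDeposit>, thresholdCents: Long): Set<String> =
    log.groupBy { it.streamId }
        .mapValues { (_, events) -> events.fold(0L) { balance, e -> balance + e.amountCents } }
        .filterValues { it > thresholdCents }
        .keys
```

Nothing here is cached between calls, which is exactly the work a read model is meant to do once instead of on every request.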

Notice that the write side and the read side want opposite data shapes. Writes want history grouped by entity, so invariants can be checked. Reads want current values, indexed by whatever the screen sorts on. Forcing one model to serve both is how CRUD systems end up with thirty-column tables that are simultaneously bad at validation and bad at queries.

CQRS: two models, one truth

Command Query Responsibility Segregation says: stop forcing it. Keep one model for processing commands and a different model, or several, for answering queries (Fowler, CQRS; the term comes from Greg Young, as an object-level split of Bertrand Meyer’s command/query separation). CQRS does not require event sourcing, and event sourcing does not require CQRS, but they fit unusually well, because an event log is the ideal feedstock for building any number of read models (Azure Architecture Center, CQRS pattern).

The shape of the combined system:

[Figure: commands flow through the decider into the event store; projections fold the log into read models; queries hit the read models.]

The write side is Part 2, unchanged. The new machinery is on the right: projections subscribe to the log and fold it into read models — plain tables, documents, caches, search indexes — and queries never touch the event store at all.

One thing CQRS does not mean: two databases, microservices, or a message bus. Minimal CQRS is two code paths in one process over one database. Resist the enterprise blog posts; the pattern is about model separation, not infrastructure.

Projections are folds too

Here’s the pleasing symmetry. In Part 2, state was events.fold(initialState, ::evolve) over one stream. A projection is the same fold over all streams, into whatever shape a query wants. Same idea, different scope:

interface Projection {
    fun apply(stored: StoredEvent)
}

/** Read model: current balance per account — a screenful, not a history. */
class BalancePerAccount : Projection {
    private val balances = HashMap<String, Long>()

    override fun apply(stored: StoredEvent) {
        when (val e = stored.payload) {
            is AccountEvent.Opened    -> balances[stored.streamId] = 0L
            is AccountEvent.Deposited -> balances.merge(stored.streamId, e.amountCents, Long::plus)
            is AccountEvent.Withdrawn -> balances.merge(stored.streamId, -e.amountCents, Long::plus)
            is AccountEvent.Closed    -> balances.remove(stored.streamId)
        }
    }

    fun balanceOf(accountId: String): Long? = balances[accountId]
    fun richest(n: Int): List<Pair<String, Long>> =
        balances.entries.sortedByDescending { it.value }.take(n).map { it.key to it.value }
}

/** A second read model from the SAME log — added later, backfilled by replay. */
class DailyDepositTotals : Projection {
    private val totals = sortedMapOf<java.time.LocalDate, Long>()

    override fun apply(stored: StoredEvent) {
        val e = stored.payload
        if (e is AccountEvent.Deposited) {
            val day = java.time.LocalDate.ofInstant(stored.recordedAt, java.time.ZoneOffset.UTC)
            totals.merge(day, e.amountCents, Long::plus)
        }
    }

    fun total(day: java.time.LocalDate): Long = totals[day] ?: 0L
}

DailyDepositTotals is the payoff Part 1 promised: nobody thought to track deposits-per-day at design time, but because the log kept everything, the new read model backfills itself from history the moment it’s deployed. In a CRUD system this report starts collecting data the day you ship it; here it starts in the past.

The projection runner and its checkpoint

Something has to feed events to projections and remember how far each one has gotten. That cursor — the checkpoint — is the global position from Part 2’s readAll, and it’s what makes catch-up, restart, and rebuild all the same operation:

class ProjectionRunner(
    private val store: EventStore,
    private val projections: List<Projection>,
) {
    private var checkpoint = -1L   // -1 = nothing applied yet

    /** Pull-based catch-up; production systems run this on a loop or a live subscription. */
    fun catchUp() {
        for (stored in store.readAll(afterPosition = checkpoint)) {
            projections.forEach { it.apply(stored) }
            checkpoint = stored.globalPosition
        }
    }
}

fun main() {
    val store = InMemoryEventStore()
    val accounts = CommandHandler(store, AccountDecider)
    val balances = BalancePerAccount()
    val runner = ProjectionRunner(store, listOf(balances))

    accounts.handle("account-42", AccountCommand.Open("Mustafa"))
    accounts.handle("account-42", AccountCommand.Deposit(10_000))
    accounts.handle("account-7", AccountCommand.Open("Aya"))
    accounts.handle("account-7", AccountCommand.Deposit(25_000))

    runner.catchUp()
    println(balances.richest(2))   // [(account-7, 25000), (account-42, 10000)]
}

Purpose-built event stores (EventStoreDB, for example) hand you this loop as a catch-up subscription: replay from your checkpoint, then keep receiving live events. Two operational rules fall out of the checkpoint design:

Idempotency or atomicity — pick one. Delivery to a projection is effectively at-least-once: a crash between “applied event” and “saved checkpoint” means the event is applied again on restart. Either make apply idempotent, or commit the read-model update and the checkpoint in the same transaction. (Readers of the CRDT series will recognize this as the same redelivery problem state-based merge absorbed with idempotence — here you must engineer it back in by hand.)
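
One way to get the idempotent option is a position guard: the read model remembers the highest global position it has applied and ignores anything at or below it. The sketch below assumes events carry a monotonically increasing global position, as the runner above does; `Deposited` and `IdempotentTotals` are hypothetical simplified types for illustration, not code from Part 2.

```kotlin
// Hypothetical, simplified stand-in for a stored event: position, stream, amount.
data class Deposited(val globalPosition: Long, val streamId: String, val amountCents: Long)

/** Keeps its own high-water mark so a redelivered event is skipped, not double-counted. */
class IdempotentTotals {
    private val balances = HashMap<String, Long>()

    var lastApplied = -1L
        private set

    fun apply(event: Deposited) {
        // At-least-once delivery: anything at or below the mark was already applied.
        if (event.globalPosition <= lastApplied) return
        balances.merge(event.streamId, event.amountCents, Long::plus)
        lastApplied = event.globalPosition
    }

    fun balanceOf(streamId: String): Long = balances[streamId] ?: 0L
}
```

In a durable read model the mark would have to be stored alongside the rows it describes, which in practice brings you back to writing both in one transaction; the guard only helps if it survives the same crash the data does.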

Rebuild is just checkpoint = -1. Projection logic wrong? Schema changed? Drop the read model, reset the checkpoint, replay. Rebuilds are routine maintenance in event-sourced systems — and at scale they take time, so plan for running old and new versions of a projection side by side and cutting over when the new one catches up.
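
The side-by-side cutover can be sketched as follows, under heavy simplification: the log is an in-memory list, and `Event`, `TotalProjection`, and `ReadModelSwitch` are hypothetical names invented for this example. The point is only the ordering: the new version replays from -1 while the old one keeps serving, and the swap happens once the new checkpoint has reached the old one.

```kotlin
// Hypothetical, simplified event: a global position and an amount.
data class Event(val position: Long, val amountCents: Long)

/** A projection version with its own checkpoint, so each version can replay independently. */
class TotalProjection(val version: String) {
    var checkpoint = -1L
        private set
    var total = 0L
        private set

    fun catchUp(log: List<Event>) {
        for (e in log) {
            if (e.position <= checkpoint) continue   // already applied
            total += e.amountCents
            checkpoint = e.position
        }
    }
}

/** Queries read from `live`; a candidate replaces it only once it has caught up. */
class ReadModelSwitch(initial: TotalProjection) {
    var live: TotalProjection = initial
        private set

    fun cutOverIfCaughtUp(candidate: TotalProjection): Boolean {
        if (candidate.checkpoint < live.checkpoint) return false
        live = candidate
        return true
    }
}
```

A real deployment would also keep feeding the old version until the swap and retire it afterwards; that bookkeeping is left out here.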

Eventual consistency, now in your UI

The price of all this: a projection is always slightly behind the log. A user submits a deposit, the command succeeds, the page re-renders from a read model that hasn’t applied the event yet — and their money is missing. Nothing is wrong; the read is just stale. But users don’t grade you on consistency models.

The standard mitigations, roughly in order of preference:

  1. Respond from the write side. The command handler already returns the new events (ours does, see Part 2) — it knows the new balance. Render the confirmation from that, no read model involved.
  2. Wait for the checkpoint. The append returned a global position; have the query side wait until the projection’s checkpoint passes it before answering that one user.
  3. Design the UX for it. “Your transfer is being processed.” Often the most honest option — the bank’s actual settlement is eventually consistent too.
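
Option 2 can be sketched with a small gate shared between the projection runner and the query side. `CheckpointGate` is a hypothetical helper, not part of the code from Part 2; it assumes the runner reports each new checkpoint and that the query handler knows the global position returned by the append.

```kotlin
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

/** Hypothetical helper: the runner calls advanceTo(), a query calls awaitPosition(). */
class CheckpointGate {
    private val lock = ReentrantLock()
    private val advanced = lock.newCondition()
    private var checkpoint = -1L

    /** Called by the projection runner after it has applied events up to `position`. */
    fun advanceTo(position: Long) {
        lock.withLock {
            if (position > checkpoint) {
                checkpoint = position
                advanced.signalAll()
            }
        }
    }

    /** True once the checkpoint has reached `position`; false if the timeout expires first. */
    fun awaitPosition(position: Long, timeoutMillis: Long): Boolean = lock.withLock {
        var remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
        while (checkpoint < position) {
            if (remainingNanos <= 0L) return@withLock false
            // awaitNanos returns the time left, so spurious wakeups do not extend the wait.
            remainingNanos = advanced.awaitNanos(remainingNanos)
        }
        true
    }
}
```

The timeout matters: if the projection is down, the query should fall back to a stale answer or a "still processing" message rather than hang.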

What you should not do is bolt synchronous projection updates onto every command “to be safe” — you’ve reinvented the single overloaded model, now with extra moving parts. If a piece of data truly must be read-your-own-write consistent, that’s a signal it belongs on the write side, inside a stream’s boundary.

Leaving the building: integration events

Projections consume the log inside the system. The moment another service wants your events, resist pointing it at your event store. Internal events are your domain’s private vocabulary, free to change at every refactor (Part 4 is about how often that happens). Publish deliberate, stable integration events instead — a translated, smaller contract — typically via an outbox so that “persisted” and “published” can’t disagree. This is where a message broker enters an event-sourced architecture, as the transport for integration events — not as the event store. Why a broker makes a poor event store is the opening act of Part 4.
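
The translation step might look like the sketch below. `InternalEvent` is a simplified stand-in for Part 2's `AccountEvent`, and `BalanceChangedV1` is a hypothetical public contract invented for this example; the outbox and the broker are not shown.

```kotlin
// Simplified stand-in for the internal events; the real AccountEvent lives in Part 2.
sealed class InternalEvent {
    data class Opened(val owner: String) : InternalEvent()
    data class Deposited(val amountCents: Long) : InternalEvent()
    data class Withdrawn(val amountCents: Long) : InternalEvent()
    object Closed : InternalEvent()
}

/** Hypothetical integration event: versioned, coarser, and free of internal type names. */
data class BalanceChangedV1(val accountId: String, val deltaCents: Long)

/** Translate at the boundary; internal events with no external meaning publish nothing. */
fun toIntegrationEvent(accountId: String, event: InternalEvent): BalanceChangedV1? =
    when (event) {
        is InternalEvent.Deposited -> BalanceChangedV1(accountId, event.amountCents)
        is InternalEvent.Withdrawn -> BalanceChangedV1(accountId, -event.amountCents)
        is InternalEvent.Opened, is InternalEvent.Closed -> null
    }
```

Because consumers only ever see `BalanceChangedV1`, the internal events can be renamed or split without breaking anyone downstream, as long as this one function keeps producing the same contract.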


References