Event Sourcing from the Ground Up, Part 2: Building It in Kotlin
A from-scratch Kotlin event store with optimistic concurrency, a bank-account decider, replay, and snapshots — about 150 lines, no framework.
Part 1 made the case that events are facts, the log is the truth, and state is a fold. This part turns that into running Kotlin — an event store with optimistic concurrency, a bank-account decider, replay, and snapshots — with no framework. Part 3 adds the read side.
We’re building the example Part 1 promised: a bank account, the entity that CRDTs famously cannot protect because “balance must never go negative” needs coordination. Here, the totally ordered stream is the coordination, and you’ll see exactly where the invariant gets enforced.
The shape we’ll follow is the decider pattern, Jérémie Chassaing’s distillation of event sourcing into three pure pieces (thinkbeforecoding, Functional Event Sourcing Decider):
initialState : S is the state of an entity that has no history yet.
decide : (C, S) -> List<E> is the business logic: look at a command and the current state, then reject or emit events.
evolve : (S, E) -> S is the pure state transition: given a fact, move the state forward.
The two functions, decide and evolve, are pure. No I/O, no clock, no database. That makes the domain logic trivially testable (fold a history into state, issue a command, assert on the decision) and it keeps persistence entirely out of the domain's sight.
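To make the testability claim concrete, here is a minimal sketch on a deliberately tiny, hypothetical domain (a counter that may not drop below zero, not the bank account built below). The names CounterCommand, CounterEvent, and decideAfter are illustrative assumptions, as is the choice to signal rejection by throwing.

```kotlin
// Hypothetical toy domain, used only to show how a decider is tested.
sealed interface CounterCommand {
    data object Increment : CounterCommand
    data object Decrement : CounterCommand
}

sealed interface CounterEvent {
    data object Incremented : CounterEvent
    data object Decremented : CounterEvent
}

val initialState: Int = 0

// decide: may reject (here by throwing), otherwise emits events.
fun decide(command: CounterCommand, state: Int): List<CounterEvent> =
    when (command) {
        CounterCommand.Increment -> listOf(CounterEvent.Incremented)
        CounterCommand.Decrement ->
            if (state > 0) listOf(CounterEvent.Decremented)
            else error("counter cannot go below zero")
    }

// evolve: applies a fact and never rejects.
fun evolve(state: Int, event: CounterEvent): Int =
    when (event) {
        CounterEvent.Incremented -> state + 1
        CounterEvent.Decremented -> state - 1
    }

// Given a history, when a command arrives, capture the decision or the rejection.
fun decideAfter(history: List<CounterEvent>, command: CounterCommand): Result<List<CounterEvent>> =
    runCatching { decide(command, history.fold(initialState, ::evolve)) }

fun main() {
    // Given one increment, a decrement is accepted.
    val accepted = decideAfter(listOf(CounterEvent.Incremented), CounterCommand.Decrement)
    check(accepted.getOrThrow() == listOf(CounterEvent.Decremented))

    // Given no history, the same command is rejected.
    val rejected = decideAfter(emptyList(), CounterCommand.Decrement)
    check(rejected.isFailure)
    println("decider checks passed")
}
```

No store, mock, or clock appears anywhere in the test: the history is a plain list and the assertion is a plain equality check.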
1. Events, commands, state
Kotlin’s sealed hierarchies are almost unfairly good at this. Events are past-tense facts; commands are imperative requests; state is a small sealed type whose variants make illegal states unrepresentable:
sealed interface AccountEvent {
    data class Opened(val owner: String) : AccountEvent
    data class Deposited(val amountCents: Long) : AccountEvent
    data class Withdrawn(val amountCents: Long) : AccountEvent
    data class Closed(val reason: String) : AccountEvent
}

sealed interface AccountCommand {
    data class Open(val owner: String) : AccountCommand
    data class Deposit(val amountCents: Long) : AccountCommand
    data class Withdraw(val amountCents: Long) : AccountCommand
    data class Close(val reason: String) : AccountCommand
}

sealed interface Account {
    data object NotOpened : Account
    data class Active(val owner: String, val balanceCents: Long) : Account
    data object Closed : Account
}
Two deliberate choices. Money is Long cents, because replaying history through floating point is how you end up explaining a 0.30000000000000004 balance to an auditor. And Account.NotOpened is a real state, not a null — “this stream has no history” is something decide needs to reason about explicitly.
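A quick way to see the floating-point problem is to fold the same two deposits both ways. This is a small sketch; the helper names sumAsDouble and sumAsCents are made up for the illustration.

```kotlin
// Folding amounts as Double: 0.1 and 0.2 have no exact binary representation.
fun sumAsDouble(amounts: List<Double>): Double =
    amounts.fold(0.0) { acc, x -> acc + x }

// Folding the same amounts as Long cents: integer addition is exact.
fun sumAsCents(amountsCents: List<Long>): Long =
    amountsCents.fold(0L) { acc, x -> acc + x }

fun main() {
    println(sumAsDouble(listOf(0.1, 0.2)))         // 0.30000000000000004
    println(sumAsDouble(listOf(0.1, 0.2)) == 0.3)  // false
    println(sumAsCents(listOf(10L, 20L)))          // 30
}
```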
2. The decider
interface Decider<C, S, E> {
    val initialState: S
    fun decide(command: C, state: S): List<E>
    fun evolve(state: S, event: E): S
}

object AccountDecider : Decider<AccountCommand, Account, AccountEvent> {
    override val initialState: Account = Account.NotOpened

    override fun decide(command: AccountCommand, state: Account): List<AccountEvent> =
        when (command) {
            is AccountCommand.Open -> when (state) {
                is Account.NotOpened -> listOf(AccountEvent.Opened(command.owner))
                else -> error("account already exists")
            }
            is AccountCommand.Deposit -> {
                require(command.amountCents > 0) { "deposit must be positive" }
                when (state) {
                    is Account.Active -> listOf(AccountEvent.Deposited(command.amountCents))
                    else -> error("no active account")
                }
            }
            is AccountCommand.Withdraw -> {
                require(command.amountCents > 0) { "withdrawal must be positive" }
                when (state) {
                    is Account.Active ->
                        if (state.balanceCents >= command.amountCents)
                            listOf(AccountEvent.Withdrawn(command.amountCents))
                        else error("insufficient funds") // <- the invariant
                    else -> error("no active account")
                }
            }
            is AccountCommand.Close -> when (state) {
                is Account.Active ->
                    if (state.balanceCents == 0L) listOf(AccountEvent.Closed(command.reason))
                    else error("balance must be zero to close")
                else -> error("no active account")
            }
        }

    // The casts below are not validation: decide only ever emits Deposited and
    // Withdrawn for an Active account, so they can fail only on a corrupted history.
    override fun evolve(state: Account, event: AccountEvent): Account =
        when (event) {
            is AccountEvent.Opened -> Account.Active(event.owner, balanceCents = 0)
            is AccountEvent.Deposited -> (state as Account.Active)
                .copy(balanceCents = state.balanceCents + event.amountCents)
            is AccountEvent.Withdrawn -> (state as Account.Active)
                .copy(balanceCents = state.balanceCents - event.amountCents)
            is AccountEvent.Closed -> Account.Closed
        }
}
Note the asymmetry, because it’s the heart of the pattern. decide is full of rejections — it’s the bouncer. evolve validates nothing. By the time evolve runs, the event is history; refusing to apply a fact that already happened is not an option. If evolve ever wants to say “no,” your validation is in the wrong function. (This is the same discipline as merge in a CRDT: it must accept whatever arrives. The difference is that here the bouncer exists at all.)
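The outer when expressions in decide and evolve are exhaustive over sealed types, with no else branch: add a new command or event and the compiler walks you to every place that must handle it. (The inner when (state) blocks do use else, so a new Account variant would not be flagged there.) In a pattern where events live forever, that compile-time checklist is worth a lot.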
3. The event store
Three operations, as promised in Part 1 — append with a version check, read one stream, read everything:
import java.time.Instant

data class StoredEvent(
    val globalPosition: Long,  // ordering across ALL streams (Part 3 lives on this)
    val streamId: String,
    val streamVersion: Long,   // 0-based position within the stream
    val payload: Any,
    val recordedAt: Instant,
)

class ConcurrencyConflict(streamId: String, expected: Long, actual: Long) :
    RuntimeException("stream $streamId is at version $actual, expected $expected")

interface EventStore {
    /** Appends iff the stream's last version equals [expectedVersion] (-1 = empty). */
    fun append(streamId: String, expectedVersion: Long, events: List<Any>): Long
    fun readStream(streamId: String, fromVersion: Long = 0): List<StoredEvent>
    fun readAll(afterPosition: Long = -1): List<StoredEvent>
}

class InMemoryEventStore : EventStore {
    private val log = ArrayList<StoredEvent>()
    private val versions = HashMap<String, Long>() // streamId -> last version

    @Synchronized
    override fun append(streamId: String, expectedVersion: Long, events: List<Any>): Long {
        val actual = versions.getOrDefault(streamId, -1L)
        if (actual != expectedVersion) throw ConcurrencyConflict(streamId, expectedVersion, actual)
        var version = actual
        val now = Instant.now()
        for (event in events) {
            version += 1
            log += StoredEvent(log.size.toLong(), streamId, version, event, now)
        }
        versions[streamId] = version
        return version
    }

    @Synchronized
    override fun readStream(streamId: String, fromVersion: Long): List<StoredEvent> =
        log.filter { it.streamId == streamId && it.streamVersion >= fromVersion }

    @Synchronized
    override fun readAll(afterPosition: Long): List<StoredEvent> =
        log.filter { it.globalPosition > afterPosition }
}
An ArrayList and a HashMap — deliberately boring, because the contract is what matters. A production store (KurrentDB, Postgres with the right schema, Axon Server) implements this same contract with durability and real indexes; we’ll tour them in Part 4. The linear scan in readStream would be an indexed lookup there, and that index is precisely what a message broker doesn’t give you — foreshadowing Part 4’s Kafka discussion.
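To give a feel for what "indexed lookup" means, here is a rough in-memory sketch that keeps a per-stream list next to the global log. IndexedLog and Entry are hypothetical names, the record is trimmed down, and the expected-version check is left out to keep the focus on the index; it is not how any of the production stores above are implemented.

```kotlin
// Hypothetical, trimmed-down record; the article's StoredEvent has more fields.
data class Entry(
    val globalPosition: Long,
    val streamId: String,
    val streamVersion: Long,
    val payload: Any,
)

class IndexedLog {
    private val log = ArrayList<Entry>()                          // global order
    private val byStream = HashMap<String, MutableList<Entry>>()  // per-stream index

    @Synchronized
    fun append(streamId: String, payload: Any): Entry {
        val stream = byStream.getOrPut(streamId) { ArrayList() }
        val entry = Entry(log.size.toLong(), streamId, stream.size.toLong(), payload)
        log += entry
        stream += entry
        return entry
    }

    // Versions are 0-based and contiguous, so a version doubles as a list offset:
    // no scan over other streams' events is needed.
    @Synchronized
    fun readStream(streamId: String, fromVersion: Long = 0): List<Entry> {
        val stream = byStream[streamId] ?: return emptyList()
        val from = fromVersion.coerceIn(0L, stream.size.toLong()).toInt()
        return stream.subList(from, stream.size).toList()
    }
}

fun main() {
    val index = IndexedLog()
    index.append("account-1", "opened")
    index.append("account-2", "opened")
    index.append("account-1", "deposited")
    println(index.readStream("account-1", fromVersion = 1).map { it.payload }) // [deposited]
}
```

The cost is a second structure that must be updated in the same critical section as the log, which is the in-memory analogue of maintaining a database index on the stream id and version.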
Optimistic concurrency is the whole ballgame
expectedVersion is what turns an append-only log into a consistency mechanism. The handler reads a stream at version n, decides, and appends with expectedVersion = n. If another writer slipped in a withdrawal meanwhile, the version no longer matches, the append throws, and the loser retries against fresh state — at which point decide re-checks the balance and may now refuse. This check is what enforces the invariant under concurrency. Without it, two simultaneous withdrawals of 80 from a balance of 100 both pass validation and history records a negative balance with a straight face. With it, lost updates are structurally impossible rather than merely unlikely.
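The race is easier to follow when the interleaving is written out by hand. The sketch below replays the two-withdrawals scenario deterministically, without threads, using minimal stand-ins (TinyStream, Conflict, decideWithdraw) that are hypothetical and exist only so the block runs on its own; events are reduced to signed deltas in cents.

```kotlin
class Conflict(expected: Long, actual: Long) :
    RuntimeException("expected version $expected but stream is at $actual")

class TinyStream {
    private val deltas = ArrayList<Long>()   // each event is a signed change in cents

    @Synchronized
    fun read(): List<Long> = deltas.toList()

    // Appends only if the caller decided against the latest version (-1 = empty).
    @Synchronized
    fun append(expectedVersion: Long, delta: Long) {
        val actual = deltas.size - 1L
        if (actual != expectedVersion) throw Conflict(expectedVersion, actual)
        deltas += delta
    }
}

// decide for a withdrawal: returns the event (a negative delta) or rejects.
fun decideWithdraw(balance: Long, amount: Long): Long {
    check(balance >= amount) { "insufficient funds" }
    return -amount
}

fun main() {
    val stream = TinyStream()
    stream.append(expectedVersion = -1L, delta = 100L)   // opening deposit, now version 0

    // Both writers read the same history: balance 100 at version 0.
    val seenByA = stream.read()
    val seenByB = stream.read()

    // Writer A decides and appends first.
    stream.append(seenByA.size - 1L, decideWithdraw(seenByA.sum(), 80L))

    // Writer B's decision passes because B still sees 100; the version check catches it.
    val staleDecision = decideWithdraw(seenByB.sum(), 80L)
    val conflict = runCatching { stream.append(seenByB.size - 1L, staleDecision) }
    println(conflict.exceptionOrNull()?.message)   // expected version 0 but stream is at 1

    // On retry B re-reads, sees a balance of 20, and decide now refuses.
    val retry = runCatching { decideWithdraw(stream.read().sum(), 80L) }
    println(retry.exceptionOrNull()?.message)      // insufficient funds
    println(stream.read().sum())                   // 20
}
```

Remove the version comparison in append and the second withdrawal lands, leaving the balance at -60: decide was correct both times, it was simply answering a question about stale state.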
4. The command handler: read → fold → decide → append
class CommandHandler<C, S, E : Any>(
    private val store: EventStore,
    private val decider: Decider<C, S, E>,
    private val maxRetries: Int = 3,
) {
    fun handle(streamId: String, command: C): List<E> {
        repeat(maxRetries) {
            val history = store.readStream(streamId)
            var state = decider.initialState
            for (stored in history) {
                @Suppress("UNCHECKED_CAST")
                state = decider.evolve(state, stored.payload as E)
            }
            val fresh = decider.decide(command, state)
            try {
                if (fresh.isNotEmpty())
                    store.append(streamId, expectedVersion = history.size - 1L, events = fresh)
                return fresh
            } catch (_: ConcurrencyConflict) { /* re-read and retry */ }
        }
        error("gave up on $streamId after $maxRetries conflicts")
    }
}
That loop — read, fold, decide, append-or-retry — is the entire write path of every event-sourced system you will ever meet, whatever the framework hides it behind. Using it:
fun main() {
    val store = InMemoryEventStore()
    val accounts = CommandHandler(store, AccountDecider)

    accounts.handle("account-42", AccountCommand.Open("Mustafa"))
    accounts.handle("account-42", AccountCommand.Deposit(10_000))
    accounts.handle("account-42", AccountCommand.Withdraw(2_500))

    runCatching { accounts.handle("account-42", AccountCommand.Withdraw(50_000)) }
        .onFailure { println(it.message) } // insufficient funds

    val state = store.readStream("account-42")
        .fold(AccountDecider.initialState) { s, e ->
            AccountDecider.evolve(s, e.payload as AccountEvent)
        }
    println(state) // Active(owner=Mustafa, balanceCents=7500)
}
The rejected withdrawal left no trace in the log — commands fail, events don’t. (If your domain wants to remember rejections, model WithdrawalDeclined as a real event; overdraft-fee revenue is built on exactly that.)
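If rejections should be remembered, one possible shape is sketched below on a trimmed-down model where state is just a balance in cents. WithdrawalDeclined and its fields are hypothetical, and so is the choice to keep throwing for malformed input (a non-positive amount) while recording business refusals as events.

```kotlin
// Hypothetical, trimmed-down model: only what the sketch needs.
sealed interface Event {
    data class Deposited(val amountCents: Long) : Event
    data class Withdrawn(val amountCents: Long) : Event
    data class WithdrawalDeclined(val amountCents: Long, val balanceCents: Long) : Event
}

// decide records the refusal as a fact instead of throwing.
fun decideWithdraw(balanceCents: Long, amountCents: Long): List<Event> {
    require(amountCents > 0) { "withdrawal must be positive" }
    return if (balanceCents >= amountCents) listOf(Event.Withdrawn(amountCents))
    else listOf(Event.WithdrawalDeclined(amountCents, balanceCents))
}

// evolve must accept the new fact; a declined withdrawal leaves the balance alone.
fun evolve(balanceCents: Long, event: Event): Long =
    when (event) {
        is Event.Deposited -> balanceCents + event.amountCents
        is Event.Withdrawn -> balanceCents - event.amountCents
        is Event.WithdrawalDeclined -> balanceCents
    }

fun main() {
    val history = mutableListOf<Event>(Event.Deposited(7_500))
    history += decideWithdraw(history.fold(0L, ::evolve), 50_000)
    println(history.last())               // WithdrawalDeclined(amountCents=50000, balanceCents=7500)
    println(history.fold(0L, ::evolve))   // 7500
}
```

The declined attempt now occupies a stream version like any other event, so it takes part in optimistic concurrency and is visible to anything that later reads the log.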
5. Snapshots: a cache, not a truth
Folding 12 events is instant. Folding the 480,000 events of a hot account on every command is not. The standard fix is a snapshot: every N events, persist the folded state plus the version it covers, then on load, start from the snapshot and replay only the tail.
data class Snapshot<S>(val state: S, val streamVersion: Long)

class SnapshotStore<S> {
    private val byStream = HashMap<String, Snapshot<S>>()
    fun load(streamId: String): Snapshot<S>? = byStream[streamId]
    fun save(streamId: String, snapshot: Snapshot<S>) { byStream[streamId] = snapshot }
}

fun <S, E : Any> loadState(
    store: EventStore, snapshots: SnapshotStore<S>, decider: Decider<*, S, E>, streamId: String,
): Pair<S, Long> {
    val snap = snapshots.load(streamId)
    var state = snap?.state ?: decider.initialState
    var version = snap?.streamVersion ?: -1L
    for (stored in store.readStream(streamId, fromVersion = version + 1)) {
        @Suppress("UNCHECKED_CAST")
        state = decider.evolve(state, stored.payload as E)
        version = stored.streamVersion
    }
    return state to version
}
Two rules keep snapshots harmless. They are disposable: you must be able to delete every snapshot and rebuild from events alone — they’re a cache of the fold, never a second source of truth. And they are versioned by state shape: change what Account looks like and old snapshots are garbage; throw them away and re-fold. Most systems need snapshots far later than they think, and some never do — measure your actual stream lengths before adding the moving part.
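Both rules, plus the "every N events" save policy, can be sketched in a few lines. Everything named here (VersionedSnapshot, shapeVersion, currentShape, snapshotEvery, loadBalance) is a hypothetical illustration, the interval of 100 is an arbitrary assumption, and state is reduced to a Long balance folded from signed deltas.

```kotlin
val currentShape = 2       // bump whenever the state type changes shape
val snapshotEvery = 100L   // assumed interval; tune it from measured stream lengths

data class VersionedSnapshot(val state: Long, val streamVersion: Long, val shapeVersion: Int)

class VersionedSnapshotStore {
    private val byStream = HashMap<String, VersionedSnapshot>()

    // A snapshot written for an older state shape is treated as a cache miss.
    fun load(streamId: String): VersionedSnapshot? =
        byStream[streamId]?.takeIf { it.shapeVersion == currentShape }

    fun save(streamId: String, snapshot: VersionedSnapshot) { byStream[streamId] = snapshot }
}

// Folds the tail after the snapshot and saves a new one once enough events piled up.
// `events` stands in for the stream: index i holds the event with streamVersion i.
fun loadBalance(snapshots: VersionedSnapshotStore, streamId: String, events: List<Long>): Long {
    val snap = snapshots.load(streamId)
    val startVersion = snap?.streamVersion ?: -1L
    var state = snap?.state ?: 0L
    for (delta in events.drop((startVersion + 1).toInt())) state += delta
    val lastVersion = events.size - 1L
    if (lastVersion - startVersion >= snapshotEvery)
        snapshots.save(streamId, VersionedSnapshot(state, lastVersion, currentShape))
    return state
}

fun main() {
    val snapshots = VersionedSnapshotStore()
    val events = List(250) { 1L }                            // 250 deposits of one cent
    println(loadBalance(snapshots, "account-42", events))    // 250, and a snapshot is saved
    println(snapshots.load("account-42")?.streamVersion)     // 249

    // A snapshot from an older shape is ignored, so the result comes from events alone.
    snapshots.save("account-42", VersionedSnapshot(999L, 249L, shapeVersion = 1))
    println(loadBalance(snapshots, "account-42", events))    // 250
}
```

The last call is the disposability rule in action: the stale snapshot carries a wrong state, and nothing breaks because the events remain the only source of truth.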
What we have, and what’s missing
About 150 lines bought us: an append-only event store, optimistic concurrency, a pure and exhaustively-checked domain model, replay, and snapshots. The bank-balance invariant — the one a CRDT cannot give you — is enforced in one visible place by the conjunction of decide and expectedVersion.
What’s missing is everything around the question “what’s the total balance across all accounts?” — queries that cut across streams. Answering those by folding every stream on every request would be absurd. That’s what projections are for, and they’re Part 3.