
Idempotency in Practice, Part 3: Idempotent Consumers and Pipelines

The idempotent consumer pattern implemented (processed-message table in the same transaction), dedup identity for events, Kafka's exactly-once machinery taken apart honestly, offset-commit ordering, and the series-capstone retrofit checklist.

#idempotency #kafka #distributed-systems #kotlin

Part 1 established why every delivery guarantee bottoms out in at-least-once plus dedup; Part 2 built the key store that synchronous APIs need. This part covers the asynchronous half — consumers fed by brokers, Kafka’s exactly-once machinery taken apart honestly — and closes the series with the retrofit checklist.


Two problems wearing one name

“Make it idempotent” hides two genuinely different problems, and teams burn time solving the wrong one.

An idempotent API serves a synchronous caller who is owed an answer. Detecting the duplicate is the easy half; the hard half is reproducing the original response — status code, body, the created object’s ID — for a client you don’t control, possibly while the first attempt is still running (hence Part 2’s 409 dance), with a published expiry contract and per-tenant key scoping. The dedup store is part of your public interface.

An idempotent consumer answers to a message broker, not a caller. There is no response to replay — the only obligation is don’t apply the effect twice. The key usually comes for free (a message ID or event ID minted by the producer), redelivery is driven by acks, rebalances, and visibility timeouts rather than impatient humans, and — the structural advantage — the consumer typically writes to its own database, so the transactional claim is available in a way it often isn’t for an API call that must hit a third party mid-request.

| | Idempotent API | Idempotent consumer |
|---|---|---|
| Duplicate source | Client retries, double-clicks | Broker redelivery, rebalances |
| Key minted by | Client (it owns the intent) | Producer / broker (message ID) |
| On duplicate | Replay stored response | Ack and drop, silently |
| Concurrent duplicate | 409 / wait (caller is watching) | Usually serialized by partition or row lock |
| Claim atomic with work? | Often impossible (foreign calls mid-request) | Usually yes (same local transaction) |

That last row is why the consumer side, despite its scarier vocabulary, is the easier of the two. Let’s collect the win.

The idempotent consumer, implemented

The Idempotent Consumer pattern is one table and one rule. The table records which messages this consumer has already applied; the rule is that the insert into it happens in the same database transaction as the side effect:

```sql
CREATE TABLE processed_messages (
    consumer_id  TEXT        NOT NULL,   -- this service+handler, so consumers don't share fate
    message_id   UUID        NOT NULL,   -- the event's identity (see below)
    processed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (consumer_id, message_id)
);
```

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord

// db.tx runs the block in one database transaction; queryOrNull and execute run SQL inside it.
fun handle(record: ConsumerRecord<String, PaymentCaptured>) {
    val event = record.value()
    val applied = db.tx {
        // Claim the message: the insert returns a row only if nobody has claimed it before.
        val claimed = queryOrNull(
            """
            INSERT INTO processed_messages (consumer_id, message_id)
            VALUES (?, ?)
            ON CONFLICT DO NOTHING
            RETURNING message_id
            """,
            CONSUMER_ID, event.eventId,
        )
        if (claimed == null) return@tx false   // duplicate: roll nothing back, apply nothing

        // Business write, guarded by the expected current state.
        execute(
            """
            UPDATE invoices SET status = 'paid', paid_at = ?
            WHERE id = ? AND status = 'awaiting_payment'
            """,
            event.capturedAt, event.invoiceId,
        )
        true
    }
    if (!applied) log.debug("dropped duplicate {}", event.eventId)
    // fall through either way: the offset gets committed after this returns
}
```

There is no window in this code, and it’s worth dwelling on why. The claim and the business write commit or vanish together — the database arbitrates concurrent duplicates, which is the thing databases are best at. A redelivery hits ON CONFLICT DO NOTHING, applies nothing, and gets acknowledged like any other message; dropping a duplicate is a success, not an error. Note the belt-and-suspenders WHERE status = 'awaiting_payment' — Part 1’s guarded state transition — which keeps the handler safe even if someone later prunes processed_messages too aggressively. The costs are mundane: one extra write per message, a table that grows until a reaper trims it (you have one of those by now), and a guarantee that stops at the edge of this one database.
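To watch the “commit or vanish together” claim in motion without a real database, here is a toy in-memory model. Everything in it (ToyDb, handlePaymentCaptured, the failAfterClaim switch) is invented for illustration; it only sketches the all-or-nothing behaviour a real database transaction provides.

```kotlin
// Toy in-memory stand-in for a database. All names are hypothetical.
class ToyDb {
    private var processed = setOf<String>()          // plays the role of processed_messages
    private var invoices = mapOf<String, String>()   // invoice id -> status

    // Private working copy handed to a transaction block.
    class Tx(var processed: Set<String>, var invoices: Map<String, String>)

    fun seedInvoice(id: String, status: String) { invoices = invoices + (id to status) }
    fun statusOf(id: String): String? = invoices[id]

    // Runs the block against a copy and publishes it only if the block returns normally.
    @Synchronized
    fun <T> tx(block: Tx.() -> T): T {
        val tx = Tx(processed, invoices)
        val result = tx.block()      // an exception here discards the copy: rollback
        processed = tx.processed
        invoices = tx.invoices
        return result
    }
}

// Returns true if the effect was applied, false if the event was a duplicate.
fun handlePaymentCaptured(
    db: ToyDb, eventId: String, invoiceId: String, failAfterClaim: Boolean = false,
): Boolean = db.tx {
    if (eventId in processed) return@tx false            // duplicate: apply nothing
    processed = processed + eventId                      // the claim
    if (failAfterClaim) error("simulated crash between claim and business write")
    if (invoices[invoiceId] == "awaiting_payment") {     // guarded state transition
        invoices = invoices + (invoiceId to "paid")
    }
    true
}
```

A simulated crash after the claim leaves no trace of the claim, so the redelivery applies the effect; a second redelivery of the same event is dropped.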

What goes in message_id matters more than it looks. The lazy answer is the broker’s coordinates (topic, partition, offset) — wrong, because a re-published or replayed event gets new coordinates and sails past your dedup. The right answer is identity minted where the intent lives, same as Part 1’s argument about clients: a producer-assigned event ID (a UUID stamped at creation) dedups redeliveries of the same event. Stronger still, when your events describe an entity’s history, is aggregate ID + version: “invoice 4127, event 6” is idempotent against redelivery and against the producer accidentally emitting the same logical change twice with fresh UUIDs — the version collides even when the event ID doesn’t. Use the event ID by default; reach for aggregate-plus-version when the producer is event-sourced and versions exist anyway. (The event sourcing series makes the case for emitting versioned events in the first place.)
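As a minimal sketch of that choice: the Envelope type and dedupKey function below are hypothetical, and the string key assumes message_id is stored as TEXT (or as a composite of aggregate ID and version) instead of the UUID column shown earlier.

```kotlin
import java.util.UUID

// Hypothetical event envelope; field names are illustrative only.
data class Envelope(
    val eventId: UUID,        // minted by the producer when the event is created
    val aggregateId: String,
    val version: Long?,       // null when the producer does not version its events
    val topic: String,        // broker coordinates: carried along, never used for dedup
    val partition: Int,
    val offset: Long,
)

// Prefer aggregate ID + version when a version exists; otherwise fall back to the event ID.
fun dedupKey(e: Envelope): String =
    if (e.version != null) "${e.aggregateId}:v${e.version}" else e.eventId.toString()
```

Two envelopes for “invoice 4127, event 6” with different UUIDs and different offsets produce the same key, which is the collision you want.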

The trap on this side of the table is the consumer that emits effects — sends an email, calls an API. The transaction above hasn’t eliminated those duplicates, it has re-exported them: at-least-once redelivery becomes at-least-once outbound calls, and the downstream system needs its own idempotency — a propagated key derived from the event ID, exactly like Part 2’s orders-api-${key.id}. The problem composes; it never disappears.
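A small sketch of that propagation, under loud assumptions: FakeMailer stands in for a downstream service that honours idempotency keys, and the key format and the names downstreamKey and notifyCustomer are made up here, not taken from Part 2.

```kotlin
// Derive a stable key for one outbound call from the event that caused it.
// The "consumer-event-step" layout is an assumption for illustration.
fun downstreamKey(consumerId: String, eventId: String, step: String): String =
    "$consumerId-$eventId-$step"

// Hypothetical downstream that deduplicates on the key it is given.
class FakeMailer {
    private val seenKeys = mutableSetOf<String>()
    var sent = 0
        private set

    // Returns true if the mail was sent, false if the key had been seen before.
    fun send(idempotencyKey: String, to: String): Boolean {
        if (!seenKeys.add(idempotencyKey)) return false
        sent++
        return true
    }
}

// The handler's outbound effect: same event in, same key out, on every redelivery.
fun notifyCustomer(mailer: FakeMailer, eventId: String, to: String): Boolean =
    mailer.send(downstreamKey("invoice-consumer", eventId, "receipt-email"), to)
```

The step suffix keeps two different outbound calls for the same event from colliding with each other, while redeliveries of the same call still collapse to one.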

Kafka’s exactly-once, taken apart

Kafka’s exactly-once semantics deserve a precise treatment, because they’re both real engineering and the most widely misread marketing in distributed systems. The machinery, introduced in 0.11, has two layers.

Layer one: the idempotent producer. Every producer session gets a broker-assigned producer ID (PID), and every record batch carries a per-partition sequence number. The broker tracks the last sequences it has persisted per (PID, partition) and discards anything it has already seen — TCP-style dedup, except the sequence numbers live in the replicated log, so they survive broker failover. This kills the classic duplicate where a produce succeeds but the ack is lost and the producer’s retry writes the record twice. Scope matters: the guarantee is per partition and per producer session. A producer that restarts gets a new PID, and the broker has no way to connect its first send to the old session’s last — idempotence protects retries within a session, not your application’s own retry loop around the producer.
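The bookkeeping is easier to hold in your head as a toy. The model below is a deliberate simplification and not Kafka’s implementation (the real broker tracks batches, not single records, and replicates this state); ToyPartitionLog and its methods are hypothetical names.

```kotlin
// Toy model of broker-side sequence dedup, keyed by (PID, partition).
data class ProducerPartition(val pid: Long, val partition: Int)

class ToyPartitionLog {
    private val lastSeq = mutableMapOf<ProducerPartition, Int>()
    val records = mutableListOf<String>()

    // Returns true if appended, false if recognised as a retry of something already persisted.
    fun append(pid: Long, partition: Int, seq: Int, value: String): Boolean {
        val key = ProducerPartition(pid, partition)
        val last = lastSeq[key] ?: -1
        if (seq <= last) return false      // already persisted: the ack was lost, not the write
        require(seq == last + 1) { "out-of-order sequence $seq, expected ${last + 1}" }
        lastSeq[key] = seq
        records += value
        return true
    }
}
```

A retry with the same PID and sequence is discarded, but the same payload sent under a fresh PID is appended again: the toy has the same session-scoped blind spot described above.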

Since Kafka 3.0 this layer is on by default: KIP-679 flipped the producer defaults to enable.idempotence=true and acks=all, on the argument that the strongest guarantee costs little. The current producer docs confirm Default: true, with three constraints: max.in.flight.requests.per.connection ≤ 5, retries > 0, and acks=all. Set a conflicting value without explicitly enabling idempotence and it quietly turns itself off, which is exactly the kind of footnote worth knowing before an incident review. If you’re on a modern client, plain send() is already dedup’d against broker-side retry duplicates. You were getting at-least-once with duplicates for years; now you get at-least-once where the broker eats its own duplicates.
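One way to take the quiet downgrade off the table is to spell the settings out. The sketch below uses plain java.util.Properties with the documented config keys; the function name and the bootstrap address are placeholders, and serializer settings are left out for brevity.

```kotlin
import java.util.Properties

// Producer settings with idempotence stated explicitly instead of inherited from defaults.
fun producerProps(bootstrapServers: String): Properties = Properties().apply {
    put("bootstrap.servers", bootstrapServers)
    put("enable.idempotence", "true")                       // explicit, so a conflict cannot silently disable it
    put("acks", "all")                                      // required by idempotence
    put("max.in.flight.requests.per.connection", "5")       // must be 5 or lower
    put("retries", Int.MAX_VALUE.toString())                // must be greater than 0
}
```

With idempotence enabled explicitly, a conflicting value such as acks=1 should surface as a configuration error at startup instead of a silent downgrade, which is the failure mode you would rather have.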

Layer two: transactions. Configure a stable transactional.id (which implies idempotence) and a consume-transform-produce loop can commit atomically: the records produced to output topics and the consumer offsets on the input topic go into one transaction, all-or-nothing. The stable ID is also the answer to the restart hole above — a restarted producer with the same transactional.id resumes its identity, and the broker bumps an epoch so that any zombie still running with the old epoch gets fenced off: its late commits are rejected rather than interleaved. Downstream, consumers set isolation.level=read_committed to see only committed transactions (the default is still read_uncommitted, another footnote with teeth) — at the cost of reading only up to the last stable offset, so an open transaction holds everyone behind it.
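Zombie fencing is the least intuitive part, so here is a toy model of it. ToyCoordinator is hypothetical and collapses the real transaction coordinator down to one map; in the real client the epoch bump happens when a producer calls initTransactions(), and a fenced producer sees a ProducerFencedException.

```kotlin
// Toy model of transactional.id epoch fencing. Not Kafka's implementation.
class ToyCoordinator {
    private val epochs = mutableMapOf<String, Int>()

    // Called when a producer instance starts: bump and hand out the epoch for this transactional.id.
    fun register(transactionalId: String): Int {
        val next = (epochs[transactionalId] ?: -1) + 1
        epochs[transactionalId] = next
        return next
    }

    // A commit is accepted only from the instance holding the current epoch.
    fun commit(transactionalId: String, epoch: Int): Boolean =
        epochs[transactionalId] == epoch
}
```

Register the same transactional.id twice and the first instance’s commits are rejected from then on, which is the fencing behaviour in miniature.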

That is genuine exactly-once processing — within a perimeter. Kafka in, Kafka out: source topic, transform, output topic, offsets, all inside the transaction, redeliveries and zombies fenced. The fine print is the perimeter itself:

[Figure: Kafka’s exactly-once island. Topics, processing, and offset commits sit inside the transaction; any external database or API is outside the perimeter, back to at-least-once.]

The moment your processor calls an external database, a payment API, or an SMTP server, that call is outside the transaction — Kafka cannot roll back an email. If the transaction aborts after the call, the retry repeats it; you are back to at-least-once plus dedup, which is to say back to this series: the processed-messages table above, or Part 2’s key store, sitting at the boundary. Brandur’s framing covers even the reverse direction: producing to Kafka from a non-Kafka context is itself a foreign state mutation, however reliable it feels. Kafka didn’t refute Part 1’s impossibility result. It drew a careful boundary, did rigorous sequence-number dedup inside it, and — to its credit — documented the edge. The marketing just tends to crop the diagram.

Offset commits: choosing your failure mode

Outside transactions — which is most consumers — the duplicate-vs-loss dichotomy from Part 1 reappears in miniature as a single ordering decision: do you commit the offset before or after the side effect?

[Figure: two timelines. Commit-after the side effect yields duplicates on crash; commit-before yields silent loss.]

Commit after the side effect, and a crash between the two means the broker redelivers work you already did: duplicates, which your idempotent handler absorbs. Commit before, and a crash means the broker believes work happened that never did: silent loss, which nothing absorbs. Stated like that, the choice makes itself (commit after, always, and let dedup clean up), but two defaults conspire against you.

First, enable.auto.commit=true (still the Kafka default) commits on a timer with no knowledge of where your handler is. In the Java client that timer is checked inside poll(), so a strictly synchronous loop rarely trips it, but as soon as records are handed to another thread or processed asynchronously it can commit before your side effect under exactly the wrong crash timing. Serious consumers turn it off and commit explicitly after the transaction. Second, batch-oriented handlers that commit once per poll loop need the whole batch applied first, or the tail of the batch is committed-but-unprocessed.

The idempotent consumer pattern above assumed this ordering: transaction commits, then the offset. Duplicates chosen deliberately, then deduplicated mechanically.
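The two orderings can be compared in a few lines of simulation. This is a toy with no broker involved: simulate, Order, and crashAt are invented names, and a “crash” just means the consumer stops between the two steps once and restarts from the committed offset.

```kotlin
enum class Order { COMMIT_AFTER, COMMIT_BEFORE }

// Processes offsets 0 until n, crashing once at crashAt between the side effect and the commit
// (in whichever order they run). Returns how many times each offset's side effect was applied.
fun simulate(n: Int, order: Order, crashAt: Int): List<Int> {
    val applied = MutableList(n) { 0 }
    var committed = 0            // next offset the broker will hand out after a restart
    var crashPending = true
    while (committed < n) {      // each pass is one consumer lifetime
        lifetime@ for (offset in committed until n) {
            val crashHere = crashPending && offset == crashAt
            when (order) {
                Order.COMMIT_AFTER -> {
                    applied[offset]++                                         // side effect first
                    if (crashHere) { crashPending = false; break@lifetime }   // crash before commit
                    committed = offset + 1
                }
                Order.COMMIT_BEFORE -> {
                    committed = offset + 1                                    // commit first
                    if (crashHere) { crashPending = false; break@lifetime }   // crash before side effect
                    applied[offset]++
                }
            }
        }
    }
    return applied
}
```

For three messages with a crash at offset 1, commit-after gives [1, 2, 1] (one duplicate, which the idempotent handler turns back into a single effect) and commit-before gives [1, 0, 1] (one message that was never applied).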

(One adjacent seam, deliberately not covered here: how the event gets into the broker atomically with the database write that caused it. That’s the transactional outbox — write the event to an outbox table in the business transaction, relay it to the broker afterwards — and it earns at-least-once publishing with the same honesty this series keeps demanding. It gets a full treatment in an upcoming series of its own; for now, note that its consumer-facing consequence is more redelivery, which the patterns above already absorb.)

The checklist: retrofitting idempotency, end to end

The condensed version of all three parts, for the next time someone says the magic words in a design review:

  1. Classify the operation (Part 1’s taxonomy). Can it be restated as absolute state — PUT, UPSERT, conditional write, guarded state transition? If yes, do that and stop. Natural idempotency needs none of the below.
  2. Decide which problem you have. Synchronous caller owed a response → idempotent API (3–10). Broker-fed consumer → idempotent consumer (11–14). Both → both.
  3. Require a client-generated Idempotency-Key on the mutating endpoint; reject requests without one (400) rather than guessing.
  4. Scope keys per caller — unique on (account_id, key), never globally.
  5. Claim atomically: unique-constraint insert (or ON CONFLICT) in the same transaction as local state changes. Not check-then-insert.
  6. Fingerprint the request and reject the same key with a different payload (422).
  7. Handle the concurrent retry: in-flight keys get 409 (or block); finished keys get the stored response replayed, errors included.
  8. Persist the response — and recovery points before each foreign call, so a crashed request resumes instead of re-executing (Part 2’s state machine).
  9. Pass derived keys to every downstream call that supports them; treat calls that don’t as permanent residual risk and log them as such.
  10. Choose the expiry window deliberately — longer than the slowest plausible retry path — and build the reaper on day one, because the table grows from day one.
  11. Insert into processed_messages in the same transaction as the consumer’s side effect; treat the constraint violation as a successful drop, not an error.
  12. Pick the dedup identity consciously: producer-minted event ID by default, aggregate ID + version when you have it; never broker coordinates.
  13. Commit offsets after the side effect, explicitly — auto-commit off — and accept the duplicates your handler now absorbs by design.
  14. Know your Kafka perimeter: idempotent producer and transactions cover Kafka-in-Kafka-out; every external touch needs its own dedup, and read_committed doesn’t set itself.
  15. Build the reconciliation that catches what slips through, and a metric on dedup hits. A dedup store that never fires is untested, not unnecessary. (And give your retries a budget while you’re at it — unbounded retry is how duplicates become retry storms.)
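Of the API-side items, step 6 is the one most often left as hand-waving, so here is one possible shape for it. This is a sketch under assumptions: hashing method, path, and raw body with SHA-256 is a choice made for illustration, not something the series prescribes, and a real fingerprint might canonicalize the body first.

```kotlin
import java.security.MessageDigest

// Fingerprint a request so that the same key with a different payload can be rejected.
fun fingerprint(method: String, path: String, body: ByteArray): String {
    val digest = MessageDigest.getInstance("SHA-256")
    digest.update(method.uppercase().toByteArray())
    digest.update(0.toByte())                 // separator, so field boundaries cannot blur
    digest.update(path.toByteArray())
    digest.update(0.toByte())
    digest.update(body)
    return digest.digest().joinToString("") { "%02x".format(it) }
}
```

Store the fingerprint next to the key when it is claimed; on a retry, a matching key with a different fingerprint is the 422 case.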

Fifteen lines, a few background processes, two new tables, and a published contract with every client and every downstream system. That’s the price of nodding in the design review — and it’s worth paying precisely once.

References