Transactions (Exactly-Once)

The idempotent producer (Chapter 3) guarantees that retries do not create duplicates within a single partition. But what if you need to write to multiple partitions atomically?

🚨 The Dual-Write Problem (Banking Example)

A banking app reads a withdrawal from topic-A, processes it, and writes a deposit to topic-B.

  • Scenario A: If the offset is committed first and the app crashes before writing the deposit: Money lost (the withdrawal is never re-read).
  • Scenario B: If the app crashes after writing the deposit but before committing the offset: Money duplicated (the withdrawal is re-processed on restart).

Kafka Transactions solve this by allowing you to bundle multiple writes (and offset commits) into a single atomic unit.
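
To make Scenario B concrete, here is a toy in-memory model in plain Java, with no Kafka involved. The class DualWriteModel and its fields are hypothetical stand-ins: output plays the role of topic-B and committedOffset the role of the consumer offset. It only sketches why applying the write and the offset together removes the duplicate; it says nothing about how the broker achieves this.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the dual-write problem; all names are illustrative. */
class DualWriteModel {
    final List<String> input = List.of("withdraw:Alice:50"); // stands in for topic-A
    final List<String> output = new ArrayList<>();           // stands in for topic-B
    int committedOffset = 0;                                  // stands in for the group offset

    /** Non-atomic: write the deposit, then advance the offset as a separate step. */
    void processNonAtomic(boolean crashBeforeOffsetCommit) {
        for (int i = committedOffset; i < input.size(); i++) {
            output.add("deposit for " + input.get(i));
            if (crashBeforeOffsetCommit) return; // simulated crash: the offset never advances
            committedOffset = i + 1;
        }
    }

    /** Atomic: the deposit and the offset advance are applied together or not at all. */
    void processAtomic(boolean crashBeforeCommit) {
        for (int i = committedOffset; i < input.size(); i++) {
            String staged = "deposit for " + input.get(i);
            if (crashBeforeCommit) return;       // simulated crash: nothing staged is applied
            output.add(staged);
            committedOffset = i + 1;
        }
    }
}

class DualWriteDemo {
    public static void main(String[] args) {
        DualWriteModel naive = new DualWriteModel();
        naive.processNonAtomic(true);   // crash after the write
        naive.processNonAtomic(false);  // restart re-reads the same withdrawal
        System.out.println(naive.output.size()); // 2: the deposit was duplicated

        DualWriteModel atomic = new DualWriteModel();
        atomic.processAtomic(true);     // crash before the commit
        atomic.processAtomic(false);    // restart
        System.out.println(atomic.output.size()); // 1: exactly one deposit
    }
}
```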

1. The Mechanics of Transactions

Kafka introduces a Transaction Coordinator (a module that runs inside the broker) and a Transaction Log (the internal topic __transaction_state) to track the state of every transaction.

💡 Analogy: The Escrow Agent
Think of the Transaction Coordinator as an escrow agent in a real estate deal. The buyer's money and the seller's deed are handed over early but held in trust (the messages are written to their partitions but not yet released to read_committed consumers), while the agent records the state of the deal in a ledger (the Transaction Log). Neither party actually receives their item until the agent officially stamps the deal as COMMIT. If any issue arises, the agent stamps ABORT, and it's as if the transaction never happened.

1.1 The Protocol (Two-Phase Commit)

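  1. Init: The producer registers its unique transactional.id with the Coordinator, which assigns it a producer ID and an epoch.
  2. Begin: The producer starts a transaction.
  3. Produce: The producer sends messages to various partitions. These messages are written to the log right away but remain “Uncommitted” (invisible to read_committed consumers).
  4. Commit/Abort:
    • Commit: The Coordinator first records the decision in the Transaction Log, then writes a COMMIT marker to every partition that took part. read_committed consumers can now see the data.
    • Abort: The Coordinator records the decision, then writes ABORT markers. read_committed consumers skip the data.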
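
The two phases above can be pictured as a small state machine. The sketch below is a simplified, hypothetical model in plain Java: the state names loosely follow those the coordinator records in the Transaction Log, but the class TxnStateMachine, its methods, and the exact set of allowed transitions are assumptions made for illustration, not the broker's code.

```java
import java.util.EnumSet;
import java.util.Set;

/** Simplified model of one transaction's lifecycle; illustrative only. */
class TxnStateMachine {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    private State state = State.EMPTY;

    State state() {
        return state;
    }

    /** States that may follow the given state in this simplified model. */
    static Set<State> allowedNext(State s) {
        switch (s) {
            case EMPTY:
                return EnumSet.of(State.ONGOING);
            case ONGOING:
                // More partitions may join, or the producer asks to end the transaction.
                return EnumSet.of(State.ONGOING, State.PREPARE_COMMIT, State.PREPARE_ABORT);
            case PREPARE_COMMIT:
                // Phase 1 is durable: the only way forward is to finish the commit.
                return EnumSet.of(State.COMPLETE_COMMIT);
            case PREPARE_ABORT:
                return EnumSet.of(State.COMPLETE_ABORT);
            default:
                // A completed transaction can only be followed by a new one.
                return EnumSet.of(State.ONGOING);
        }
    }

    /** Moves to the next state, rejecting transitions the model does not allow. */
    void transitionTo(State next) {
        if (!allowedNext(state).contains(next)) {
            throw new IllegalStateException(state + " -> " + next + " is not allowed");
        }
        state = next;
    }
}
```

The point of the PREPARE_COMMIT step is that once the decision is durable, a coordinator that crashes and recovers can finish writing the markers instead of leaving some partitions committed and others not.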

1.2 Zombie Fencing (The Power of transactional.id)

Why is transactional.id mandatory for transactions? It solves the Zombie Producer problem. If a producer experiences a long network partition, the system might assume it died and spin up a replacement. When the original producer wakes up (a “zombie”), it might try to continue its transaction.

Both producers share the same transactional.id, so the Transaction Coordinator tells them apart with an Epoch Number that it increments each time a producer registers under that id. The new producer gets the higher epoch. When the zombie tries to commit with the older epoch, the Coordinator rejects the request and the zombie fails with a ProducerFencedException.
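
The epoch check can be sketched in a few lines of plain Java. This is a toy model, not the broker's implementation: FencingCoordinator, register, and tryCommit are hypothetical names, and a real coordinator signals the rejection with an error rather than a boolean.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of epoch-based zombie fencing; illustrative only. */
class FencingCoordinator {
    // Latest epoch handed out for each transactional.id.
    private final Map<String, Integer> currentEpoch = new HashMap<>();

    /** Models producer registration: bump the epoch and hand it to the caller. */
    int register(String transactionalId) {
        return currentEpoch.merge(transactionalId, 1, Integer::sum);
    }

    /** Models a commit request: only the holder of the newest epoch may commit. */
    boolean tryCommit(String transactionalId, int epoch) {
        Integer latest = currentEpoch.get(transactionalId);
        return latest != null && latest == epoch;
    }
}

class FencingDemo {
    public static void main(String[] args) {
        FencingCoordinator coordinator = new FencingCoordinator();
        int zombieEpoch = coordinator.register("transfer-app-1");      // original producer
        int replacementEpoch = coordinator.register("transfer-app-1"); // replacement after the outage

        System.out.println(coordinator.tryCommit("transfer-app-1", zombieEpoch));      // false
        System.out.println(coordinator.tryCommit("transfer-app-1", replacementEpoch)); // true
    }
}
```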

1.3 Isolation Levels

Consumers control what they see via isolation.level:

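  • read_uncommitted (Default): Sees everything, including aborted transactions and in-flight data.
  • read_committed: Only sees messages from committed transactions (plus any non-transactional messages), and never reads past the first message of a transaction that is still open. This is required for Exactly-Once processing.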
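
The difference between the two levels can be illustrated with a toy partition log in plain Java. Everything here is a hypothetical model (the class IsolationModel, its Entry type, and the linear scan are assumptions for illustration); a real consumer works with control batches and an aborted-transaction index rather than scanning ahead.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one partition log with transactional data and markers. */
class IsolationModel {
    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final long producerId;
        final String value; // null for markers

        Entry(Kind kind, long producerId, String value) {
            this.kind = kind;
            this.producerId = producerId;
            this.value = value;
        }
    }

    /** read_uncommitted: every data record is delivered, whatever its fate. */
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) out.add(e.value);
        }
        return out;
    }

    /** read_committed: deliver committed records, skip aborted ones, stop at an open transaction. */
    static List<String> readCommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind != Kind.DATA) continue;        // markers are never delivered
            Kind outcome = outcomeOf(log, i);
            if (outcome == null) break;               // still open: nothing beyond this point is readable
            if (outcome == Kind.COMMIT) out.add(e.value);
        }
        return out;
    }

    /** Returns the first marker written by the same producer after the given index, or null. */
    private static Kind outcomeOf(List<Entry> log, int index) {
        long pid = log.get(index).producerId;
        for (int j = index + 1; j < log.size(); j++) {
            Entry later = log.get(j);
            if (later.kind != Kind.DATA && later.producerId == pid) return later.kind;
        }
        return null;
    }
}
```

In this model a log holding a1 (producer 1), b1 (producer 2), and an ABORT marker for producer 2 yields nothing under readCommitted, because producer 1's transaction is still open; once producer 1's COMMIT marker is appended, a1 becomes visible and b1 stays hidden.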

2. Interactive: The Atomic Switch

[Interactive demo: simulates a transaction writing to Topic A (Partition 0) and Topic B (Partition 0). The read_committed consumer view shows "Waiting for committed data..." until the final COMMIT marker is written.]

3. Implementation Guide

To enable transactions, you must configure both the Producer and the Consumer correctly.

Key Configuration:

  • Producer: Must set a transactional.id and initialize transactions.
  • Consumer: Must set isolation.level to read_committed.
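
// --- Java Example ---
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

// 1. Producer Setup
Properties prodProps = new Properties();
prodProps.put("bootstrap.servers", "localhost:9092");
prodProps.put("transactional.id", "my-transactional-id"); // Mandatory for transactions
// Serializers are required; without them the constructor throws a ConfigException
prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(prodProps);
producer.initTransactions(); // Register with coordinator (fences older instances of this transactional.id)

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-A", "Alice", "$50"));
    producer.send(new ProducerRecord<>("topic-B", "Bob", "$50"));
    producer.commitTransaction(); // Atomic Commit: both records become visible together
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors (e.g. Zombie fenced): this producer cannot continue, so close it
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so that neither record becomes visible; the transaction can then be retried
    producer.abortTransaction();
}

// 2. Consumer Setup
Properties consProps = new Properties();
consProps.put("bootstrap.servers", "localhost:9092");
consProps.put("group.id", "my-group");
// CRITICAL: Only see committed data
consProps.put("isolation.level", "read_committed");
// Deserializers are required, mirroring the producer's serializers
consProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consProps);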
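
The banking scenario from the start of the chapter also needs the consumed offset to be part of the transaction. The following is a minimal sketch of such a read-process-write loop, assuming a kafka-clients version that provides sendOffsetsToTransaction with a ConsumerGroupMetadata argument, a broker at localhost:9092, and a pass-through "transform" that copies each record from topic-A to topic-B. The class TransferLoop and the helper offsetsToCommit are hypothetical names introduced here.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

class TransferLoop {
    /** Offsets to commit: one past the last record seen in each partition of the batch. */
    static Map<TopicPartition, OffsetAndMetadata> offsetsToCommit(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition tp : records.partitions()) {
            List<ConsumerRecord<String, String>> batch = records.records(tp);
            long lastOffset = batch.get(batch.size() - 1).offset();
            offsets.put(tp, new OffsetAndMetadata(lastOffset + 1));
        }
        return offsets;
    }

    public static void main(String[] args) {
        Properties consProps = new Properties();
        consProps.put("bootstrap.servers", "localhost:9092");
        consProps.put("group.id", "my-group");
        consProps.put("isolation.level", "read_committed");
        consProps.put("enable.auto.commit", "false"); // offsets are committed through the producer
        consProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties prodProps = new Properties();
        prodProps.put("bootstrap.servers", "localhost:9092");
        prodProps.put("transactional.id", "my-transactional-id");
        prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(prodProps)) {
            consumer.subscribe(List.of("topic-A"));
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;
                try {
                    producer.beginTransaction();
                    for (ConsumerRecord<String, String> r : records) {
                        producer.send(new ProducerRecord<>("topic-B", r.key(), r.value()));
                    }
                    // The consumed offsets join the same transaction as the writes.
                    producer.sendOffsetsToTransaction(offsetsToCommit(records), consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                    throw e; // fatal: stop without aborting
                } catch (KafkaException e) {
                    producer.abortTransaction();
                    throw e; // simplest recovery: restart and resume from the last committed offsets
                }
            }
        }
    }
}
```

Rethrowing after an abort keeps the sketch short: a restarted instance resumes from the last committed offsets, so the aborted batch is read again. A long-running service would more likely rewind the consumer to its committed positions and keep going.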

// --- Go Example (Franz-Go) ---
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	if err := run(context.Background()); err != nil {
		log.Fatal(err)
	}
	log.Println("Transaction Committed Successfully!")
}

// run performs one transaction and returns errors instead of calling log.Fatal,
// so the deferred client.Close() always executes.
func run(ctx context.Context) error {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-go-txn-id"), // Enable Transactions
	)
	if err != nil {
		return err
	}
	defer client.Close()

	// 1. Begin Transaction
	if err := client.BeginTransaction(); err != nil {
		return err
	}

	// 2. Produce Messages
	records := []*kgo.Record{
		{Topic: "topic-A", Value: []byte("Alice $50")},
		{Topic: "topic-B", Value: []byte("Bob $50")},
	}
	if err := client.ProduceSync(ctx, records...).FirstErr(); err != nil {
		// Rollback: drop anything still buffered, then end the transaction as an abort.
		// AbortBufferedRecords alone does not abort the transaction.
		if abortErr := client.AbortBufferedRecords(ctx); abortErr != nil {
			return abortErr
		}
		if abortErr := client.EndTransaction(ctx, kgo.TryAbort); abortErr != nil {
			return abortErr
		}
		return err
	}

	// 3. Commit Transaction
	return client.EndTransaction(ctx, kgo.TryCommit)
}

// For the Consumer, create the client with: kgo.FetchIsolationLevel(kgo.ReadCommitted())

4. Summary

  • Transactions enable atomic writes across multiple partitions.
  • The Transaction Coordinator manages the lifecycle using a Transaction Log.
  • Consumers must use read_committed to ignore aborted or incomplete transactions.
  • This is the foundation of Kafka Streams Exactly-Once Processing.