Replication: Preventing Data Loss
In a distributed system, hardware failure is guaranteed. Disks fail, networks partition, and servers crash. Kafka defends against data loss through Replication.
Every partition in Kafka has a configured replication.factor (usually 3). This means 3 copies of your data exist on 3 different brokers.
1. Leader vs Follower
Kafka uses a Primary-Backup model (or Leader-Follower).
- The Leader: One broker is elected as the “Leader” for a partition (e.g., Partition 0). All reads and writes go to this leader.
- The Followers: The other 2 replicas are “Followers”. They passively pull data from the Leader to stay up-to-date.
[!NOTE] Client Interaction: By default, clients (Producers and Consumers) only talk to the Leader. Followers are just hot standbys. (Note: Newer Kafka versions allow “Follower Fetching” for consumers to reduce cross-AZ costs, but the Leader still handles all writes).
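On the consumer side, opting into follower fetching is a matter of configuration. The sketch below builds such a config with plain string keys so it stays dependency-free; `client.rack` is the real consumer setting (KIP-392), while the rack ID, bootstrap address, and group ID are placeholders. It also assumes the brokers set `broker.rack` and `replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector`.

```java
import java.util.Properties;

// Sketch: consumer configuration for rack-aware follower fetching (KIP-392).
public class FollowerFetchConfig {

    public static Properties build(String rack) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "my-group");                // placeholder group
        // Tell the cluster which rack/AZ this consumer lives in; a rack-aware
        // broker can then serve fetches from a follower in the same rack.
        props.put("client.rack", rack);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("us-east-1a");
        System.out.println(props.getProperty("client.rack")); // us-east-1a
    }
}
```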
2. The Protocol: ISR vs Quorum
Why does Kafka use an In-Sync Replica (ISR) model instead of a traditional Quorum (like Raft or Paxos)?
- Quorum (Raft/Paxos): Requires a majority (N/2 + 1) to acknowledge a write. If you have 3 nodes, you need 2 to write. If you lose 2 nodes, the cluster halts (Availability sacrifice).
- Kafka ISR: Kafka dynamically maintains a set of healthy replicas (the ISR). A write is committed only when all members of the ISR acknowledge it.
- Benefit: Kafka can tolerate N-1 failures and still accept writes (as long as the remaining node is in the ISR).
- Trade-off: If all replicas fail, you must choose between waiting for the leader (Consistency) or accepting the first replica to come back (Availability - potential data loss).
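To make the trade-off concrete, here is a toy Java sketch (not Kafka code) of how many broker failures each model can absorb while still accepting writes, for a replication factor n:

```java
// Toy model: failures tolerated while still accepting writes, for n replicas.
public class FaultTolerance {

    // Majority quorum (Raft/Paxos): a write needs floor(n/2) + 1 acks,
    // so writes stop once more than floor((n - 1) / 2) nodes are down.
    static int quorumTolerance(int n) {
        return (n - 1) / 2;
    }

    // Kafka ISR: writes continue while at least one in-sync replica remains
    // (assuming min.insync.replicas is left at its default of 1).
    static int isrTolerance(int n) {
        return n - 1;
    }

    public static void main(String[] args) {
        int n = 3;
        System.out.println("Quorum tolerates " + quorumTolerance(n) + " failure(s)"); // 1
        System.out.println("ISR tolerates " + isrTolerance(n) + " failure(s)");       // 2
    }
}
```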
3. In-Sync Replicas (ISR)
Not all followers are created equal.
- ISR (In-Sync Replicas): The set of replicas that are fully caught up with the leader (or within a small time window).
- Out-of-Sync: If a follower crashes or falls too far behind (network lag), the Leader kicks it out of the ISR set.
The ISR is critical because only a member of the ISR can be elected as a new Leader if the current Leader fails.
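The eviction rule can be sketched as a predicate on follower lag. `replica.lag.time.max.ms` is the real broker setting (default 30 seconds); the rest is an illustrative model, not broker code:

```java
// Toy model of ISR membership: a follower stays in the ISR only if it has
// caught up to the leader's log end within replica.lag.time.max.ms.
public class IsrMembership {

    static final long REPLICA_LAG_TIME_MAX_MS = 30_000; // broker default

    static boolean isInSync(long lastCaughtUpTimeMs, long nowMs) {
        return (nowMs - lastCaughtUpTimeMs) <= REPLICA_LAG_TIME_MAX_MS;
    }

    public static void main(String[] args) {
        long now = 100_000;
        System.out.println(isInSync(now - 5_000, now));  // true  (5s behind, still in ISR)
        System.out.println(isInSync(now - 60_000, now)); // false (60s behind, evicted)
    }
}
```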
4. Hardware Reality: Network Latency & ACKs
When a producer sends a message, it traverses the network to the Leader. The Leader writes it to the Page Cache (RAM), and the Followers then fetch it from the Leader.
The acks setting determines how long the producer waits.
acks=0 (Fire and Forget)
- The producer sends the data and returns immediately. No acknowledgment from the broker.
- Latency: Lowest (Network One-Way).
- Risk: High (Data loss if broker crashes).
acks=1 (Leader Acknowledgment)
- The producer waits for the Leader to append the message to its local log (in the page cache; not necessarily flushed to disk).
- Latency: Medium (Network RTT + Leader Disk/RAM Write).
- Risk: Medium (Data loss if Leader crashes before replicating to followers).
acks=all (Strongest Guarantee)
- The producer waits for the Leader AND all current members of the ISR to acknowledge the write.
- Latency: Highest (Network RTT + Slowest Follower RTT).
- Risk: Lowest (an acknowledged write survives as long as at least one member of the ISR survives).
[!IMPORTANT] Performance Impact:
acks=all adds latency because you are bound by the slowest link in the ISR chain. If one follower is in a different region with 100ms latency, your write latency will be >100ms.
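A back-of-the-envelope Java model of that impact. It counts only network round-trips and ignores broker processing time; all numbers are illustrative, not measured:

```java
// Toy latency model for the three acks modes.
public class AckLatency {

    // followerRttsMs: leader -> follower round-trip times for each ISR follower.
    static long produceLatencyMs(int acks, long leaderRttMs, long[] followerRttsMs) {
        if (acks == 0) return 0;           // fire and forget: producer does not wait
        if (acks == 1) return leaderRttMs; // wait for the leader's write only
        long slowest = 0;                  // acks=all (-1): bound by the slowest ISR member
        for (long rtt : followerRttsMs) slowest = Math.max(slowest, rtt);
        return leaderRttMs + slowest;
    }

    public static void main(String[] args) {
        long[] followers = {2, 100}; // one local follower, one 100ms away
        System.out.println(produceLatencyMs(0, 1, followers));  // 0
        System.out.println(produceLatencyMs(1, 1, followers));  // 1
        System.out.println(produceLatencyMs(-1, 1, followers)); // 101
    }
}
```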
5. The Safety Valve: min.insync.replicas
Using acks=all isn’t enough if your ISR shrinks to just 1 node (the Leader). In that case, acks=all effectively becomes acks=1.
To prevent this, you set min.insync.replicas (usually 2).
- Rule: If ISR size < min.insync.replicas, the broker REJECTS writes with NotEnoughReplicasException.
- Trade-off (CAP Theorem): By setting min.insync.replicas, you sacrifice Availability (Kafka will reject writes if nodes are down) to guarantee Consistency (no data loss). This is a classic CP (Consistency/Partition Tolerance) configuration in the CAP Theorem. Without it, you are choosing AP (Availability over Consistency).
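The rejection rule itself fits in a few lines of toy Java (the real check happens inside the broker's log append path, not client code):

```java
// Toy model of the broker-side check behind NotEnoughReplicasException:
// with acks=all, a write is accepted only while |ISR| >= min.insync.replicas.
public class MinIsrGate {

    static boolean acceptsWrite(int isrSize, int minInsyncReplicas) {
        return isrSize >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        int minIsr = 2; // typical setting with replication.factor=3
        System.out.println(acceptsWrite(3, minIsr)); // true: full ISR
        System.out.println(acceptsWrite(2, minIsr)); // true: one replica down, still safe
        System.out.println(acceptsWrite(1, minIsr)); // false: writes rejected to avoid data loss
    }
}
```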
6. Code: Configuring Durability
Here is how you configure these settings in your Producers and Brokers.
Java

```java
// Java: Producer Configuration for High Durability
import java.util.Properties;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.common.config.ConfigResource;

public class DurabilityExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // ACKS=ALL: Wait for Leader + ISR
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // Retries: If the write fails (e.g. NotEnoughReplicas), keep trying
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        // Prevents duplicates introduced by those retries (idempotent producer)
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // ... produce messages
        }

        // ---------------------------------------------------------
        // Java: Broker Configuration (AdminClient)
        // Setting min.insync.replicas for a specific topic
        // ---------------------------------------------------------
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "my-critical-topic");
            ConfigEntry entry = new ConfigEntry("min.insync.replicas", "2");
            AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);

            Map<ConfigResource, Collection<AlterConfigOp>> configs = new HashMap<>();
            configs.put(resource, Collections.singletonList(op));
            admin.incrementalAlterConfigs(configs).all().get();
        }
    }
}
```
Go

```go
// Go: segmentio/kafka-go Writer Configuration
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "my-critical-topic",
		Balancer: &kafka.LeastBytes{},

		// RequiredAcks:
		//  0 = Fire and Forget
		//  1 = Leader Only
		// -1 = All ISR (Strongest)
		RequiredAcks: kafka.RequireAll,

		// Retry automatically on failure
		MaxAttempts: 10,
	}
	defer w.Close()

	// Producing a critical message
	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("payment_id_123"),
			Value: []byte("transaction_data"),
		},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}
}
```