Consumer Groups

Reading a stream sequentially with XRANGE is fine for a single client, but what if you have 10,000 events per second? A single consumer will be overwhelmed.

Just like Kafka, Redis Streams supports Consumer Groups to allow multiple clients to cooperate and consume a stream in parallel.

1. How Consumer Groups Work

A Consumer Group guarantees that each message is delivered to only one consumer in the group. This allows you to scale out processing by simply adding more consumers.

  1. Creation: You create a group for a stream (e.g., mygroup for mystream).
  2. Consumption: Consumers ask for “new” messages (>). Redis serves unread messages to them.
  3. Tracking: Redis remembers which message was sent to which consumer in the Pending Entries List (PEL).
  4. Acknowledgment: Once processed, the consumer sends an XACK. Redis removes it from the PEL.

2. Key Commands

Creating a Group (XGROUP)

# Create group 'mygroup' starting from the beginning (0)
XGROUP CREATE mystream mygroup 0 MKSTREAM

Reading Messages (XREADGROUP)

# Read up to 1 new message for consumer 'Alice'
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
  • >: A special ID meaning “give me messages never delivered to other consumers”.

Acknowledging (XACK)

# Tell Redis we are done with message 1623456789123-0
XACK mystream mygroup 1623456789123-0

3. Interactive: Consumer Group Balance

Simulate a high-throughput stream and watch how messages are distributed among consumers.

Stream: "mystream"

⬇️ XREADGROUP
Consumer 1

4. Deep Dive: Pending Entries List (PEL)

The PEL is critical for fault tolerance. It ensures at-least-once delivery.

What happens if a consumer crashes?

  1. Alice reads message ID:100.
  2. Redis adds ID:100 to the PEL associated with Alice.
  3. Alice crashes before sending XACK.
  4. Message ID:100 stays in the PEL forever (it is not lost).
  5. Recovery: Another consumer (Bob) can inspect Alice’s PEL (XPENDING) and “steal” the message (XCLAIM) to process it himself.

[!IMPORTANT] Without XACK, the PEL will grow indefinitely, consuming memory. Always acknowledge processed messages!

5. Code Examples: Consumer

Java

import redis.clients.jedis.Jedis;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.resps.StreamEntry;
import java.util.List;
import java.util.Map;

public class ConsumerGroups {
  public static void main(String[] args) {
    try (Jedis jedis = new Jedis("localhost", 6379)) {
      // Create Group
      try {
        jedis.xgroupCreate("mystream", "mygroup", "$", true);
      } catch (Exception e) {
        // Group might already exist
      }

      while (true) {
        // Read
        Map.Entry<String, redis.clients.jedis.StreamEntryID> streamQuery =
          new java.util.AbstractMap.SimpleEntry<>("mystream", redis.clients.jedis.StreamEntryID.UNRECEIVED_ENTRY);

        List<Map.Entry<String, List<StreamEntry>>> streams = jedis.xreadGroup(
          "mygroup",
          "consumer1",
          XReadGroupParams.xReadGroupParams().count(1).block(0),
          streamQuery
        );

        if (streams != null) {
          for (Map.Entry<String, List<StreamEntry>> stream : streams) {
            for (StreamEntry msg : stream.getValue()) {
              System.out.println("Processing: " + msg.getFields());

              // ACK
              jedis.xack("mystream", "mygroup", msg.getID());
            }
          }
        }
      }
    }
  }
}

Go

package main

import (
    "context"
    "fmt"
    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    // Create Group
    rdb.XGroupCreateMkStream(ctx, "mystream", "mygroup", "$")

    for {
        // Read
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    "mygroup",
            Consumer: "consumer1",
            Streams:  []string{"mystream", ">"},
            Count:    1,
            Block:    0,
        }).Result()

        if err != nil {
            panic(err)
        }

        for _, stream := range streams {
            for _, msg := range stream.Messages {
                fmt.Println("Processing:", msg.Values)

                // ACK
                rdb.XAck(ctx, "mystream", "mygroup", msg.ID)
            }
        }
    }
}

6. Summary

Consumer Groups turn Redis Streams into a robust, distributed work queue. By leveraging the PEL and acknowledgments, you can ensure that no job is ever lost, even if workers fail.