Design a Distributed Counter (YouTube Likes)
1. What is a Distributed Counter?
In the early days of the web, counting was simple: UPDATE posts SET likes = likes + 1 WHERE id = 123. The database handled the locking, and life was good.
Then came “Gangnam Style”. In 2012 it became the first video to hit 1 billion views, and by late 2014 its view count had exceeded YouTube’s 32-bit integer limit (2,147,483,647), forcing a move to 64-bit counters. But the bigger problem was Velocity. When a viral event occurs, millions of users click “Like” simultaneously.
A standard relational database (Postgres/MySQL) can handle ~500-1000 concurrent writes to a single row before lock contention brings the system to a halt. This chapter explores how to scale from 100 to 1 Million writes per second using Sharded Counters, Redis, and Write-Behind Caching.
[!TIP] Real-World Examples:
- YouTube: View counts, Likes.
- Twitter: Tweet likes, Retweets.
- Facebook: Post reactions.
2. Requirements & Goals
2.1 Functional Requirements
- Ingest: The system must accept “Like”, “View”, or “Upvote” events immediately.
- Query: Users must see the current count (approximate or exact).
- Deduplication: A user can only like a video once (Idempotency).
- Persistence: Counts must not be lost if the cache crashes.
2.2 Non-Functional Requirements
- Low Latency: Write operations must return < 10ms (99th percentile).
- High Availability: 99.99%. It is better to show an approximate count than to fail the write (AP over CP).
- Scalability: Handle 1 Million interactions/sec on a single object (Hot Key).
- Eventual Consistency: It is acceptable if the viewer count lags by a few seconds.
2.3 Extended Requirements
- Auditability: For “Ad Views” (which equate to money), we must keep an exact, auditable log of every event.
- Bot Detection: We must filter out “fake” views from click farms.
3. Capacity Estimation
Let’s design for a platform like YouTube or Twitter.
3.1 Traffic Analysis
- DAU (Daily Active Users): 500 Million.
- Reads: 50 Views/User/Day = 25 Billion Views/Day.
- Writes (Likes/Interactions): 5 Likes/User/Day = 2.5 Billion Writes/Day.
- Average Write QPS: 2.5 × 10⁹ / 86,400 ≈ 29,000 writes/sec.
- Peak QPS: Viral events (e.g., Super Bowl) can spike to 100x the average -> 3 Million writes/sec.
3.2 Storage (The Ledger)
- Counter Store: Tiny (Just a number).
- Relationship Store: Storing “User X liked Video Y” is massive.
  - UserId (8 bytes) + VideoId (8 bytes) + Timestamp (8 bytes) = 24 bytes per row.
  - 2.5B writes/day × 24 bytes ≈ 60 GB/day.
- 5-Year Retention: 60 GB × 365 × 5 ≈ 110 TB.
- Conclusion: We need a sharded NoSQL database (Cassandra/DynamoDB) or a sharded SQL setup.
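A quick back-of-the-envelope script to sanity-check the numbers above (the constants are simply the assumptions from this section):

```python
# Sanity check for the capacity estimates above.
DAU = 500_000_000
LIKES_PER_USER_PER_DAY = 5
ROW_BYTES = 8 + 8 + 8                                   # user_id + video_id + timestamp

writes_per_day = DAU * LIKES_PER_USER_PER_DAY           # 2.5 billion writes/day
avg_write_qps = writes_per_day / 86_400                 # ~29,000 writes/sec
peak_write_qps = avg_write_qps * 100                    # ~3 million writes/sec (viral spike)

ledger_gb_per_day = writes_per_day * ROW_BYTES / 1e9    # ~60 GB/day
ledger_tb_5y = ledger_gb_per_day * 365 * 5 / 1e3        # ~110 TB over 5 years

print(f"avg: {avg_write_qps:,.0f} w/s, peak: {peak_write_qps:,.0f} w/s")
print(f"ledger: {ledger_gb_per_day:.0f} GB/day, {ledger_tb_5y:.0f} TB over 5 years")
```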
4. System APIs
We need a simple REST or gRPC API for clients.
| Method | Endpoint | Description |
|---|---|---|
| POST | /v1/likes | Records a like interaction. Payload: `{ videoId: "v123", userId: "u456" }` |
| GET | /v1/likes/{videoId} | Returns the current count. Response: `{ count: 1540032, approx: true }` |
| DELETE | /v1/likes/{videoId} | Removes a like (Unlike). Payload: `{ userId: "u456" }` |
5. Database Design
We need two distinct storage layers: one for the count (Fast) and one for the record (Durable).
5.1 Redis (Hot Storage)
Used for real-time counting.
- Key: `video:{id}:likes` -> Value: Integer
- Set (Optional): `video:{id}:liked_users` -> `Set<UserId>` (for deduplication, if small).
5.2 SQL / NoSQL (Cold Storage)
Used for the durable record of who liked what.
Table: likes
```sql
CREATE TABLE likes (
    user_id    BIGINT,
    video_id   BIGINT,
    created_at TIMESTAMP,
    PRIMARY KEY (user_id, video_id) -- Ensures 1 like per user per video
);
```
6. High-Level Architecture
We use a Write-Behind (Async) architecture to decouple the high-speed writes from the durable storage.
```
[Client] --POST /v1/likes--> [API] --INCR--> [Redis (sharded counters)]
                               |
                               +--(async)--> [Kafka] --> [Worker] --> [SQL ledger]
```
7. Component Design (Deep Dive)
7.1 The Single Row Lock Bottleneck
The core bottleneck is the Single Row Lock. If 10,000 requests try to update key="video:123", Redis (being single-threaded) handles them sequentially. While Redis is fast (~100k ops/sec), network latency and CPU contention will spike when millions of users hit the same key.
7.2 Solution: Counter Sharding
Instead of one key, we split the counter into N keys: video:123:0, video:123:1 … video:123:N-1.
The Write Path (Random Distribution)
When a write comes in, we pick a random shard. This distributes the load evenly across the Redis cluster (if using Redis Cluster) or just reduces contention on a single key.
```python
import random

# Assumes a connected Redis client named `redis`.
def increment_likes(video_id, n_shards=100):
    shard_id = random.randint(0, n_shards - 1)
    key = f"video:{video_id}:{shard_id}"
    # INCR is atomic
    redis.incr(key)
```
The Read Path (Scatter-Gather)
To get the total likes, we sum all shards. This is slower than reading one key, but since Reads « Writes during a viral spike, it’s an acceptable trade-off.
```python
def get_total_likes(video_id, n_shards=100):
    keys = [f"video:{video_id}:{i}" for i in range(n_shards)]
    # MGET executes in a single round-trip (pipelining)
    values = redis.mget(keys)
    # Shards that were never written come back as None; treat them as zero
    total = sum(int(v) if v else 0 for v in values)
    return total
```
7.3 Adaptive Sharding
For most videos, N=1 is sufficient. For “Gangnam Style”, we need N=1000.
- Heuristic: Start with `N=10`. Monitor Redis CPU. If CPU > 50% on the node holding the key, increase `N`.
- Discovery: How does the reader know `N`? Store metadata: `video:123:meta` -> `{shards: 50}` (see the writer sketch below).
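A minimal sketch of a writer that honors this metadata, assuming a redis-py client and that `video:{id}:meta` is stored as a Redis hash (both are assumptions, not prescribed above):

```python
import random
import redis

r = redis.Redis()  # assumed redis-py client

def get_shard_count(video_id: str) -> int:
    """Read the current shard count from the metadata hash; default to 1."""
    n = r.hget(f"video:{video_id}:meta", "shards")
    return int(n) if n else 1

def increment_likes(video_id: str) -> None:
    n_shards = get_shard_count(video_id)
    shard_id = random.randrange(n_shards)
    r.incr(f"video:{video_id}:{shard_id}")

def scale_up(video_id: str, new_n: int) -> None:
    # Called by a monitor when the node holding the key runs hot.
    # Only ever grow the shard count, so readers never miss existing shards.
    r.hset(f"video:{video_id}:meta", "shards", new_n)
```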
8. Data Partitioning & Sharding
What if we have a Multi-Region setup (US, EU, ASIA)? If we simply replicate Redis, we get write conflicts.
CRDT (Conflict-free Replicated Data Type):
- G-Counter (Grow-Only Counter): A distributed counter that only increments.
- Logic: Each region maintains its own slot in a counter vector and only increments that slot. `Merge(A, B) = Max(A[i], B[i])` element-wise, and the global count is the `Sum` over all slots (toy example below).
- Redis Implementation: Redis Enterprise offers CRDT support natively for active-active replication.
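A toy G-Counter makes the merge rule concrete (illustrative only; Redis Enterprise handles this natively in production):

```python
class GCounter:
    """Grow-only counter: one slot per region, merge = element-wise max."""

    def __init__(self, regions):
        self.counts = {region: 0 for region in regions}

    def increment(self, region, delta=1):
        self.counts[region] += delta              # each region writes only its own slot

    def value(self):
        return sum(self.counts.values())          # global count = sum of all slots

    def merge(self, other):
        for region, count in other.counts.items():
            self.counts[region] = max(self.counts.get(region, 0), count)

# Two replicas count independently, then converge after exchanging state:
us, eu = GCounter(["us", "eu", "asia"]), GCounter(["us", "eu", "asia"])
us.increment("us", 3)
eu.increment("eu", 5)
us.merge(eu)
eu.merge(us)
assert us.value() == eu.value() == 8
```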
9. Reliability, Caching, & Load Balancing
9.1 Handling “Approximate” Counting
For things like “Unique Views”, we don’t need an exact count. Storing 1 Billion IP addresses takes ~4GB.
- HyperLogLog (HLL): A probabilistic data structure that estimates cardinality with 0.81% error using only ~12KB of memory.
- Use Case: “1.2M Views” (Exact number doesn’t matter).
- Redis Support: `PFADD` and `PFCOUNT` commands (sketch below).
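A minimal sketch of unique-view counting with Redis HLL, assuming a redis-py client (the key name is illustrative):

```python
import redis

r = redis.Redis()  # assumed redis-py client

def record_view(video_id: str, user_id: str) -> None:
    # PFADD is idempotent: re-adding the same user does not change the estimate.
    r.pfadd(f"video:{video_id}:unique_views", user_id)

def unique_views(video_id: str) -> int:
    # PFCOUNT returns the cardinality estimate (~0.81% standard error).
    return r.pfcount(f"video:{video_id}:unique_views")
```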
9.2 Thundering Herd
If a cache key expires, thousands of readers hit the DB.
- Solution: Probabilistic Early Expiration. If a key expires in 10s, and we are at 9s, flip a coin. If heads, recompute the value early.
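A minimal sketch of that coin-flip refresh, assuming a redis-py client; `recompute_total`, the key name, and the TTL values are placeholders:

```python
import random
import redis

r = redis.Redis()   # assumed redis-py client
TTL_SECONDS = 10
EARLY_WINDOW = 1    # start flipping coins in the last second of the TTL

def recompute_total(video_id: str) -> int:
    # Placeholder for the expensive path (summing shards or querying the ledger).
    return 0

def get_count(video_id: str) -> int:
    key = f"video:{video_id}:total"
    ttl = r.ttl(key)              # -2 = key missing, -1 = no expiry set
    value = r.get(key)

    expired = value is None
    about_to_expire = 0 <= ttl <= EARLY_WINDOW and random.random() < 0.5

    if expired or about_to_expire:
        value = recompute_total(video_id)          # only some callers pay this cost
        r.set(key, value, ex=TTL_SECONDS)          # refresh before the herd forms
    return int(value)
```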
10. Decision: Single Lock vs Sharded
Compare how a single contended counter and a sharded counter behave under 1,000 concurrent “Like” requests:
- Single Lock: Requests must wait in line. High latency.
- Sharded: Requests are processed in parallel. High throughput.
11. System Walkthrough
Let’s trace a “Like” request from a user in New York.
Scenario: User A likes Video V123
1. User Action: User A clicks “Like”. The client sends `POST /v1/likes` with `{ "videoId": "v123", "userId": "u456", "timestamp": 1698765432 }`.
2. API Gateway: Validates the auth token and routes the request to the API cluster.
3. Sharding Logic:
   - The API reads the shard count for `v123` (here, 100) and picks a random shard, say `shard_id = 42`.
   - Target Key: `video:v123:42`.
4. Redis Write (Hot Path):
   - The API executes `INCR video:v123:42`.
   - Response: `105` (current count of this shard).
   - Total latency so far: ~15ms.
5. Event Queue (Async Path):
   - The API pushes a message to the Kafka topic `likes-events`: `{ "event": "LIKE", "video": "v123", "user": "u456", "shard": 42 }`.
6. Persistence:
   - A worker pulls a batch of 100 events.
   - It executes a SQL batch insert: `INSERT INTO likes (user_id, video_id) VALUES ...` (see the worker sketch below).
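A minimal sketch of that persistence worker, assuming `kafka-python` and `psycopg2` (the consumer group, broker address, and DSN are placeholders; the `likes-events` topic and the batch size of 100 come from the walkthrough). The `ON CONFLICT DO NOTHING` clause relies on the `(user_id, video_id)` primary key from Section 5.2 to keep replays idempotent:

```python
import json
import psycopg2
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "likes-events",                          # topic from the walkthrough
    group_id="likes-persister",              # placeholder consumer group
    bootstrap_servers="kafka:9092",          # placeholder broker
    value_deserializer=lambda b: json.loads(b),
    enable_auto_commit=False,
)
conn = psycopg2.connect("dbname=likes")      # placeholder DSN

INSERT = """
    INSERT INTO likes (user_id, video_id, created_at)
    VALUES (%s, %s, NOW())
    ON CONFLICT (user_id, video_id) DO NOTHING
"""

batch = []
for msg in consumer:
    # NOTE: the ledger schema uses BIGINT ids; adapt the mapping to your id format.
    batch.append((msg.value["user"], msg.value["video"]))
    if len(batch) >= 100:                    # flush every 100 events
        with conn.cursor() as cur:
            cur.executemany(INSERT, batch)
        conn.commit()
        consumer.commit()                    # commit offsets only after the DB write
        batch.clear()
```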
12. Low-Level Optimizations
12.1 Redis Pipelining vs Lua
- Pipelining: We use `MGET` to read 100 shards in one Round Trip Time (RTT). Without pipelining, reading 100 keys would take 100 RTTs (e.g., 50ms vs 5,000ms).
- Lua Scripting: Used for an atomic “check and set”, e.g., “only increment if the user hasn’t liked yet”:

```lua
if redis.call("SISMEMBER", KEYS[2], ARGV[1]) == 0 then
    redis.call("INCR", KEYS[1])
    redis.call("SADD", KEYS[2], ARGV[1])
    return 1
else
    return 0
end
```
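One way to invoke that script from Python is redis-py's `register_script`, which caches it server-side and calls it via `EVALSHA` (a sketch using the unsharded keys from Section 5.1):

```python
import redis

r = redis.Redis()  # assumed redis-py client

LIKE_ONCE = r.register_script("""
if redis.call("SISMEMBER", KEYS[2], ARGV[1]) == 0 then
    redis.call("INCR", KEYS[1])
    redis.call("SADD", KEYS[2], ARGV[1])
    return 1
else
    return 0
end
""")

# Returns 1 only the first time this user likes the video, 0 on repeats.
applied = LIKE_ONCE(keys=["video:v123:likes", "video:v123:liked_users"], args=["u456"])
```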
12.2 HyperLogLog Internals
- HLL works by hashing the input (UserID) and counting the leading zeros in the binary representation.
- If we see a hash with 10 leading zeros, it’s likely we’ve seen ~2^10 distinct items.
- We use 16,384 registers (buckets) to average out the variance.
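A toy illustration of the leading-zero intuition (a single register with no bucketing or bias correction, so the estimate is very noisy; real HLL averages over the 16,384 registers):

```python
import hashlib

def leading_zeros(h: int, bits: int = 64) -> int:
    return bits - h.bit_length()        # zeros before the first 1-bit of a 64-bit hash

def rough_estimate(user_ids) -> int:
    max_zeros = 0
    for uid in user_ids:
        h = int.from_bytes(hashlib.sha1(uid.encode()).digest()[:8], "big")
        max_zeros = max(max_zeros, leading_zeros(h))
    return 2 ** max_zeros               # "10 leading zeros => roughly 2^10 distinct items"

# Right order of magnitude for 100k distinct users, but with huge variance:
print(rough_estimate(f"user-{i}" for i in range(100_000)))
```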
13. Interview Gauntlet
Q1: How do you handle “Unlikes”?
- Answer: Send a decrement event (-1). In the sharded counter, we just `DECR` a random shard. In SQL, we `DELETE` the row.
Q2: How do you migrate from N=10 to N=100 shards?
- Answer: We don’t need to migrate data. We just start writing to the new keys (10-99). The reader’s `MGET` just needs to know to query 0-99.
Q3: What if the API crashes before writing to Kafka?
- Answer: We lose the “Durable Record” (User liked Video), but the Counter in Redis is already updated. The system is eventually consistent. For ads/money, we might need a distributed transaction (Saga), but for Likes, this trade-off is acceptable.
Q4: How does Instagram handle “Private Account” likes?
- Answer: Authorization check first. The “Fanout” service checks if the viewer follows the private account before delivering the notification.
Q5: Can we use DynamoDB Atomic Counter?
- Answer: Yes, but DynamoDB writes are expensive ($). Redis is cheaper for high-frequency updates. We usually buffer in Redis and flush to DynamoDB.
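For reference, a DynamoDB atomic increment looks roughly like this (boto3 sketch; the table name, key, and attribute are hypothetical):

```python
import boto3

table = boto3.resource("dynamodb").Table("video_counters")   # hypothetical table

def flush_to_dynamo(video_id: str, delta: int) -> None:
    # ADD performs an atomic, server-side increment; no read-modify-write race.
    table.update_item(
        Key={"video_id": video_id},
        UpdateExpression="ADD like_count :d",
        ExpressionAttributeValues={":d": delta},
    )

# A worker might call this periodically with drained Redis shard totals.
flush_to_dynamo("v123", 1540)
```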
14. Summary: The Whiteboard Strategy
If asked to design this in an interview, draw this 4-Quadrant Layout:
1. Requirements
- Func: Ingest, Query, Dedup.
- Non-Func: High Write Throughput, Low Latency.
- Scale: 1M+ writes/sec.
2. Architecture
```
[Client] -> [API] -> [Redis Shards]
                |
                (Async) -> [Kafka] -> [Worker] -> [SQL]
```
- Sharding: Key: `video:123:shard_N`.
- Write-Behind: Buffer writes in RAM, flush to disk later.
3. Data & API
- Redis: `INCR video:123:5`
- SQL: `(user_id, video_id, timestamp)`
4. Deep Dives
- Sharding: Solves single-key contention.
- Approximate: Use HyperLogLog for unique views.
- Consistency: AP system (Eventual Consistency).