Opening the "Redis Stream" black box: a durable log, consumer groups, and flow control. ~12 min.
Since Lesson 1 we've waved at "the Redis Stream" as the thing connecting block-ingest to the indexer. Time to open it. The three binaries are separate processes — they can't call each other's functions. They communicate by passing blocks through a durable, ordered log in Redis. Understanding this backbone explains how the system stays correct when a consumer is slow, restarts, or dies mid-block — which is the deep part you came for.
block-ingest runs one instance per chain. Its job: turn the RPC into an ordered stream of RawBlock payloads.
The interesting engineering is all about not being slow or wasteful:
| Technique | What & why | Code |
|---|---|---|
| Round-robin RPC | Comma-separated RPC_URL endpoints, rotated per call to spread load / dodge rate limits. | feed.next() |
| Prefetch concurrency | Fetch CATCHUP_BATCH_SIZE blocks ahead in parallel during catch-up — RPC latency hidden by pipelining. | WithPrefetch(n) |
| RPC cache | Every RPC response cached in Redis, 7-day TTL. Rewinds & restarts cost zero RPC calls. | WithCache(NewRPCCache(rdb, 7d)) |
| Finality lag | Stay N blocks behind head to avoid ingesting blocks that may get reorged away. | WithFinalityLag(n) |
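
Two of the rows above reduce to a few lines of arithmetic. The sketch below is illustrative only: `endpointRotor`, `newEndpointRotor`, and `ingestTarget` are hypothetical names, not the actual contents of pkg/feed/rpc.go, and the example URLs are placeholders.

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// endpointRotor is a hypothetical stand-in for the rotation behind feed.next().
type endpointRotor struct {
	urls []string
	n    atomic.Uint64 // number of calls made so far
}

// newEndpointRotor splits a comma-separated RPC_URL value into endpoints.
func newEndpointRotor(rpcURL string) *endpointRotor {
	var urls []string
	for _, u := range strings.Split(rpcURL, ",") {
		if u = strings.TrimSpace(u); u != "" {
			urls = append(urls, u)
		}
	}
	return &endpointRotor{urls: urls}
}

// next returns the endpoint for this call and advances the rotation.
// It is safe to call from several goroutines at once.
func (r *endpointRotor) next() string {
	if len(r.urls) == 0 {
		return ""
	}
	i := r.n.Add(1) - 1
	return r.urls[i%uint64(len(r.urls))]
}

// ingestTarget returns the highest block number that is at least lag
// blocks behind the chain head (0 if the chain is shorter than the lag).
func ingestTarget(head, lag uint64) uint64 {
	if lag > head {
		return 0
	}
	return head - lag
}

func main() {
	r := newEndpointRotor("https://rpc-a.example, https://rpc-b.example")
	for i := 0; i < 3; i++ {
		fmt.Println(r.next()) // alternates between the two endpoints
	}
	fmt.Println(ingestTarget(1000, 12)) // ingest up to block 988
}
```

The atomic counter keeps the rotation correct when prefetch workers call `next()` concurrently, which is an assumption about how the real feed is used.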
The feed hands blocks to the publisher over a channel — Go's typed, thread-safe pipe between concurrent goroutines. It's buffered to the prefetch size:

    // pkg/feed/rpc.go: Subscribe
    ch := make(chan *types.RawBlock, prefetch) // buffered channel, capacity = prefetch
    // producer goroutine: ch <- block     (send; blocks if buffer full)
    // consumer side:      block := <-ch   (receive; blocks if buffer empty)

That "blocks if buffer full" is not a bug — it's flow control. If nobody drains the channel, the fetcher naturally pauses. Hold that thought; it connects to backpressure below.
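To see the "blocks if buffer full" behaviour without hanging the program, the sketch below probes the channel with a non-blocking send. `rawBlock` and `fillUntilFull` are hypothetical names used for illustration; the real feed uses a plain blocking send.

```go
package main

import "fmt"

// rawBlock is a hypothetical stand-in for types.RawBlock.
type rawBlock struct{ Number uint64 }

// fillUntilFull sends blocks until a send would block, and returns how
// many sends the channel accepted.
func fillUntilFull(ch chan *rawBlock) int {
	sent := 0
	for {
		select {
		case ch <- &rawBlock{Number: uint64(sent)}:
			sent++
		default:
			// The buffer is full. A plain `ch <- block` would park the
			// producer goroutine here until a consumer receives.
			return sent
		}
	}
}

func main() {
	const prefetch = 4
	ch := make(chan *rawBlock, prefetch)

	fmt.Println("accepted before full:", fillUntilFull(ch))

	<-ch // the consumer drains one block, freeing one slot
	fmt.Println("accepted after one receive:", fillUntilFull(ch))
}
```

The producer gets exactly as much room as the consumer frees, which is the whole flow-control mechanism.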
Source: pkg/feed/rpc.go (RPCFeed, Subscribe, next, WithPrefetch/WithCache/WithFinalityLag), cmd/block-ingest/main.go.
The producer appends each block to a Redis Stream keyed blocks:{chainID} with
XADD. The critical mental model: a Redis Stream is an append-only, persistent, ordered log
— not fire-and-forget pub/sub.

    // pkg/queue/queue.go: Publish
    q.rdb.XAdd(ctx, &redis.XAddArgs{
        Stream: "blocks:1", // the durable log for chain 1
        MaxLen: q.maxLen,
        Approx: true,       // soft cap (defence-in-depth trim)
        Values: ...,        // the serialized RawBlock
    })

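The difference between a log and pub/sub, and the cost of a length cap, can be seen in a small in-memory model. This is not Redis: `cappedLog` is a hypothetical type, and it trims exactly, whereas Redis with an approximate MAXLEN may keep somewhat more than the cap.

```go
package main

import "fmt"

// entry is one log record with a monotonically increasing ID.
type entry struct {
	ID      uint64
	Payload string
}

// cappedLog is a hypothetical in-memory model of an append-only log
// with a length cap. It illustrates the semantics only.
type cappedLog struct {
	entries []entry
	nextID  uint64
	maxLen  int // 0 means no cap
}

// add appends a payload and trims the oldest entries past the cap.
// Trimming ignores whether anyone has read the trimmed entries.
func (l *cappedLog) add(payload string) uint64 {
	l.nextID++
	l.entries = append(l.entries, entry{ID: l.nextID, Payload: payload})
	if l.maxLen > 0 && len(l.entries) > l.maxLen {
		l.entries = l.entries[len(l.entries)-l.maxLen:]
	}
	return l.nextID
}

// readAfter returns every retained entry with ID > lastID. A reader
// that shows up late still sees whatever the log has kept.
func (l *cappedLog) readAfter(lastID uint64) []entry {
	var out []entry
	for _, e := range l.entries {
		if e.ID > lastID {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	l := &cappedLog{maxLen: 3}
	for i := 0; i < 5; i++ {
		l.add(fmt.Sprintf("block-%d", i))
	}
	// A reader arriving only now gets the retained tail of the log.
	// The two oldest entries were trimmed before anyone read them.
	for _, e := range l.readAfter(0) {
		fmt.Println(e.ID, e.Payload)
	}
}
```

The trimmed entries are the reason the cap is described as defence-in-depth rather than the primary flow-control mechanism.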
The indexer reads with XREADGROUP as part of a consumer group. A consumer group is
Redis's mechanism for "a team of workers sharing a stream, where each entry is handed to exactly one member
and the group remembers what's been delivered and acknowledged."

    // pkg/queue/queue.go
    q.rdb.XGroupCreateMkStream(ctx, streamKey, group, startID) // once: create group (+ stream)
    q.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
        Group:    group,
        Consumer: name,
        Streams:  []string{streamKey, ">"}, // ">" = give me entries never delivered to this group
        Count:    batch,
    })

Three properties fall out of this, and they're the heart of the lesson:
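An entry delivered to a consumer enters that consumer's PEL (Pending Entries List): "delivered but not yet acknowledged." The indexer only calls XACK after it has committed the block to Memgraph. So the cycle is:
1. XREADGROUP delivers the entry, which enters the consumer's PEL.
2. The indexer commits the block to Memgraph.
3. The indexer calls XACK, which removes the entry from the PEL.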
If the indexer crashes between steps 1 and 3, the entry stays in the PEL — unacked. It will be redelivered. That means a block can be processed more than once: at-least-once, not exactly-once.
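The crash window can be walked through with an in-memory model. This is a sketch under assumptions, not the indexer's code: `group`, `graph`, and `apply` are hypothetical, the "graph" is a single keyed node, and the write is made idempotent with a simple monotonic block guard.

```go
package main

import (
	"fmt"
	"sort"
)

// group is a hypothetical in-memory model of one consumer group.
type group struct {
	stream  []uint64          // block numbers in stream order
	next    int               // first entry never delivered (the ">" position)
	pending map[uint64]string // PEL: block -> consumer it was delivered to
}

func newGroup(stream []uint64) *group {
	return &group{stream: stream, pending: map[uint64]string{}}
}

// readNew models XREADGROUP with ">": deliver the next new entry and
// record it in the PEL.
func (g *group) readNew(consumer string) (uint64, bool) {
	if g.next >= len(g.stream) {
		return 0, false
	}
	b := g.stream[g.next]
	g.next++
	g.pending[b] = consumer
	return b, true
}

// readPending models a restarted consumer re-reading its own unacked
// entries (in Redis, XREADGROUP with an explicit ID such as 0).
func (g *group) readPending(consumer string) []uint64 {
	var out []uint64
	for b, c := range g.pending {
		if c == consumer {
			out = append(out, b)
		}
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

// ack models XACK: the entry leaves the PEL.
func (g *group) ack(b uint64) { delete(g.pending, b) }

type node struct {
	Value        string
	UpdatedBlock uint64
}

// graph is a toy store with one idempotent, monotonically guarded write.
type graph map[string]node

func (gr graph) apply(block uint64) {
	if cur, ok := gr["account"]; ok && cur.UpdatedBlock > block {
		return // never let an older block overwrite newer state
	}
	gr["account"] = node{Value: fmt.Sprintf("state@%d", block), UpdatedBlock: block}
}

func main() {
	g := newGroup([]uint64{100, 101})
	db := graph{}

	b, _ := g.readNew("indexer-1") // step 1: delivered, now in the PEL
	db.apply(b)                    // step 2: committed
	// Crash here, before step 3: no ack was sent.

	fmt.Println("pending after crash:", g.readPending("indexer-1"))
	for _, p := range g.readPending("indexer-1") {
		db.apply(p) // processed a second time, same resulting state
		g.ack(p)    // step 3 finally happens
	}
	fmt.Println("pending after recovery:", g.readPending("indexer-1"), "graph:", db)
}
```

The block is processed twice, which is only acceptable because the second `apply` leaves the store unchanged.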
Redelivery is safe because the write path is idempotent: MERGE plus the monotonic updated_block guard make re-processing a block produce a byte-identical graph. The atomic cursor commit means a crash rolls back cleanly and the redelivered block re-applies harmlessly. Streaming chose at-least-once; the write path chose idempotency; together they're crash-safe. That's one design, two halves.

What if a consumer doesn't just pause but dies with entries still in its PEL? Another consumer reclaims them with XAUTOCLAIM, so no block is stranded:
q.rdb.XAutoClaim(ctx, ...) // take over entries idle in a dead consumer's PEL
A war story recorded in pkg/queue/queue.go: "... blocks:1 stream: 99 blocks stuck across two dead indexer pods, had to be [reclaimed]." When pods die mid-block, their PEL entries are orphaned until XAUTOCLAIM picks them up. This is exactly the failure mode the reclaim path exists for.

If block-ingest raced ahead unbounded, it would pile up unread blocks in Redis (memory!) or trigger MAXLEN trimming that silently drops unread blocks. So Publish samples the consumer backlog and blocks before XADD when the consumer is too far behind:

    // pkg/queue/queue.go: backpressure
    // backlog = max over consumer groups of (lag + pending)
    if backlog >= threshold {
        awaitBackpressure(ctx) // pause producer until it drains
    }

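The backlog rule can be sketched in memory, together with a toy model of what happens to the numbers while the producer is paused. `groupStats`, `streamModel`, `shouldPause`, and `consumeAndAck` are hypothetical names, and the figures are invented. The contrast with raw stream length is the point discussed in the text that follows.

```go
package main

import "fmt"

// groupStats is a hypothetical view of one consumer group's numbers.
type groupStats struct {
	Lag     int64 // entries not yet delivered to the group
	Pending int64 // delivered but not yet acknowledged (PEL size)
}

// backlog is the maximum of lag + pending over all consumer groups.
func backlog(groups []groupStats) int64 {
	var worst int64
	for _, g := range groups {
		if b := g.Lag + g.Pending; b > worst {
			worst = b
		}
	}
	return worst
}

// shouldPause reports whether the producer should wait before appending.
func shouldPause(groups []groupStats, threshold int64) bool {
	return backlog(groups) >= threshold
}

// streamModel tracks raw stream length next to one group's stats.
type streamModel struct {
	XLen  int64
	Group groupStats
}

func minInt64(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

// consumeAndAck models the consumer finishing n entries while the
// producer is paused: pending and lag shrink, XLen does not, because
// nothing is appended and so nothing is trimmed.
func (s *streamModel) consumeAndAck(n int64) {
	fromPending := minInt64(n, s.Group.Pending)
	s.Group.Pending -= fromPending
	s.Group.Lag -= minInt64(n-fromPending, s.Group.Lag)
}

func main() {
	const threshold = 500
	s := &streamModel{XLen: 1000, Group: groupStats{Lag: 900, Pending: 100}}

	fmt.Println("pause on lag+pending:", shouldPause([]groupStats{s.Group}, threshold))

	s.consumeAndAck(1000) // the consumer catches up completely

	fmt.Println("pause on lag+pending after catch-up:", shouldPause([]groupStats{s.Group}, threshold))
	fmt.Println("a raw-length gate would still see:", s.XLen, ">=", threshold)
}
```

Taking the maximum over groups means the slowest group sets the pace, so no group can have unread entries trimmed from under it.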
The backlog is measured as lag + pending, not XLEN (the raw stream length). Why? XACK doesn't shrink XLEN; in this setup only XADD with MAXLEN trims the stream. If you backpressured on XLEN, then once the producer pauses, XADD stops → MAXLEN never fires → XLEN stays high forever → permanent deadlock. Sampling lag + pending means an ACKing consumer immediately reduces the number the producer is watching, so it can resume. This is the kind of bug that only shows up in production, and the code was written to avoid it.

Now the buffered channel from §1 clicks into place. SubscribeToStream calls Publish synchronously, so a blocked Publish (waiting on backpressure) stops draining the prefetch channel → the channel buffer fills → the RPC fetcher's ch <- block blocks → fetching pauses. One stalled consumer applies smooth back-pressure all the way back to the RPC calls. No config, no coordination: just blocking primitives composing.
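The chain of blocking calls can be reproduced with two goroutines and a gate. This is a sketch under assumptions: `runPipeline` is a hypothetical function, the gate channel stands in for a Publish call waiting on backpressure, and the block counts are arbitrary.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runPipeline wires a fetcher to a synchronous publisher through a
// buffered channel. While gate is open but silent, publish blocks;
// closing gate lets every publish through. It returns a counter of
// simulated RPC fetches and a channel closed when all blocks are published.
func runPipeline(total, prefetch int, gate <-chan struct{}) (*atomic.Int64, <-chan struct{}) {
	ch := make(chan int, prefetch)
	fetched := new(atomic.Int64)
	done := make(chan struct{})

	go func() { // the RPC fetcher
		defer close(ch)
		for b := 0; b < total; b++ {
			fetched.Add(1) // the simulated RPC call
			ch <- b        // blocks once the prefetch buffer is full
		}
	}()

	go func() { // the subscriber loop: drain the channel, publish synchronously
		defer close(done)
		for range ch {
			<-gate // a publish that waits while backpressure is applied
		}
	}()

	return fetched, done
}

func main() {
	const total, prefetch = 100, 4
	gate := make(chan struct{})
	fetched, done := runPipeline(total, prefetch, gate)

	time.Sleep(100 * time.Millisecond)
	// One block is held by the blocked publish, prefetch blocks sit in
	// the buffer, and one more is held by the fetcher's blocked send,
	// so the count can never exceed prefetch+2 while the gate is shut.
	fmt.Println("fetched while stalled:", fetched.Load())

	close(gate) // the consumer caught up: backpressure lifts
	<-done
	fmt.Println("fetched after drain:", fetched.Load())
}
```

Nothing in the fetcher knows about the consumer. The bound on in-flight blocks falls out of the channel capacity alone.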
Recall from Lesson 1 there are two Redis instances. Keep them straight:
| Redis | Holds | Persistence |
|---|---|---|
| redis-cache (:6380) | The block stream (blocks:{chainID}) + the RPC cache | Persistent (you don't want to refetch blocks) |
| redis (:6379) | Live state: cursors, balances, the monitored set | Self-healable from Memgraph if lost |
... XACK. What happens to that block?
... lag + pending instead of XLEN?
... Publish (waiting on backpressure) eventually pauses the RPC fetcher. By what mechanism?
... XAUTOCLAIM).
... lag+pending vs XLEN deadlock, and how it propagates back to RPC.

Grounded in: pkg/queue/queue.go (XAdd/XReadGroup/XAck/XAutoClaim, backpressure, the blocks:1 war story),
pkg/feed/rpc.go (RPCFeed, Subscribe, prefetch channel, cache, round-robin, finality lag),
cmd/block-ingest/main.go (producer wiring). Verify against source — the code is the truth.