Lesson 07 · The Streaming Backbone

How blocks travel between the binaries

Opening the "Redis Stream" black box: a durable log, consumer groups, and flow control. ~12 min.

Builds on: Lesson 1 · Lesson 4 · Fills your "streaming" gap · New: Go channels · New: at-least-once + backpressure

Since Lesson 1 we've waved at "the Redis Stream" as the thing connecting block-ingest to the indexer. Time to open it. The three binaries are separate processes — they can't call each other's functions. They communicate by passing blocks through a durable, ordered log in Redis. Understanding this backbone explains how the system stays correct when a consumer is slow, restarts, or dies mid-block — which is the deep part you came for.

block-ingest (RPC → RawBlock → Publish)
   --XADD-->  stream blocks:{chainID} (durable ordered log)
   --XREADGROUP-->  indexer, a consumer group (read → process → XACK)

1 · block-ingest, the producer

One instance per chain. Its job: turn the chain's RPC endpoints into an ordered stream of RawBlock payloads. Most of the interesting engineering is about not being slow or wasteful:

Technique | What & why | Code
Round-robin RPC | Comma-separated RPC_URL endpoints, rotated per call to spread load and dodge rate limits. | feed.next()
Prefetch concurrency | During catch-up, fetch CATCHUP_BATCH_SIZE blocks ahead in parallel, so RPC latency is hidden by pipelining. | WithPrefetch(n)
RPC cache | Every RPC response is cached in Redis with a 7-day TTL, so rewinds and restarts over already-fetched blocks (within the TTL) cost zero RPC calls. | WithCache(NewRPCCache(rdb, 7d))
Finality lag | Stay N blocks behind head to avoid ingesting blocks that may get reorged away. | WithFinalityLag(n)
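The first, second and fourth rows can be combined into one small, stdlib-only sketch. It is illustrative, not the code in pkg/feed/rpc.go: `rrFeed`, `fetch`, `prefetchRange` and `safeHead` are hypothetical names, the RPC call is simulated, and results come back as a slice (the real feed hands blocks over a channel). A counting semaphore caps in-flight calls at `prefetch`, an atomic counter rotates endpoints, and the finality lag caps how far catch-up goes.

```go
// Hypothetical sketch of round-robin RPC, bounded prefetch and finality lag.
package main

import (
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
)

// RawBlock is a hypothetical stand-in for types.RawBlock.
type RawBlock struct {
	Number   uint64
	Endpoint string // which RPC endpoint served this block
}

// rrFeed is a hypothetical model of the feed's endpoint rotation (feed.next()).
type rrFeed struct {
	endpoints []string
	calls     atomic.Uint64 // total calls so far; safe for concurrent use
}

// newRRFeed parses a comma-separated RPC_URL value, skipping blanks.
func newRRFeed(rpcURL string) *rrFeed {
	f := &rrFeed{}
	for _, e := range strings.Split(rpcURL, ",") {
		if e = strings.TrimSpace(e); e != "" {
			f.endpoints = append(f.endpoints, e)
		}
	}
	return f
}

// next picks the endpoint for this call, rotating round-robin.
func (f *rrFeed) next() string {
	n := f.calls.Add(1) - 1
	return f.endpoints[n%uint64(len(f.endpoints))]
}

// fetch simulates one RPC call for block num.
func (f *rrFeed) fetch(num uint64) *RawBlock {
	return &RawBlock{Number: num, Endpoint: f.next()}
}

// safeHead applies the finality lag: never ingest past head - lag.
func safeHead(head, lag uint64) uint64 {
	if head < lag {
		return 0
	}
	return head - lag
}

// prefetchRange fetches blocks [from, to] with at most prefetch (>= 1) calls
// in flight and returns them in block order.
func (f *rrFeed) prefetchRange(from, to uint64, prefetch int) []*RawBlock {
	if to < from {
		return nil
	}
	out := make([]*RawBlock, to-from+1)
	sem := make(chan struct{}, prefetch) // counting semaphore
	var wg sync.WaitGroup
	for n := from; n <= to; n++ {
		sem <- struct{}{} // wait for a free slot
		wg.Add(1)
		go func(n uint64) {
			defer wg.Done()
			out[n-from] = f.fetch(n) // each goroutine owns one slot
			<-sem                    // release the slot
		}(n)
	}
	wg.Wait()
	return out
}

func main() {
	feed := newRRFeed("https://rpc-a.example, https://rpc-b.example")
	to := safeHead(110, 5) // chain head 110, finality lag 5: stop at 105
	for _, b := range feed.prefetchRange(100, to, 4) {
		fmt.Println(b.Number, b.Endpoint)
	}
}
```

Because each goroutine writes only its own slot of `out`, the batch comes back in block order even though the calls finish out of order, which is what lets the publisher consume a strictly ordered stream.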

Your first Go channel

The feed hands blocks to the publisher over a channel — Go's typed, thread-safe pipe between concurrent goroutines. It's buffered to the prefetch size:

// pkg/feed/rpc.go — Subscribe
ch := make(chan *types.RawBlock, prefetch)   // buffered channel, capacity = prefetch
// producer goroutine: ch <- block   (send; blocks if buffer full)
// consumer side:      block := <-ch (receive; blocks if buffer empty)

That "blocks if buffer full" is not a bug — it's flow control. If nobody drains the channel, the fetcher naturally pauses. Hold that thought; it connects to backpressure below.
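Both halves of that idea can be observed with the standard library alone. In this sketch `RawBlock` is a stand-in and `trySend`/`runPipeline` are hypothetical helpers: the non-blocking `select` reports the exact moment a plain send would have parked, and the pipeline shows a producer and a consumer sharing a small buffer without losing or reordering anything.

```go
// Hypothetical sketch: a buffered channel as flow control.
package main

import "fmt"

// RawBlock is a hypothetical stand-in for types.RawBlock.
type RawBlock struct{ Number uint64 }

// trySend attempts a send without blocking and reports whether it fit.
// A plain `ch <- b` would instead park the goroutine until space frees up.
func trySend(ch chan *RawBlock, b *RawBlock) bool {
	select {
	case ch <- b:
		return true
	default:
		return false // buffer full
	}
}

// runPipeline sends blocks from..to through a channel of capacity prefetch:
// a producer goroutine fills it, the calling goroutine drains it.
func runPipeline(from, to uint64, prefetch int) []uint64 {
	ch := make(chan *RawBlock, prefetch)
	go func() {
		defer close(ch) // signals "no more blocks" to the receiver
		for n := from; n <= to; n++ {
			ch <- &RawBlock{Number: n} // parks while the buffer is full
		}
	}()
	var got []uint64
	for b := range ch { // receives until the channel is closed and empty
		got = append(got, b.Number)
	}
	return got
}

func main() {
	ch := make(chan *RawBlock, 2)
	fmt.Println(trySend(ch, &RawBlock{Number: 1}), trySend(ch, &RawBlock{Number: 2}), trySend(ch, &RawBlock{Number: 3}))
	// prints: true true false
	fmt.Println(runPipeline(100, 104, 2))
	// prints: [100 101 102 103 104]
}
```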

Source: pkg/feed/rpc.go (RPCFeed, Subscribe, next, WithPrefetch/WithCache/WithFinalityLag), cmd/block-ingest/main.go.

2 · The stream: a durable log, not pub/sub

The producer appends each block to a Redis Stream keyed blocks:{chainID} with XADD. The critical mental model: a Redis Stream is an append-only, persistent, ordered log — not fire-and-forget pub/sub.

An anchor you already own
You know the blockchain itself: an append-only, ordered log of blocks that consumers replay from a position. A Redis Stream is the same shape, used as a work queue. Each entry has an ID, entries are ordered, and a consumer tracks "where am I." If a consumer is down, entries wait in the log — they're not lost (unlike pub/sub, which would drop them).
// pkg/queue/queue.go — Publish
q.rdb.XAdd(ctx, &redis.XAddArgs{
    Stream: "blocks:1",           // the durable log for chain 1
    MaxLen: q.maxLen, Approx: true,  // soft cap (defence-in-depth trim)
    Values: ...,                       // the serialized RawBlock
})
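A toy model makes the "log, not pub/sub" distinction concrete. It is not Redis and not the project's code: `stream`, `add` and `readAfter` are hypothetical, IDs are plain integers (real stream IDs look like `<ms>-<seq>`), and trimming here is exact, whereas `Approx: true` lets Redis keep somewhat more than MaxLen entries.

```go
// Hypothetical sketch: an append-only log that consumers read by position.
package main

import "fmt"

// entry is one record in the log; ID increases monotonically.
type entry struct {
	ID    uint64
	Block uint64
}

// stream is a toy append-only log with an optional length cap.
type stream struct {
	entries []entry
	nextID  uint64
	maxLen  int // 0 = uncapped
}

// add appends like XADD, then trims the oldest entries beyond maxLen.
func (s *stream) add(block uint64) uint64 {
	s.nextID++
	s.entries = append(s.entries, entry{ID: s.nextID, Block: block})
	if s.maxLen > 0 && len(s.entries) > s.maxLen {
		s.entries = s.entries[len(s.entries)-s.maxLen:] // trimmed entries are gone for good
	}
	return s.nextID
}

// readAfter returns entries with ID > lastID: the consumer's "where am I".
func (s *stream) readAfter(lastID uint64) []entry {
	var out []entry
	for _, e := range s.entries {
		if e.ID > lastID {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	s := &stream{}
	var cursor uint64 // the consumer is down; nothing read yet
	for b := uint64(100); b < 103; b++ {
		s.add(b)
	}
	for _, e := range s.readAfter(cursor) { // consumer comes back and catches up
		fmt.Println(e.ID, e.Block)
		cursor = e.ID
	}

	capped := &stream{maxLen: 2}
	for b := uint64(100); b < 103; b++ {
		capped.add(b)
	}
	fmt.Println(len(capped.readAfter(0))) // 2: block 100 was trimmed before anyone read it
}
```

The capped run is the risk §3③ guards against: a length cap protects memory, but if the consumer is behind, it discards blocks nobody has processed yet.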

3 · Consumer groups: the indexer's read position

The indexer reads with XREADGROUP as part of a consumer group. A consumer group is Redis's mechanism for "a team of workers sharing a stream, where each entry is handed to exactly one member and the group remembers what's been delivered and acknowledged."

// pkg/queue/queue.go
q.rdb.XGroupCreateMkStream(ctx, streamKey, group, startID)  // once: create group (+ stream)

q.rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
    Group: group, Consumer: name,
    Streams: []string{streamKey, ">"},  // ">" = give me entries never delivered to this group
    Count:   batch,
})
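An in-memory model of those group semantics, with hypothetical names throughout (`group`, `readNew`, the pod names). It shows two things: within one group, `>` hands each entry to exactly one consumer and never repeats it; a second group keeps its own, independent position. It deliberately ignores acknowledgement, which real Redis tracks per consumer.

```go
// Hypothetical sketch of consumer-group delivery with ">".
package main

import "fmt"

// group models one consumer group's position on a stream.
type group struct {
	lastDelivered int            // index of the last entry handed out (-1 = none)
	owner         map[int]string // entry index -> consumer that received it
}

func newGroup() *group {
	return &group{lastDelivered: -1, owner: map[int]string{}}
}

// readNew models XREADGROUP with ">": hand consumer up to count entries that
// were never delivered to this group, advancing the group's position.
func (g *group) readNew(log []uint64, consumer string, count int) []uint64 {
	var out []uint64
	for i := g.lastDelivered + 1; i < len(log) && len(out) < count; i++ {
		g.owner[i] = consumer
		g.lastDelivered = i
		out = append(out, log[i])
	}
	return out
}

func main() {
	log := []uint64{100, 101, 102, 103}
	indexer := newGroup()
	fmt.Println(indexer.readNew(log, "pod-a", 2)) // [100 101]
	fmt.Println(indexer.readNew(log, "pod-b", 2)) // [102 103]
	fmt.Println(indexer.readNew(log, "pod-a", 2)) // []: nothing new for this group

	other := newGroup() // a second group keeps its own position
	fmt.Println(other.readNew(log, "worker-1", 10)) // [100 101 102 103]
}
```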

Three properties fall out of this, and they're the heart of the lesson:

① At-least-once delivery (read → process → ack)

An entry delivered to a consumer enters that consumer's PEL (Pending Entries List) — "delivered but not yet acknowledged." The indexer only calls XACK after it has committed the block to Memgraph. So the cycle is:

1. XREADGROUP: entry → PEL (in-flight)
2. Process: decode · filter · atomic write + cursor
3. XACK: remove from PEL, done

If the indexer crashes between steps 1 and 3, the entry stays in the PEL, unacked, until it is delivered again (for example, reclaimed by another consumer, see ②). That means a block can be processed more than once: at-least-once, not exactly-once.
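A toy walk-through of the crash case, with everything hypothetical (`toyGroup`, `read`, `ack`, `handle`): the commit step stands in for the atomic Memgraph write plus cursor, and the redelivery loop stands in for whatever hands the pending entry out again (for example XAUTOCLAIM). The point is the counter at the end: block 100 is committed twice.

```go
// Hypothetical sketch of at-least-once delivery: read → process → ack.
package main

import "fmt"

// toyGroup models a consumer group: a read position plus a PEL.
type toyGroup struct {
	log  []uint64       // stream entries (block numbers); index = entry ID
	next int            // first entry never delivered to the group
	pel  map[int]string // entry ID -> consumer holding it (delivered, unacked)
}

// read hands out the next never-delivered entry and records it in the PEL.
func (g *toyGroup) read(consumer string) (int, bool) {
	if g.next >= len(g.log) {
		return 0, false
	}
	id := g.next
	g.next++
	g.pel[id] = consumer
	return id, true
}

// ack removes an entry from the PEL, like XACK.
func (g *toyGroup) ack(id int) { delete(g.pel, id) }

// handle processes one entry; crashBeforeAck simulates dying after the
// commit but before XACK.
func handle(g *toyGroup, id int, commits map[uint64]int, crashBeforeAck bool) {
	commits[g.log[id]]++ // stands in for the atomic graph write + cursor commit
	if crashBeforeAck {
		return // the entry stays in the PEL
	}
	g.ack(id)
}

func main() {
	g := &toyGroup{log: []uint64{100, 101}, pel: map[int]string{}}
	commits := map[uint64]int{}

	id, _ := g.read("pod-a")
	handle(g, id, commits, true) // pod-a commits block 100, then dies
	fmt.Println(len(g.pel))      // 1: block 100 is still pending

	for pid := range g.pel { // the pending entry is handed out again
		g.pel[pid] = "pod-b"
		handle(g, pid, commits, false)
	}
	id, _ = g.read("pod-b")
	handle(g, id, commits, false)

	fmt.Println(commits[100], commits[101], len(g.pel)) // 2 1 0
}
```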

🔗 Why Lesson 4 was built the way it was
This is the payoff. At-least-once delivery means the write path must be safe to re-run. And it is — by design: Lesson 4's coalesced state (pure function of blocks) + MERGE + the monotonic updated_block guard make re-processing a block produce a byte-identical graph. The atomic cursor commit means a crash rolls back cleanly and the redelivered block re-applies harmlessly. Streaming chose at-least-once; the write path chose idempotency; together they're crash-safe. That's one design, two halves.
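The idempotency half of the argument fits in a toy model. Assumptions, all hypothetical: the graph is a Go map instead of Memgraph, `applyBlock` stands in for the MERGE-based write, a node's only state is a balance, and the guard skips writes from blocks older than `UpdatedBlock` (the exact comparison in the real Cypher may differ).

```go
// Hypothetical sketch: replaying blocks against a MERGE-style write with a
// monotonic updated_block guard.
package main

import (
	"fmt"
	"reflect"
)

// node is a hypothetical graph node: a coalesced value plus the block that last wrote it.
type node struct {
	Balance      int64
	UpdatedBlock uint64
}

// update is one coalesced write, derived purely from a block's contents.
type update struct {
	Key     string
	Balance int64
}

// applyBlock upserts each update (MERGE-like) unless the stored node was
// written by a later block: the monotonic updated_block guard.
func applyBlock(graph map[string]node, block uint64, ups []update) {
	for _, u := range ups {
		if cur, ok := graph[u.Key]; ok && block < cur.UpdatedBlock {
			continue // stale redelivery: never move state backwards
		}
		graph[u.Key] = node{Balance: u.Balance, UpdatedBlock: block}
	}
}

func main() {
	b100 := []update{{"0xabc", 5}}
	b101 := []update{{"0xabc", 7}, {"0xdef", 1}}

	graph := map[string]node{}
	applyBlock(graph, 100, b100)
	applyBlock(graph, 101, b101)
	snapshot := map[string]node{}
	for k, v := range graph {
		snapshot[k] = v
	}

	applyBlock(graph, 101, b101) // redelivered latest block
	applyBlock(graph, 100, b100) // redelivered older block
	fmt.Println(reflect.DeepEqual(graph, snapshot)) // true
}
```

Replaying block 101 rewrites identical values, and replaying block 100 is rejected by the guard, so the final state matches the snapshot either way.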

② Reclaiming a dead consumer's work

What if a consumer doesn't just pause but dies with entries still in its PEL? Another consumer reclaims them with XAUTOCLAIM, so no block is stranded:

q.rdb.XAutoClaim(ctx, ...)  // take over entries idle longer than a minimum idle time, e.g. stranded in a dead consumer's PEL

A real war story (from the code comments)
"Stage 2026-05-19 saw this on the blocks:1 stream: 99 blocks stuck across two dead indexer pods, had to be [reclaimed]." When pods die mid-block, their PEL entries are orphaned until XAUTOCLAIM picks them up. This is exactly the failure mode the reclaim path exists for.

③ Backpressure: the producer slows when the consumer falls behind

If block-ingest raced ahead unbounded, it would pile up unread blocks in Redis (memory!) or trigger MAXLEN trimming that silently drops unread blocks. So Publish samples the consumer backlog and blocks before XADD when the consumer is too far behind:

// pkg/queue/queue.go — backpressure
// backlog = max over consumer groups of (lag + pending)
if backlog >= threshold { awaitBackpressure(ctx) }   // pause producer until it drains
A subtle correctness detail worth savouring
Backpressure samples lag + pending, not XLEN (the raw stream length). Why? XACK doesn't shrink XLEN; entries only leave the stream when they are trimmed (here, by XADD's MAXLEN) or deleted. If you backpressured on XLEN, then once the producer paused, XADD would stop, so MAXLEN would never fire, XLEN would stay high forever, and the producer would be permanently deadlocked. Sampling lag + pending means every XACK from a consumer immediately lowers the number the producer is watching, so it can resume. This is the kind of bug that tends to surface only under real production load, and the code was written to avoid it.
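The deadlock argument can be replayed with plain counters. This is a toy accounting model, not Redis: `toyStream` and `groupInfo` are hypothetical, the stream is never trimmed, and in real Redis the two per-group numbers correspond to the `lag` and `pending` fields of XINFO GROUPS (`lag` exists since Redis 7.0).

```go
// Hypothetical sketch: why backpressure watches lag + pending, not XLEN.
package main

import "fmt"

// groupInfo holds two per-group numbers: lag (entries not yet delivered to
// the group) and pending (delivered but not yet ACKed).
type groupInfo struct{ Lag, Pending int64 }

// toyStream tracks XLEN plus a single consumer group. Nothing ever trims it.
type toyStream struct {
	xlen  int64
	group groupInfo
}

func (s *toyStream) xadd() {
	s.xlen++
	s.group.Lag++
}

// read delivers n entries: they move from lag to pending (n <= Lag assumed).
func (s *toyStream) read(n int64) {
	s.group.Lag -= n
	s.group.Pending += n
}

// ack models XACK: it shrinks pending but never touches XLEN.
func (s *toyStream) ack(n int64) { s.group.Pending -= n }

// backlog is the max over groups of lag + pending.
func backlog(groups []groupInfo) int64 {
	var worst int64
	for _, g := range groups {
		if b := g.Lag + g.Pending; b > worst {
			worst = b
		}
	}
	return worst
}

func main() {
	const threshold = 10
	s := &toyStream{}
	for s.xlen < threshold { // producer fills up to the threshold, then pauses
		s.xadd()
	}
	s.read(10) // the consumer catches up...
	s.ack(10)  // ...and ACKs everything

	fmt.Println("XLEN gate opens:", s.xlen < threshold)                               // false, forever
	fmt.Println("lag+pending gate opens:", backlog([]groupInfo{s.group}) < threshold) // true
}
```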

The flow-control chain, end to end

Now the buffered channel from §1 clicks into place. SubscribeToStream calls Publish synchronously, so a blocked Publish (waiting on backpressure) stops draining the prefetch channel → the channel buffer fills → the RPC fetcher's ch <- block blocks → fetching pauses. One stalled consumer applies smooth backpressure all the way back to the RPC calls. No extra signalling or coordination is needed: the blocking primitives compose on their own.
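A stdlib-only simulation of that chain, not the project's code: the fetcher, publisher and consumer are goroutines, atomic counters stand in for Redis's lag + pending, and `runChain`, the sleep-based waits, the block count and the threshold are all hypothetical. It reports the widest gap observed between "fetched" and "ACKed"; that gap is bounded by prefetch + threshold + 2 (the buffer, the backlog limit, one block held by the waiting publisher and one held by the blocked fetcher).

```go
// Hypothetical simulation of backpressure propagating back to the fetcher.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runChain wires fetcher → buffered channel → blocking publisher → slow
// consumer and returns the largest observed gap between blocks fetched and
// blocks ACKed. threshold stands in for the lag+pending limit (must be >= 1).
func runChain(total, prefetch int, threshold int64) int64 {
	if threshold < 1 {
		threshold = 1
	}
	var published, acked atomic.Int64
	var maxLead int64 // written only by the fetcher; read after ch is closed
	ch := make(chan int, prefetch)

	go func() { // RPC fetcher
		defer close(ch)
		for n := 1; n <= total; n++ {
			if lead := int64(n) - acked.Load(); lead > maxLead {
				maxLead = lead
			}
			ch <- n // parks once the buffer is full
		}
	}()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // slow consumer: processes and ACKs one entry at a time
		defer wg.Done()
		for acked.Load() < int64(total) {
			if acked.Load() < published.Load() {
				time.Sleep(time.Millisecond) // "processing"
				acked.Add(1)
			} else {
				time.Sleep(100 * time.Microsecond) // nothing to read yet
			}
		}
	}()

	for range ch { // publisher: synchronous, so while it waits nobody drains ch
		for published.Load()-acked.Load() >= threshold {
			time.Sleep(100 * time.Microsecond) // backpressure wait
		}
		published.Add(1) // stands in for XADD
	}
	wg.Wait()
	return maxLead
}

func main() {
	prefetch, threshold := 4, int64(3)
	lead := runChain(50, prefetch, threshold)
	// bound: buffer + threshold + one block held by the publisher + one by the fetcher
	fmt.Printf("max fetched-but-unACKed: %d (bound %d)\n", lead, int64(prefetch)+threshold+2)
}
```

However slow the consumer gets, the fetcher can never run more than a fixed number of blocks ahead, and nothing in the fetcher knows about Redis at all.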

4 · The other Redis (don't confuse them)

Recall from Lesson 1 there are two Redis instances. Keep them straight:

Redis | Holds | Persistence
redis-cache (:6380) | The block stream (blocks:{chainID}) + the RPC cache | Persistent (you don't want to refetch blocks)
redis (:6379) | Live state: cursors, balances, the monitored set | Self-healable from Memgraph if lost

Check yourself

1. How do the three binaries communicate?
2. The indexer crashes after reading a block but before XACK. What happens to that block?
3. Because delivery is at-least-once, a block may be processed twice. Why is that safe here?
4. Why does producer backpressure sample lag + pending instead of XLEN?
5. A Redis Stream differs from pub/sub because…
6. A blocked Publish (waiting on backpressure) eventually pauses the RPC fetcher. By what mechanism?

What you can now do

The deep thread you're pulling
Cursor (Lesson 4) + at-least-once delivery (here) + idempotent writes (Lesson 4) = a system that survives crashes, restarts, and redelivery without corrupting the graph. That's not three features — it's one coherent correctness argument. The next lesson follows this thread into the messy cases head-on.

Grounded in: pkg/queue/queue.go (XAdd/XReadGroup/XAck/XAutoClaim, backpressure, the blocks:1 war story), pkg/feed/rpc.go (RPCFeed, Subscribe, prefetch channel, cache, round-robin, finality lag), cmd/block-ingest/main.go (producer wiring). Verify against source — the code is the truth.