Lesson 43 · Closing the Go Gap · Skills

How this codebase does concurrency

Four idioms — and context underneath all of them. ~14 min.

Builds on: L37 · L34 · L26
Anchor: goroutines you've already seen
New: the ctx-cancellable loop · fan-out + WaitGroup · isolate + recover

Your last stated gap. Go's concurrency reputation is intimidating, but this codebase uses a small, repeated vocabulary — four patterns cover almost everything, and one primitive, context, ties them together. You've already read every one of these goroutines in earlier lessons; now we name the patterns so you can read any of them.

Your anchor: four words
goroutine (go f()) = a cheap concurrent task. channel = a typed pipe between goroutines. select = "wait on whichever channel is ready first." context.Context = a cancellation / deadline signal that flows down through every call. Plus two sync types — WaitGroup (wait for N tasks) and Mutex (one-at-a-time access). That's the whole toolkit you'll see.
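To see those six names side by side, here is a toy program. It is not taken from risk-graph-indexer, and toolkitDemo is a hypothetical name. It uses each primitive once: three goroutines send squares down a channel, a WaitGroup closes the channel when they finish, a Mutex guards a shared counter, and a select notices a cancelled context.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// toolkitDemo (hypothetical, not from the codebase) uses each primitive once.
func toolkitDemo() (sum, finished int, stopped bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	results := make(chan int) // channel: a typed pipe between goroutines
	var wg sync.WaitGroup     // WaitGroup: wait for N tasks
	var mu sync.Mutex         // Mutex: one-at-a-time access to `finished`

	for n := 1; n <= 3; n++ {
		wg.Add(1)
		go func(n int) { // goroutine: a cheap concurrent task
			defer wg.Done()
			mu.Lock()
			finished++ // shared counter, safe only because mu serializes it
			mu.Unlock()
			results <- n * n
		}(n)
	}
	go func() { wg.Wait(); close(results) }() // close once every sender is done

	for sq := range results { // receive until the channel is closed
		sum += sq
	}

	cancel() // context: one call closes ctx.Done() for every listener
	select { // select: take whichever case is ready first
	case <-ctx.Done():
		stopped = true
	case <-time.After(time.Second):
	}
	// All writers are done: range only ends after close, which runs after wg.Wait.
	return sum, finished, stopped
}

func main() {
	sum, finished, stopped := toolkitDemo()
	fmt.Println(sum, finished, stopped) // 14 3 true
}
```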

Pattern 1 · The ctx-cancellable loop (every long-runner)

Every periodic component — the at_risk scheduler (L23), the oracle bridger (L27), the validator tiers (L34), the lease keepalive (L37) — is the same shape: do work, then select between "time to go again" and "we've been cancelled."

for {
    doWork(ctx)
    select {
    case <-ctx.Done():            // cancelled (shutdown) → return cleanly
        return nil
    case <-time.After(interval):  // the tick → loop again
    }
}
ctx.Done() is the universal off-switch
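ctx.Done() is a channel that is closed when the context is cancelled, and a receive from a closed channel succeeds immediately, so the select case on it fires at once. Cancellation flows downward: a single cancel() on the root context (on SIGTERM) cancels every context derived from it, and each loop notices at its next select and returns. That gives graceful shutdown with no shared flag and no kill. One caveat: a loop that is busy inside doWork only notices once doWork returns, unless doWork itself checks ctx. The lease keepalive (L37) is this exact loop with <-ticker.C instead of time.After; on a lost lease it simply returns and stops renewing.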
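A minimal runnable sketch of Pattern 1 plus the one-cancel shutdown. It assumes the root context is built with signal.NotifyContext, which is a real standard-library function; whether the codebase's main actually uses it is an assumption. runLoop, startAll and shutdownDemo are hypothetical names, and runLoop uses a Ticker the way the lease keepalive does.

```go
package main

import (
	"context"
	"fmt"
	"os/signal"
	"sync"
	"syscall"
	"time"
)

// runLoop is the Pattern 1 shape with a Ticker: do work, then wait for
// either the next tick or cancellation.
func runLoop(ctx context.Context, interval time.Duration, doWork func(context.Context)) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker when the loop exits
	for {
		doWork(ctx)
		select {
		case <-ctx.Done():
			return nil // cancelled: return cleanly
		case <-ticker.C: // tick: loop again
		}
	}
}

// startAll runs n loops under one ctx, blocks until every one has
// returned, and reports how many did.
func startAll(ctx context.Context, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	returned := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = runLoop(ctx, 10*time.Millisecond, func(context.Context) {})
			mu.Lock()
			returned++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return returned
}

// shutdownDemo cancels the root ctx after 30ms, standing in for SIGTERM.
func shutdownDemo(n int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	time.AfterFunc(30*time.Millisecond, cancel)
	return startAll(ctx, n)
}

func main() {
	// Root ctx that is cancelled when the process receives SIGTERM.
	root, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer stop()
	// Demo only: also stop after 50ms so the program ends on its own.
	ctx, cancel := context.WithTimeout(root, 50*time.Millisecond)
	defer cancel()
	fmt.Println("loops returned:", startAll(ctx, 3)) // loops returned: 3
}
```

No loop needs to know about SIGTERM. Each loop only watches its ctx, and the single cancel at the root reaches all of them.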

Pattern 2 · Bounded fan-out + WaitGroup join

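Parallel I/O (N independent RPC reads at once) lives in readAllFeeds (L26). The recipe is to pre-allocate a result slice, launch one goroutine per item with each goroutine writing only its own slot, and then wait for all of them. As shown, the fan-out is bounded by its input: one goroutine per binding, so len(bindings) in total.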

out := make([]navlinkReadResult, len(bindings))   // one slot per task — no shared write
var wg sync.WaitGroup
for i, b := range bindings {
    wg.Add(1)
    go func(i int, b candidateBinding) {   // i, b passed as ARGS — capture safety
        defer wg.Done()
        out[i] = readOneFeed(ctx, b)            // each goroutine owns out[i] exclusively
    }(i, b)
}
wg.Wait()                                       // block until all N finish
Two things that trip up Go newcomers — both handled here
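(a) The own-slot rule. Each goroutine writes out[i] and nothing else. Distinct slice elements are distinct memory locations, so there is no data race and no mutex is needed. wg.Wait() returns only after every wg.Done(), which also makes all of those writes visible to the caller. (b) The loop-variable capture. i and b are passed as arguments to the goroutine func, not closed over. Before Go 1.22, a for loop reused one variable across all iterations, so goroutines that closed over it would typically see a later value (often the last) instead of their own. That is a classic bug. Passing the values as arguments snapshots each iteration's value. Since Go 1.22 each iteration gets a fresh variable, so the arguments are no longer strictly required. They are harmless, though, and still necessary in modules whose go.mod declares an older go version. (Inside the per-feed read, the round-robin rpcPool.Next() spreads the load across RPC endpoints.)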

Pattern 3 · Isolate a risky task — goroutine + timeout + recover

When one unit of work might hang or panic and must not take down the loop, run it in a goroutine under a deadline and turn its failure into data. The validator's per-check runner (L34) is the template:

checkCtx, cancel := context.WithTimeout(ctx, timeout)   // a child ctx that auto-cancels after `timeout`
defer cancel()
done := make(chan result, 1)                       // buffered: the goroutine never blocks sending
go func() {
    var out result                                 // owned by this goroutine alone
    defer func() {
        if r := recover(); r != nil {
            out.panicVal = r                       // catch a panic → value
        }
        done <- out                                // send from the defer, so a panic still reports
    }()
    out.findings = check(checkCtx, deps)
}()
select {
case out := <-done:          // finished: emit its findings, or a CHECK_PANIC finding if out.panicVal != nil
case <-checkCtx.Done():      // timed out → emit CHECK_TIMEOUT finding, move on
}
Three ideas in one block
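context.WithTimeout makes a child ctx that cancels itself after the deadline, so the timeout case is just a select on its Done(). That Done() also fires if the parent ctx is cancelled (for example on shutdown), and checkCtx.Err() tells context.DeadlineExceeded apart from context.Canceled. recover() inside a deferred func catches a panic, but only one raised in the same goroutine. That is why the deferred func lives inside the spawned goroutine: a goroutine crash becomes a return value, not a process death (L34's "the monitor monitors itself"). The buffered channel (make(chan …, 1)) means that even if the runner has already moved on after a timeout, the late goroutine's send doesn't block forever. The value lands in the buffer, the goroutine exits, and the channel is garbage-collected once nothing references it. A timeout abandons the check rather than stopping it, though. The goroutine keeps running until check returns, so check must honor checkCtx for the work itself to stop. This is "let one task fail without failing the loop," in Go.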

Pattern 4 · A Mutex for shared state (when you don't need channels)

Not everything needs channels. When the requirement is simply "only one of these at a time," a plain sync.Mutex is the right tool. HeavyMu (L31) serializes the heavy risk computers so that the 7+ of them never run at once and thrash memory:

HeavyMu.Lock()
defer HeavyMu.Unlock()
runHeavyCompute()    // only one heavy job holds the gate at a time
In-process vs distributed locks — know which you hold
HeavyMu is an in-process mutex: it coordinates goroutines inside one binary. The graph-writer lease (L37) is a distributed lock: Redis SET NX, coordinating across pods. Same idea — "one at a time" — at two scales. Reach for sync.Mutex within a process; you need Redis (or similar) the moment "one at a time" must hold across machines. Picking the wrong scale is a real bug: an in-process mutex does nothing across pods.
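A minimal sketch of the in-process gate follows. It assumes a helper shaped like the WithHeavy in heavy_gate.go, but the signature func(func() error) error is a guess, and peakConcurrency exists only to demonstrate the gate. Seven jobs start at once, yet the mutex lets only one inside at a time.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// HeavyMu gates heavy jobs: at most one runs at a time in this process.
var HeavyMu sync.Mutex

// WithHeavy (hypothetical signature) runs fn while holding HeavyMu. The
// deferred Unlock releases the gate even if fn returns early or panics.
func WithHeavy(fn func() error) error {
	HeavyMu.Lock()
	defer HeavyMu.Unlock()
	return fn()
}

// peakConcurrency starts n jobs at once through WithHeavy and reports the
// largest number that were ever inside the gate simultaneously.
func peakConcurrency(n int) int {
	var wg sync.WaitGroup
	var countMu sync.Mutex // protects inside/peak; separate from the gate itself
	inside, peak := 0, 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = WithHeavy(func() error {
				countMu.Lock()
				inside++
				if inside > peak {
					peak = inside
				}
				countMu.Unlock()
				time.Sleep(time.Millisecond) // pretend to compute
				countMu.Lock()
				inside--
				countMu.Unlock()
				return nil
			})
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak heavy jobs at once:", peakConcurrency(7)) // peak heavy jobs at once: 1
}
```

Run the same binary in two pods and each pod gets its own HeavyMu, so two heavy jobs can run at once cluster-wide. That is exactly the case where the lease is needed instead.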

The thread through all four: context

Notice ctx is the first argument to nearly every function in this codebase. That's the Go convention, and it's load-bearing: a context carries cancellation (the Done channel), deadlines (WithTimeout), and request-scoped values (trace IDs, L11) down the call tree. The four patterns are really four ways of reacting to one context: loop until it's done, fan out under it, time-bound a child of it, or guard a resource for work that runs under it. (sync.Mutex.Lock itself ignores ctx, so a goroutine waiting on HeavyMu keeps waiting even after cancellation.) Learn to follow ctx and you can follow the concurrency.

The Go gap, closed for reading
Four patterns — the ctx-cancellable loop, bounded fan-out + WaitGroup, isolate-with-timeout-and-recover, and the plain Mutex — plus context underneath, cover essentially all the concurrency in risk-graph-indexer. Every goroutine you met in L26, L34, and L37 is one of these. You can now read a Run loop, a fan-out, or an isolated task and say exactly what it does and how it shuts down.

Check yourself

1. In the long-runner loop, what does the case <-ctx.Done(): arm of the select do?
2. A single cancel() on SIGTERM cleanly stops every loop in the process. How?
3. In readAllFeeds, each goroutine writes out[i] and the results slice has one slot per task. Why no mutex?
4. The fan-out passes i and b as arguments to the goroutine func rather than closing over them. What does that prevent?
5. The validator runs each check in a goroutine with a recover() in a deferred func. What does that achieve?
6. How does the per-check timeout work in that same block?
7. HeavyMu (a sync.Mutex) and the graph-writer lease (Redis SET NX) both enforce "one at a time." When do you need the lease, not the mutex?
8. Why is ctx the first argument to nearly every function in the codebase?
↳ Ask your teacher
Try: "What's the difference between a buffered and unbuffered channel here?" · "Show me where the top-level context is created and cancelled on SIGTERM." · "Does the codebase use errgroup anywhere, and how does it differ from WaitGroup?" · "What's a goroutine leak and how would I spot one in a Run loop?" · "Why buffered channel size 1 in the validator, specifically?"

What you can now do

Both stated gaps, closed for reading
Cypher (L42) and Go concurrency (here) were your two biggest gaps; streaming was the third, covered by L7 + the statetracker primitives (L37). You can now read this codebase's hot paths — the Cypher it runs and the goroutines that run it — and reason about correctness. That was the mission's "solid-enough grip on the stack to read any hot path."

Grounded in real goroutines: pkg/statetracker/lease.go (startKeepAlive for/select/ticker loop), pkg/risk/at_risk_scheduler.go + pkg/enrichment/oracle_bridger.go (Run for/select/time.After loops), pkg/enrichment/navlink_refresher.go (readAllFeeds WaitGroup fan-out, own-slot, args-not-closure, rpcPool.Next()), pkg/validation/validator.go (runCheck WithTimeout child ctx + recover + buffered done chan + select), pkg/risk/heavy_gate.go (HeavyMu sync.Mutex / WithHeavy); context.Context threaded throughout. Verify against source — the code is the truth.