Four idioms — and context underneath all of them. ~14 min.
Your last stated gap. Go's concurrency reputation is intimidating, but this codebase uses a small, repeated
vocabulary — four patterns cover almost everything, and one primitive, context, ties them together. You've
already read every one of these goroutines in earlier lessons; now we name the patterns so you can read any of them.
goroutine (go f()) = a cheap concurrent task. channel = a typed pipe between goroutines.
select = "wait on whichever channel is ready first." context.Context = a cancellation /
deadline signal that flows down through every call. Plus two sync types — WaitGroup (wait for N tasks) and
Mutex (one-at-a-time access). That's the whole toolkit you'll see.
Every periodic component — the at_risk scheduler (L23), the oracle bridger (L27), the validator tiers (L34), the lease
keepalive (L37) — is the same shape: do work, then select between "time to go again" and "we've been cancelled."
for {
    doWork(ctx)
    select {
    case <-ctx.Done(): // cancelled (shutdown) → return cleanly
        return nil
    case <-time.After(interval): // the tick → loop again
    }
}
ctx.Done() is a channel that closes when the context is cancelled; a select case on it fires immediately
when that happens. So a single cancel() at the top (on SIGTERM) propagates down every context and every loop notices
on its next select and returns — graceful shutdown, no shared flag, no kill. The lease keepalive (L37) is this exact
loop with <-ticker.C instead of time.After; on a lost lease it just returns and stops renewing.
Parallel I/O — N independent RPC reads at once — is readAllFeeds (L26). Pre-allocate a result slice, launch one
goroutine per item, each writing its own slot, then wait for all:
out := make([]navlinkReadResult, len(bindings)) // one slot per task, no shared write
var wg sync.WaitGroup
for i, b := range bindings {
    wg.Add(1)
    go func(i int, b candidateBinding) { // i, b passed as ARGS for capture safety
        defer wg.Done()
        out[i] = readOneFeed(ctx, b) // each goroutine owns out[i] exclusively
    }(i, b)
}
wg.Wait() // block until all N finish
Two things to notice. (a) Own-slot writes. Each goroutine writes out[i] and nothing else, so there's no shared-memory race and
no mutex is needed, because no two goroutines touch the same memory. (b) The loop-variable capture. i and
b are passed as arguments to the goroutine func, not closed over. Before Go 1.22 the loop variable was shared across iterations, so a closure launched in the loop
could see a later iteration's value (often the last one, in every goroutine), a classic bug. Passing them as args snapshots the right value.
(The round-robin rpcPool.Next() inside spreads the load across RPC endpoints.)
When one unit of work might hang or panic and must not take down the loop, run it in a goroutine under a deadline and turn its failure into data. The validator's per-check runner (L34) is the template:
checkCtx, cancel := context.WithTimeout(ctx, timeout) // a child ctx that auto-cancels after `timeout`
defer cancel()
done := make(chan result, 1) // buffered: the goroutine never blocks sending
go func() {
    var out result
    defer func() {
        if r := recover(); r != nil {
            out.panicVal = r // catch a panic → value
        }
        done <- out // sent from the deferred func, so a panicking check still reports
    }()
    out.findings = check(checkCtx, deps)
}()
select {
case out := <-done:
    // finished (maybe with a recovered panic) → emit CHECK_PANIC finding
case <-checkCtx.Done():
    // timed out → emit CHECK_TIMEOUT finding, move on
}
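context.WithTimeout derives a child context that cancels itself once the timeout elapses; the runner selects on its Done(). recover() inside a deferred func catches a panic so a goroutine crash becomes a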
return value, not a process death (L34's "the monitor monitors itself"). The buffered channel (make(chan …, 1))
means even if the runner already moved on after a timeout, the late goroutine's send doesn't block forever — it lands in
the buffer and is GC'd. This is "let one task fail without failing the loop," in Go.
Not everything needs channels. When the requirement is simply "only one of these at a time," a plain sync.Mutex is
right — HeavyMu (L31) serializes the heavy risk computers so 7+ don't thrash memory:
HeavyMu.Lock()
defer HeavyMu.Unlock()
runHeavyCompute() // only one heavy job holds the gate at a time
HeavyMu is an in-process mutex: it coordinates goroutines inside one binary. The graph-writer lease
(L37) is a distributed lock: Redis SET NX, coordinating across pods. Same idea — "one at a time" — at two
scales. Reach for sync.Mutex within a process; you need Redis (or similar) the moment "one at a time" must hold
across machines. Picking the wrong scale is a real bug: an in-process mutex does nothing across pods.
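To see the in-process gate at work, the sketch below launches several goroutines through one mutex and records the peak number inside the critical section. The names heavyMu, withGate, and peakConcurrency are hypothetical; the real helper's signature may differ. The distributed lease is not sketched here because it would need a Redis client.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// heavyMu is a hypothetical stand-in for the codebase's HeavyMu.
var heavyMu sync.Mutex

// withGate runs fn while holding the gate, so only one caller at a time
// executes fn within this process.
func withGate(fn func()) {
	heavyMu.Lock()
	defer heavyMu.Unlock()
	fn()
}

// peakConcurrency launches n goroutines through the gate and reports the
// largest number that were ever inside it at once.
func peakConcurrency(n int) int {
	inside, peak := 0, 0 // only touched while holding the gate
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			withGate(func() {
				inside++
				if inside > peak {
					peak = inside
				}
				time.Sleep(time.Millisecond) // simulate heavy work
				inside--
			})
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("peak concurrency:", peakConcurrency(8)) // peak concurrency: 1
}
```

Running two copies of this program side by side would give each its own heavyMu, which is exactly the cross-pod gap the lease exists to close.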
context
Notice that ctx is the first argument to nearly every function in this codebase. That's the Go convention, and it's load-bearing:
a context carries cancellation (Done channel), deadlines (WithTimeout), and request-scoped values
(trace IDs, L11) down the call tree. The four patterns are really four ways of reacting to one context — loop until
it's done, fan out under it, time-bound a child of it, or guard a resource while honoring it. Learn to follow ctx and
you can follow the concurrency.
These four patterns, with context underneath, cover essentially all the concurrency in risk-graph-indexer. Every goroutine you met
in L26, L34, and L37 is one of these. You can now read a Run loop, a fan-out, or an isolated task and say exactly what
it does and how it shuts down.
What does the case <-ctx.Done(): arm of the select do?
A single cancel() on SIGTERM cleanly stops every loop in the process. How?
In readAllFeeds, each goroutine writes out[i] and the results slice has one slot per task. Why no mutex?
readAllFeeds passes i and b as arguments to the goroutine func rather than closing over them. What does that prevent?
The per-check runner calls recover() in a deferred func. What does that achieve?
HeavyMu (a sync.Mutex) and the graph-writer lease (Redis SET NX) both enforce "one at a time." When do you need the lease, not the mutex?
Why is ctx the first argument to nearly every function in the codebase?

Recognize the for { work; select { ctx.Done / tick } } loop as the shape of every long-runner.
Choose sync.Mutex for in-process "one at a time" and a distributed lease across pods, and say why.
Identify context as the cancellation/deadline thread under all four patterns.

Grounded in real goroutines: pkg/statetracker/lease.go (startKeepAlive for/select/ticker loop), pkg/risk/at_risk_scheduler.go + pkg/enrichment/oracle_bridger.go (Run for/select/time.After loops), pkg/enrichment/navlink_refresher.go (readAllFeeds WaitGroup fan-out, own-slot, args-not-closure, rpcPool.Next()), pkg/validation/validator.go (runCheck WithTimeout child ctx + recover + buffered done chan + select), pkg/risk/heavy_gate.go (HeavyMu sync.Mutex / WithHeavy); context.Context threaded throughout. Verify against source: the code is the truth.