Lesson 05 · The Enrichment Worker

How a bare node learns what it is

The async classifier — and the discovery loop that makes the graph grow itself. ~11 min.

Builds on: Lesson 4 · Anchor: EIP-1967, getOwners, asset() · New: poll loop / goroutines · New: the discovery feedback loop

The indexer creates nodes bare: just an id, a graph_id, and pending_enrichment=true (you saw this born in Lesson 4's ON CREATE SET). The enrichment-worker is the third binary. It runs asynchronously, off the hot path, and its whole job is to answer two questions: what is this address, and who is it connected to? It then feeds its discoveries back into the monitored set, so the graph expands itself.

Why it's a separate binary (recall Lesson 1)
Classification means slow, rate-limited calls: eth_getCode, EIP-1967 storage reads, getOwners(), plus Etherscan/Blockscout HTTP requests. If the indexer made these calls inline, it would fall behind the chain. So they are pushed here, off the hot path, where they can take their time. Indexer = fast & ordered; enrichment = slow & thorough.

1 · The poll loop (your first goroutine)

The worker is a long-running loop. In Go, Run is launched as a goroutine (a lightweight, runtime-scheduled concurrent function, started with the go keyword) and loops until its context is cancelled, repeatedly asking Memgraph for the next batch of bare nodes:

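// pkg/enrichment/worker.go: the worker loop (shape, simplified)
func (w *Worker) Run(ctx context.Context) error {
    for {                                   // loop until ctx is cancelled
        n, err := w.processBatch(ctx)       // claim + enrich a batch of pending nodes
        if ctx.Err() != nil { return ctx.Err() }
        if err == nil && n > 0 { continue } // did work: poll again immediately
        select {                            // idle or error: back off, but stay cancellable
        case <-ctx.Done(): return ctx.Err()
        case <-time.After(w.pollInterval):  // pollInterval: illustrative name, not from the source
        }
    }
}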

It finds work with an index-gated query — anchored, never a full scan (Lesson 2's rule):

MATCH (n:Entity {graph_id: $g}) WHERE n.pending_enrichment = true ...

2 · Claim it (so two workers don't collide)

You can run several enrichment workers for throughput. That raises a classic concurrency problem: two workers must not enrich the same node at once. The fix is a claim — a worker stamps a node as "mine, until claimTTL expires" before working it. The node moves through a small lifecycle:

State     | Meaning
pending   | pending_enrichment=true and no claim marker: up for grabs.
claimed   | A worker holds it with a TTL lease (default ~120s). Others skip it.
completed | All stages done → pending_enrichment=false.

The TTL matters: if a worker crashes mid-node, the claim expires and another worker can re-claim it — no node gets stuck forever.

Source: pkg/enrichment/worker.go (Worker.Run, processBatch, claimTTL), pkg/enrichment/claim_lifecycle.go + claim_gate.go.

3 · Classify: the staged pipeline

For each claimed node the worker runs a 15-stage pipeline (documented in docs/enrichment-pipeline.md). You already know most of the on-chain probes — they're the exact calls you'd make by hand:

Stage (grouped)    | How it probes (your EVM knowledge)
RPC classification | eth_getCode → EOA vs contract; EIP-1967 slot read → proxy + impl; getOwners() → multisig; asset()/underlying() → vault/wrapper.
Token metadata     | symbol(), name(), decimals(), totalSupply() (fails silently for non-tokens).
External APIs      | Etherscan (contract name, ABI, verification) + Blockscout (nametags, labels, deployer, scam flags).
ABI parsing        | Function signatures → label tags (vault / oracle / pool / lending / admin).
Classification     | Combine all signals → a class_subtype.

The final classification is plain Go — a readable switch you could extend in a PR (pkg/enrichment/classify.go):

func ClassSubtype(res *Result) string {
    // Admin roles are checked first, so they win over the node's base type.
    if res.IsProxyAdmin      { return "proxy_admin" }
    if res.IsGovernanceAdmin { return "governance_admin" }
    switch res.NodeType {
    case types.NodeMultisig:
        return "safe_smart_account"
    // ... further cases map other node types to "debt_token", "receipt_token",
    //     "curator", "bridge", "oracle", ...
    }
    return "" // no subtype recognised
}

Source: pkg/enrichment/classify.go (ClassSubtype, LabelSource, …) + the 15 stages in docs/enrichment-pipeline.md.

4 · Write the structural edges

Classification produces edges the indexer couldn't derive from a single event. These are the CONTROLLED_BY / OPERATED_BY families from Lesson 2: ADMIN_CTRL, OWNS, CURATES, VAULT_ASSET and BELONGS_TO, plus the transitively derived OWNS_ADMIN / RESERVE_BACKING. They're written in one Memgraph transaction, using the same atomic, MERGE-based pattern you learned in Lesson 4.

Division of labour, crisply
Indexer writes the high-frequency, event-sourced edges (HOLDS, APPROVES). Enrichment writes the slow, classification-sourced structural edges (who controls / owns / curates / belongs to what). Same graph, two writers, different cadences.

5 · Discovery: the graph grows itself 🔁

This is the punchline of the whole system. While classifying, the worker discovers related addresses — a proxy's implementation, a multisig's owners, a vault's curator, a contract's deployer. Stage 15 takes all of those RelatedAddrs and adds them to the monitored set:

BARE NODE (pending_enrichment=true)
  → CLASSIFY: find proxy impl, owners, curator, deployer
  → DISCOVER: RelatedAddrs → monitored:{chain}
  → NOW INDEXED: their events are now kept → new bare nodes (back to the top)
The feedback loop that closes the system
Remember from Lesson 1: an event is only processed if one party is in the monitored set. By adding discovered addresses to that set, enrichment makes the indexer start keeping their events too — which creates new bare nodes — which get enriched — which discover more addresses… The graph expands outward from known risk along real relationships, on its own. Discovery is the engine of graph growth (the third entry path into the monitored set, alongside bootstrap and promotion).

6 · Resilience: rate limits & circuit breakers

Because enrichment leans on external APIs, it must survive them being slow or down. The package has rate-limited clients and circuit breakers (backoff.go, blockscout_breaker) so a flaky Etherscan can't stall the whole worker. As a contributor, expect any external-API stage to be wrapped in retry/backoff — don't add a raw HTTP call.

Also running alongside the worker as independent goroutines: the oracle-bridger, LP/receipt refreshers and a parity monitor (see docs/enrichment-pipeline.md § Periodic Maintenance). These are the start of the risk engine, covered in a future lesson.

Check yourself

1. What does the enrichment worker poll Memgraph for?
2. Why does a worker "claim" a node with a TTL lease before enriching it?
3. Adding a vault's discovered curator address to the monitored set causes what?
4. Which edges does enrichment write that the indexer generally can't from a single event?
5. To decide a contract is a multisig vs a vault, the RPC classification stage calls…
6. Why is classification done in a separate binary instead of inline in the indexer?
↳ Ask your teacher
Try: "Open the real claim Cypher in claim_lifecycle.go," · "How does EIP-1967 proxy resolution work in the code?" · "Show me project.go's protocol inference," · "What exactly are RelatedAddrs and where are they collected?"

What you can now do

The whole system, in one breath
block-ingest fetches blocks → indexer decodes (topic0→decoder), filters via the monitored set, coalesces & atomically writes HOLDS/etc. + cursor → enrichment-worker classifies the bare nodes, writes structural edges, and feeds discovered addresses back into the monitored set → the graph grows. The risk engine then runs analytics (DebtRank, exposure, AT_RISK) on top. You now understand all three binaries.

Grounded in: docs/enrichment-pipeline.md (the 15 stages + periodic tasks), pkg/enrichment/worker.go (Run, processBatch, claimTTL), classify.go (ClassSubtype), claim_lifecycle.go/claim_gate.go, backoff.go. Verify against source — the code is the truth.