Closing the loop: how decoded events become a batched, atomic write to Memgraph. ~11 min.
Lesson 3 ended with a DecodedEvent in hand. Now we complete the journey
you started in Lesson 1: decoded event → graph edge. The surprising part is what the
indexer doesn't do — it does not write one edge per event. It collects a whole batch of
blocks, nets the events down, and commits everything (plus the block cursor) in a
single atomic transaction.
Pipeline: DecodedEvents (Lesson 3) → addTransfer / addApproval into a batch → UNWIND+MERGE per edge type

The batchAccumulator (pkg/indexer/batch_writer.go) gathers a whole block-batch's worth of mutations before touching the database. Each surviving event is added to an in-memory pile:
```go
// pkg/indexer/batch_writer.go: adding events to the batch
ba.addTransfer(monSet, blockNum, txHash, transferEvent) // → a HOLDS delta
ba.addApproval(monSet, blockNum, approvalEvent)         // → an APPROVES row
ba.addLendingDelta(key, supplied, borrowed, blockNum)   // → lending stats
```
Note monSet — this is the monitored-set filter
from Lesson 1, applied right here as events are accumulated.
Here's the key insight. If a monitored wallet receives USDC five times and sends it twice in the
same batch, the indexer does not issue seven balance writes. It sums the deltas into a
single net change per (holder, token) pair:
```go
// pkg/indexer/batch_writer.go
func (ba *batchAccumulator) accumulateDelta(key holdsKey, delta *big.Int) {
	// add this transfer's signed delta to the running total for holder→token
}

func (ba *batchAccumulator) coalesceHolds() { /* collapse to one row per edge */ }
```
On chain, that is seven Transfer logs; in the graph it's one final balance.
Coalescing computes that final balance once. The payoff is fewer writes and, crucially, a result that is a
pure function of the blocks, which is what makes replay safe (next section).
UNWIND + MERGE per edge type

Now the two new Cypher verbs. Instead of one query per edge, the flush ships a list of rows and lets Cypher loop over them: hundreds of edges in 2–3 queries (one per edge type).
| Verb | What it does | SQL-ish analogy |
|---|---|---|
| UNWIND $rows AS row | Expands a list parameter into one iteration per element: the batching primitive. | "FOR EACH row IN rows" |
| MERGE (n {...}) | Upsert: matches the whole pattern (node or edge) if it exists, else creates it, so re-running it does not create a duplicate. | INSERT … ON CONFLICT DO NOTHING, then SELECT the row |
| ON CREATE SET … | Runs only when MERGE actually created the node or edge, e.g. to stamp pending_enrichment=true. | the WHEN NOT MATCHED THEN INSERT branch of SQL MERGE |
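The MERGE and ON CREATE SET semantics in the table can be modeled in plain Go. This is a toy in-memory graph, not Memgraph or its driver; nodeKey, graph, mergeNode and unwindMerge are hypothetical names. It shows the two properties that matter here: running the same rows twice creates no duplicates, and ON CREATE SET fires only on creation, so a re-run does not reset a node the enrichment-worker has already processed.

```go
package main

import "fmt"

// nodeKey mimics the MERGE key {id: ..., graph_id: ...} on :Entity nodes.
type nodeKey struct{ ID, GraphID string }

// graph is a toy in-memory store, not Memgraph's API.
type graph struct {
	nodes map[nodeKey]map[string]any // node → properties
	holds map[[2]nodeKey]bool        // (token, holder) → HOLDS edge exists
}

func newGraph() *graph {
	return &graph{nodes: map[nodeKey]map[string]any{}, holds: map[[2]nodeKey]bool{}}
}

// mergeNode matches the node if it exists; otherwise it creates it and
// applies the ON CREATE SET properties. It reports whether it created one.
func (g *graph) mergeNode(k nodeKey, onCreate map[string]any) bool {
	if _, ok := g.nodes[k]; ok {
		return false // matched: ON CREATE SET is skipped
	}
	props := map[string]any{}
	for name, v := range onCreate {
		props[name] = v
	}
	g.nodes[k] = props
	return true
}

// unwindMerge plays the role of UNWIND $rows AS row: one iteration per row.
func (g *graph) unwindMerge(graphID string, rows []map[string]string) {
	for _, row := range rows {
		h := nodeKey{row["holder"], graphID}
		t := nodeKey{row["token"], graphID}
		g.mergeNode(h, map[string]any{"pending_enrichment": true, "category": "unknown"})
		g.mergeNode(t, map[string]any{"pending_enrichment": true, "category": "smart_contract"})
		g.holds[[2]nodeKey{t, h}] = true // MERGE (token)-[:HOLDS]->(holder)
	}
}

func main() {
	g := newGraph()
	rows := []map[string]string{{"holder": "0xalice", "token": "0xusdc"}}
	g.unwindMerge("g1", rows)

	// Pretend the enrichment-worker has processed the holder.
	g.nodes[nodeKey{"0xalice", "g1"}]["pending_enrichment"] = false

	g.unwindMerge("g1", rows) // matches everything; ON CREATE SET does not fire
	fmt.Println(len(g.nodes), len(g.holds), g.nodes[nodeKey{"0xalice", "g1"}]["pending_enrichment"]) // 2 1 false
}
```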
Here's the real HOLDS write, lightly trimmed (pkg/indexer/batch_writer.go):
```cypher
UNWIND $rows AS row
MERGE (holder:Entity {id: row.holder, graph_id: $graph})
  ON CREATE SET holder.pending_enrichment = true,
                holder.category = 'unknown'
MERGE (token:Entity {id: row.token, graph_id: $graph})
  ON CREATE SET token.pending_enrichment = true,
                token.category = 'smart_contract'
MERGE (token)-[r:HOLDS]->(holder)
WITH row, token, r
WHERE coalesce(r.updated_block, 0) <= row.block   // ← monotonic guard
SET r.quantity_raw  = row.balance,
    r.updated_block = row.block,
    r.category      = 'CONTAINS',
    r.subcategory   = 'HOLDS'
```
Read it against everything you've learned:
- Both endpoints are MERGE'd as :Entity with graph_id: Lesson 2's rules, enforced in the write.
- ON CREATE SET … pending_enrichment = true is exactly how the "bare node" from Lesson 1 is born. If the token or holder is new, it is created bare for the enrichment-worker to pick up.
- (token)-[:HOLDS]->(holder): note the direction, from token to holder. This is why Lesson 2 tells you to query both directions.
- The monotonic guard coalesce(r.updated_block, 0) <= row.block drops any row whose block is older than the one already recorded on the edge, so a stale or replayed block can never overwrite a newer balance. On a brand-new edge, coalesce supplies 0, so the first write always passes.

Source: risk-graph-indexer/pkg/indexer/batch_writer.go, flush(), the HOLDS UNWIND block. The APPROVES write right below it uses the same shape: MERGE (owner)-[r:APPROVES {token_contract: row.token}]->(spender).
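The guard's effect on a single edge can be sketched in Go, assuming a freshly created edge starts with UpdatedBlock 0 (the coalesce(..., 0) default). holdsEdge, holdsRow and applyGuarded are hypothetical names. Note that <= rather than < lets a same-block replay through, which is harmless because it writes the same value again.

```go
package main

import "fmt"

// holdsEdge holds the two edge properties the guard reads and writes.
type holdsEdge struct {
	QuantityRaw  string
	UpdatedBlock uint64 // 0 on a fresh edge, like coalesce(r.updated_block, 0)
}

// holdsRow is one coalesced row from $rows.
type holdsRow struct {
	Balance string
	Block   uint64
}

// applyGuarded mirrors: WHERE coalesce(r.updated_block, 0) <= row.block SET ...
// It reports whether the row was applied.
func applyGuarded(e *holdsEdge, row holdsRow) bool {
	if e.UpdatedBlock > row.Block {
		return false // stale row: dropped, the newer balance is kept
	}
	e.QuantityRaw = row.Balance
	e.UpdatedBlock = row.Block
	return true
}

func main() {
	var e holdsEdge // a freshly MERGE'd edge
	fmt.Println(applyGuarded(&e, holdsRow{"500", 120})) // true
	fmt.Println(applyGuarded(&e, holdsRow{"300", 110})) // false: older block
	fmt.Println(applyGuarded(&e, holdsRow{"500", 120})) // true: same-block replay
	fmt.Println(e.QuantityRaw, e.UpdatedBlock)          // 500 120
}
```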
Everything above runs inside a single transaction, and the very last thing written is the block cursor — the indexer's "I've processed up to here" bookmark:
```go
// pkg/indexer/indexer.go: processBatchOpts (conceptual shape). nil → COMMIT; any error → ROLLBACK everything.
err := graphstore.ExecuteWrite(ctx, store, func(tx Tx) error {
	if err := acc.flush(ctx, tx); err != nil { // HOLDS, APPROVES, … edges
		return err
	}
	if err := acc.flushLendingStats(ctx, tx); err != nil { // lending market state
		return err
	}
	return tx.SetCursor(ctx, graphID, lastBlock.Number) // ← the bookmark, same tx
})
if err != nil {
	return err // nothing from this batch, cursor included, was persisted
}
```
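Why replay is safe: because every write is a MERGE behind the monotonic guard, re-processing the same block produces the identical graph, so re-runs are harmless. And because the cursor is written in the same transaction, a batch that fails midway also leaves the cursor where it was, so the indexer simply retries from the last committed block.

The real code has more machinery around ExecuteWrite (the "single-writer" model; see
docs/single-writer.md and pkg/graphwrite/), but the
contract is identical to what you learned: batched, ordered, atomic-with-cursor. Learn it as one
transaction; reach for the single-writer doc when a ticket sends you into pkg/graphwrite.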
Check yourself:
- What does MERGE (token)-[r:HOLDS]->(holder) do if the edge already exists?
- What is UNWIND $rows AS row doing in the flush query?
- ON CREATE SET holder.pending_enrichment = true connects to which earlier concept?

Recap:
- The pipeline: DecodedEvents → batch accumulation → coalescing → UNWIND+MERGE flush → atomic commit with cursor.
- The Cypher: MERGE, ON CREATE SET, and the monotonic guard.
- You have traced a Transfer from RPC all the way to a committed HOLDS edge: the whole loop, end to end. 🎉

The takeaway: accumulate, coalesce, then commit the MERGEs together with the
cursor, atomically. That's the spine of the entire system. Everything else (enrichment, the risk engine)
hangs off this.
Grounded in: pkg/indexer/batch_writer.go (batchAccumulator, addTransfer, accumulateDelta/coalesceHolds, flush + the HOLDS UNWIND/MERGE),
pkg/indexer/indexer.go (processBatchOpts: flush + flushLendingStats + SetCursor in one ExecuteWrite),
pkg/graphstore/helpers.go (ExecuteWrite), docs/single-writer.md. Verify against source — the code is the truth.