Lesson 04 · The Write Path

From a typed event to a graph edge

Closing the loop: how decoded events become a batched, atomic write to Memgraph. ~11 min.

Builds on: Lesson 3 · New: MERGE (upsert) · New: UNWIND (batch) · New: atomic commit

Lesson 3 ended with a DecodedEvent in hand. Now we complete the journey you started in Lesson 1: decoded event → graph edge. The surprising part is what the indexer doesn't do — it does not write one edge per event. It collects a whole batch of blocks, nets the events down, and commits everything (plus the block cursor) in a single atomic transaction.

DECODE → ACCUMULATE → COALESCE → FLUSH → COMMIT
DECODE: events → DecodedEvents (Lesson 3)
ACCUMULATE: addTransfer / addApproval into a batch
COALESCE: net many deltas → one per edge
FLUSH: one UNWIND+MERGE per edge type
COMMIT: mutations + cursor, atomically

1 · Accumulate, don't write-per-event

The batchAccumulator (pkg/indexer/batch_writer.go) gathers a whole block-batch's worth of mutations before touching the database. Each surviving event is added to an in-memory pile:

// pkg/indexer/batch_writer.go — adding events to the batch
ba.addTransfer(monSet, blockNum, txHash, transferEvent)   // → a HOLDS delta
ba.addApproval(monSet, blockNum, approvalEvent)          // → an APPROVES row
ba.addLendingDelta(key, supplied, borrowed, blockNum)     // → lending stats

Note monSet — this is the monitored-set filter from Lesson 1, applied right here as events are accumulated.

2 · Coalesce: net the deltas

Here's the key insight. If a monitored wallet receives USDC five times and sends it twice in the same batch, the indexer does not issue seven balance writes. It sums the deltas into a single net change per (holder, token) pair:

// pkg/indexer/batch_writer.go
func (ba *batchAccumulator) accumulateDelta(key holdsKey, delta *big.Int) {
    // add this transfer's signed delta to the running total for holder→token
}
func (ba *batchAccumulator) coalesceHolds() { /* collapse to one row per edge */ }
Your EVM anchor
You know a block can contain dozens of transfers of the same token to the same address. On-chain that's dozens of Transfer logs; in the graph it's one final balance. Coalescing computes that final balance once. Fewer writes — and, crucially, the result is a pure function of the blocks, which is what makes replay safe (next section).

3 · Flush: one UNWIND + MERGE per edge type

Now the two new Cypher verbs. Instead of one query per edge, the flush ships a list of rows and lets Cypher loop over them — hundreds of edges in 2–3 queries.
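To make the "2–3 queries" claim concrete, here is a sketch that groups pending mutations so each edge type becomes a single `$rows` list. The `row` type, the `queryFn` callback standing in for the database call, and `flushByType` are all hypothetical. A real flush would pass each group as the `$rows` parameter of the matching UNWIND query.

```go
package main

import (
	"fmt"
	"sort"
)

// row is one edge mutation destined for an UNWIND $rows parameter.
// EdgeType and Params are hypothetical names for this sketch.
type row struct {
	EdgeType string         // e.g. "HOLDS" or "APPROVES"
	Params   map[string]any // becomes one element of $rows
}

// queryFn stands in for sending one parameterised Cypher statement.
type queryFn func(edgeType string, rows []map[string]any)

// flushByType groups rows by edge type and issues exactly one call per type,
// so N edges cost (number of edge types) statements instead of N.
func flushByType(pending []row, exec queryFn) int {
	groups := make(map[string][]map[string]any)
	for _, r := range pending {
		groups[r.EdgeType] = append(groups[r.EdgeType], r.Params)
	}
	types := make([]string, 0, len(groups))
	for t := range groups {
		types = append(types, t)
	}
	sort.Strings(types) // deterministic statement order
	for _, t := range types {
		exec(t, groups[t])
	}
	return len(types)
}

func main() {
	var pending []row
	for i := 0; i < 250; i++ {
		pending = append(pending, row{"HOLDS", map[string]any{"holder": fmt.Sprintf("0x%d", i)}})
	}
	for i := 0; i < 40; i++ {
		pending = append(pending, row{"APPROVES", map[string]any{"owner": fmt.Sprintf("0x%d", i)}})
	}
	n := flushByType(pending, func(t string, rows []map[string]any) {
		fmt.Printf("%s: %d rows in one statement\n", t, len(rows))
	})
	fmt.Println("statements:", n)
}
```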

Verb | What it does | SQL-ish analogy
---|---|---
UNWIND $rows AS row | Expands a list parameter into one iteration per element. This is the batching primitive. | "FOR EACH row IN rows"
MERGE (n {...}) | Upsert: matches the node/edge pattern if it exists, otherwise creates it, so re-running the same MERGE does not create a duplicate. | INSERT … ON CONFLICT DO NOTHING, then read back the existing row
ON CREATE SET … | Runs only when MERGE actually created the node, e.g. to stamp pending_enrichment = true. | the column values set by the INSERT itself, skipped when the row already existed
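MERGE plus ON CREATE SET can be modeled with an in-memory map. This toy `graph`/`mergeEntity` is hypothetical and ignores `graph_id`, labels, and concurrency. It only demonstrates the two rules from the table: a match is reused rather than duplicated, and the ON CREATE assignments run only on creation.

```go
package main

import "fmt"

// entity is a toy node; the property names mirror the HOLDS query, but this
// in-memory graph is only a model of MERGE semantics, not Memgraph.
type entity struct {
	ID                string
	PendingEnrichment bool
	Category          string
}

type graph struct{ nodes map[string]*entity }

// mergeEntity models `MERGE (n:Entity {id: $id}) ON CREATE SET ...`:
// return the existing node if present; otherwise create it and run the
// ON CREATE assignments. The bool reports whether a node was created.
func (g *graph) mergeEntity(id, category string) (*entity, bool) {
	if n, ok := g.nodes[id]; ok {
		return n, false // matched: ON CREATE SET does not run
	}
	n := &entity{ID: id, PendingEnrichment: true, Category: category}
	g.nodes[id] = n
	return n, true
}

func main() {
	g := &graph{nodes: map[string]*entity{}}
	n, created := g.mergeEntity("0xwallet", "unknown")
	fmt.Println(created, n.PendingEnrichment) // true true
	n.PendingEnrichment = false // e.g. enrichment ran later
	n2, created2 := g.mergeEntity("0xwallet", "unknown")
	fmt.Println(created2, n2.PendingEnrichment, len(g.nodes)) // false false 1
}
```

The second MERGE leaves the already-enriched node alone. That is why a node is flagged pending_enrichment exactly once, when the indexer first sees it.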

Here's the real HOLDS write, lightly trimmed (pkg/indexer/batch_writer.go):

UNWIND $rows AS row
MERGE (holder:Entity {id: row.holder, graph_id: $graph})
  ON CREATE SET holder.pending_enrichment = true, holder.category = 'unknown'
MERGE (token:Entity {id: row.token, graph_id: $graph})
  ON CREATE SET token.pending_enrichment = true, token.category = 'smart_contract'
MERGE (token)-[r:HOLDS]->(holder)
WITH row, token, r
WHERE coalesce(r.updated_block, 0) <= row.block        // ← monotonic guard
SET r.quantity_raw = row.balance, r.updated_block = row.block,
    r.category = 'CONTAINS', r.subcategory = 'HOLDS'

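Read it against everything you've learned:

1. UNWIND $rows AS row: the whole coalesced batch arrives as one list parameter, and the rest of the query runs once per row.
2. The two node MERGEs: holder and token are upserted by (id, graph_id). Only a brand-new node gets its ON CREATE SET, which marks it pending_enrichment = true and gives it a default category.
3. MERGE (token)-[r:HOLDS]->(holder): reuses the existing HOLDS edge between the pair, or creates exactly one.
4. WHERE coalesce(r.updated_block, 0) <= row.block: the monotonic guard. A row from a block older than the edge's last update is dropped, so replaying an old batch cannot roll a balance backwards.
5. SET: writes the row's balance and block, plus the edge's category and subcategory.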

Source: risk-graph-indexer/pkg/indexer/batch_writer.go, flush(), the HOLDS UNWIND block. The APPROVES write right below it uses the same shape: MERGE (owner)-[r:APPROVES {token_contract: row.token}]->(spender).
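The monotonic guard's effect is easiest to see by replaying rows against a single edge. `holdsEdge` and `applyHoldsRow` are hypothetical. They model only the `WHERE coalesce(r.updated_block, 0) <= row.block` check and the SET that follows, using a zero `UpdatedBlock` to mean "never written".

```go
package main

import "fmt"

// holdsEdge models the two HOLDS properties the guard cares about.
type holdsEdge struct {
	QuantityRaw  string // stored balance (kept as a decimal string here)
	UpdatedBlock uint64 // 0 means "never written", like coalesce(r.updated_block, 0)
}

// applyHoldsRow models the WHERE + SET tail of the HOLDS query: the row
// is applied only if its block is not older than the edge's last update.
func applyHoldsRow(e *holdsEdge, balance string, block uint64) bool {
	if e.UpdatedBlock > block {
		return false // stale row: a newer block already wrote this edge
	}
	e.QuantityRaw, e.UpdatedBlock = balance, block
	return true
}

func main() {
	var e holdsEdge
	applyHoldsRow(&e, "44", 120)                    // first write
	ok := applyHoldsRow(&e, "10", 100)              // replaying an older batch
	fmt.Println(ok, e.QuantityRaw, e.UpdatedBlock) // false 44 120
	ok = applyHoldsRow(&e, "44", 120)               // re-applying the same block
	fmt.Println(ok, e.QuantityRaw)                 // true 44
}
```

Because the comparison is `<=` rather than `<`, re-applying the same block writes the same values again, which is harmless. Only strictly older rows are dropped.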

4 · Commit: mutations + cursor, all-or-nothing

Everything above runs inside a single transaction, and the very last thing written is the block cursor — the indexer's "I've processed up to here" bookmark:

// pkg/indexer/indexer.go — processBatchOpts (conceptual shape)
graphstore.ExecuteWrite(ctx, store, func(tx Tx) error {
    if err := acc.flush(ctx, tx); err != nil {             // HOLDS, APPROVES, … edges
        return err
    }
    if err := acc.flushLendingStats(ctx, tx); err != nil { // lending market state
        return err
    }
    // The bookmark, in the same tx. Returning nil → COMMIT;
    // any error returned from this func → ROLLBACK everything.
    return tx.SetCursor(ctx, graphID, lastBlock.Number)
})
Why this is the most important property in the system
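Because the mutations and the cursor commit together or not at all:
1. A crash or error mid-batch leaves the graph and the cursor exactly as they were. No half-applied batch is ever visible.
2. On restart the indexer resumes from the last committed cursor, so the failed blocks are processed again rather than skipped.
3. The cursor can never point past a block whose edges are missing from the graph.
This is the "atomic commit" promise from Lesson 1, now concrete.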
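A toy model of the all-or-nothing property, assuming a buffered transaction in place of a real database one. `store`, `tx`, `executeWrite`, and `processBatch` are hypothetical and are not the graphstore.ExecuteWrite API. In the real system, atomicity comes from the database transaction, not from a Go buffer.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a toy graph store: committed edges plus the cursor.
type store struct {
	edges  map[string]string
	cursor uint64
}

// tx buffers writes; nothing reaches the store until commit.
type tx struct {
	edges  map[string]string
	cursor *uint64
}

// executeWrite models an ExecuteWrite-style helper: run fn against a
// buffered tx, then apply everything (edges AND cursor) only if fn returns nil.
func executeWrite(s *store, fn func(*tx) error) error {
	t := &tx{edges: map[string]string{}}
	if err := fn(t); err != nil {
		return err // rollback: the buffer is simply discarded
	}
	for k, v := range t.edges {
		s.edges[k] = v
	}
	if t.cursor != nil {
		s.cursor = *t.cursor
	}
	return nil
}

// processBatch writes one edge, optionally fails, then sets the cursor.
func processBatch(s *store, lastBlock uint64, failFlush bool) error {
	return executeWrite(s, func(t *tx) error {
		t.edges["usdc->0xwallet"] = "44" // flush: HOLDS edge
		if failFlush {
			return errors.New("flush failed")
		}
		t.cursor = &lastBlock // the bookmark, same tx
		return nil
	})
}

func main() {
	s := &store{edges: map[string]string{}, cursor: 99}
	err := processBatch(s, 120, true)
	fmt.Println(err, len(s.edges), s.cursor) // flush failed 0 99
	err = processBatch(s, 120, false)
	fmt.Println(err, len(s.edges), s.cursor) // <nil> 1 120
}
```

The failed run leaves both the edge and the cursor untouched, so a retry starts again from the old cursor and redoes the work.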
Contributor note: the real architecture is a touch deeper
In the live system the indexer often records statements and a dedicated graph-writer applies them inside the ExecuteWrite (the "single-writer" model — see docs/single-writer.md and pkg/graphwrite/). The contract is identical to what you learned: batched, ordered, atomic-with-cursor. Learn it as one transaction; reach for the single-writer doc when a ticket sends you into pkg/graphwrite.

Check yourself

1. A wallet receives the same token 6 times in one block batch. How many HOLDS balance writes does the indexer make for that pair?
2. What does MERGE (token)-[r:HOLDS]->(holder) do if the edge already exists?
3. Why is the block cursor written in the same transaction as the edge mutations?
4. What is UNWIND $rows AS row doing in the flush query?
5. In the HOLDS write, ON CREATE SET holder.pending_enrichment = true connects to which earlier concept?
6. Re-processing an already-indexed block produces a byte-identical graph. Which two properties make that true?

What you can now do

🎉 You've closed the loop
Lessons 1–4 together: block-ingest fetches a block → the indexer decodes events (topic0 → decoder) → filters via the monitored set → accumulates & coalesces → flushes batched MERGEs with the cursor, atomically. That's the spine of the entire system. Everything else (enrichment, the risk engine) hangs off this.

Grounded in: pkg/indexer/batch_writer.go (batchAccumulator, addTransfer, accumulateDelta/coalesceHolds, flush + the HOLDS UNWIND/MERGE), pkg/indexer/indexer.go (processBatchOpts: flush + flushLendingStats + SetCursor in one ExecuteWrite), pkg/graphstore/helpers.go (ExecuteWrite), docs/single-writer.md. Verify against source — the code is the truth.