Skip to content

Commit 4181bc4

Browse files
joamakidylandreimerink
authored andcommitted
part: Fix missing watch channel closes
Add validation to catch if a watch channel is removed from the tree and not closed. Fix two cases where the watch channel was not properly closed: 1) When an empty key was removed the watch channel of this root node was not closed. 2) The watch channel of the non-leaf node containing the target leaf was not closed. Signed-off-by: Jussi Maki <[email protected]>
1 parent ac88a9b commit 4181bc4

File tree

3 files changed

+60
-4
lines changed

3 files changed

+60
-4
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build:
66
go build ./...
77

88
test:
9-
go test ./... -cover -vet=all -test.count 1
9+
PART_VALIDATE=1 go test ./... -cover -vet=all -test.count 1
1010

1111
test-race:
1212
go test -race ./... -test.count 1

part/tree.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func (t *Tree[T]) Txn() *Txn[T] {
6969
txn = newTxn[T](t.opts)
7070
}
7171
txn.opts = t.opts
72+
txn.oldRoot = t.root
7273
txn.root = t.root
7374
txn.size = t.size
7475
return txn

part/txn.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@ package part
55

66
import (
77
"bytes"
8+
"fmt"
9+
"os"
810
)
911

1012
// Txn is a transaction against a tree. It allows doing efficient
1113
// modifications to a tree by caching and reusing cloned nodes.
1214
type Txn[T any] struct {
13-
root *header[T]
14-
opts options
15-
size int // the number of objects in the tree
15+
oldRoot, root *header[T]
16+
opts options
17+
size int // the number of objects in the tree
1618

1719
// mutated is the set of nodes mutated in this transaction
1820
// that we can keep mutating without cloning them again.
@@ -174,6 +176,10 @@ func (txn *Txn[T]) Notify() {
174176
close(ch)
175177
}
176178
clear(txn.watches)
179+
180+
if !txn.opts.rootOnlyWatch() {
181+
validateRemovedWatches(txn.oldRoot, txn.root)
182+
}
177183
}
178184

179185
// PrintTree to the standard output. For debugging.
@@ -393,6 +399,7 @@ func (txn *Txn[T]) delete(root *header[T], key []byte) (oldValue T, hadOld bool,
393399
// Target is the root, clear it.
394400
if root.isLeaf() || newRoot.size() == 0 {
395401
// Replace leaf or empty root with a node4
402+
txn.watches[root.watch] = struct{}{}
396403
newRoot = newNode4[T]()
397404
} else {
398405
newRoot = txn.cloneNode(root)
@@ -420,6 +427,9 @@ func (txn *Txn[T]) delete(root *header[T], key []byte) (oldValue T, hadOld bool,
420427
children[target.index] = target.node
421428
} else if target.node.size() == 0 && (target.node == this || target.node.getLeaf() == nil) {
422429
// The node is empty, remove it from the parent.
430+
if target.node.watch != nil {
431+
txn.watches[target.node.watch] = struct{}{}
432+
}
423433
parent.node.remove(target.index)
424434
} else {
425435
// Update the target (as it may have been cloned)
@@ -473,3 +483,48 @@ func (txn *Txn[T]) delete(root *header[T], key []byte) (oldValue T, hadOld bool,
473483
newRoot = parents[0].node
474484
return
475485
}
486+
487+
var runValidation = os.Getenv("PART_VALIDATE") != ""
488+
489+
func validateRemovedWatches[T any](oldRoot *header[T], newRoot *header[T]) {
490+
if !runValidation {
491+
return
492+
}
493+
var collectWatches func(depth int, watches map[<-chan struct{}]int, node *header[T])
494+
collectWatches = func(depth int, watches map[<-chan struct{}]int, node *header[T]) {
495+
if node == nil {
496+
return
497+
}
498+
if node.watch == nil {
499+
panic("nil watch channel")
500+
}
501+
watches[node.watch] = depth
502+
if leaf := node.getLeaf(); leaf != nil && !node.isLeaf() {
503+
watches[leaf.watch] = depth
504+
}
505+
for _, child := range node.children() {
506+
if child != nil {
507+
collectWatches(depth+1, watches, child)
508+
}
509+
}
510+
}
511+
oldWatches := map[<-chan struct{}]int{}
512+
collectWatches(0, oldWatches, oldRoot)
513+
newWatches := map[<-chan struct{}]int{}
514+
collectWatches(0, newWatches, newRoot)
515+
516+
// Any nodes that are not part of the new tree must have their watch channels closed.
517+
for watch := range newWatches {
518+
delete(oldWatches, watch)
519+
}
520+
for watch, depth := range oldWatches {
521+
select {
522+
case <-watch:
523+
default:
524+
oldRoot.printTree(0)
525+
fmt.Println("---")
526+
newRoot.printTree(0)
527+
panic(fmt.Sprintf("dropped watch channel %p at depth %d not closed", watch, depth))
528+
}
529+
}
530+
}

0 commit comments

Comments
 (0)