-
Notifications
You must be signed in to change notification settings - Fork 2
/
deletetracker.go
87 lines (72 loc) · 2.41 KB
/
deletetracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package statedb
import (
"sync/atomic"
"github.com/cilium/statedb/index"
)
type deleteTracker[Obj any] struct {
db *DB
trackerName string
table Table[Obj]
// revision is the last observed revision. Starts out at zero
// in which case the garbage collector will not care about this
// tracker when considering which objects to delete.
revision atomic.Uint64
}
// setRevision is called to set the starting low watermark when
// this deletion tracker is inserted into the table.
func (dt *deleteTracker[Obj]) setRevision(rev uint64) {
dt.revision.Store(rev)
}
// getRevision is called by the graveyard garbage collector to
// compute the global low watermark.
func (dt *deleteTracker[Obj]) getRevision() uint64 {
return dt.revision.Load()
}
// Deleted returns an iterator for deleted objects in this table starting from
// 'minRevision'. The deleted objects are not garbage-collected unless 'Mark' is
// called!
func (dt *deleteTracker[Obj]) deleted(txn *txn, minRevision Revision) Iterator[Obj] {
indexEntry := txn.root[dt.table.tablePos()].indexes[GraveyardRevisionIndexPos]
indexTxn := indexReadTxn{indexEntry.tree, indexEntry.unique}
iter := indexTxn.LowerBound(index.Uint64(minRevision))
return &iterator[Obj]{iter}
}
// Mark the revision up to which deleted objects have been processed. This sets
// the low watermark for deleted object garbage collection.
func (dt *deleteTracker[Obj]) mark(upTo Revision) {
// Store the new low watermark and trigger a round of garbage collection.
dt.revision.Store(upTo)
select {
case dt.db.gcTrigger <- struct{}{}:
default:
}
}
func (dt *deleteTracker[Obj]) close() {
if dt.db == nil {
return
}
// Remove the delete tracker from the table.
txn := dt.db.WriteTxn(dt.table).getTxn()
dt.db = nil
db := txn.db
table := txn.modifiedTables[dt.table.tablePos()]
if table == nil {
panic("BUG: Table missing from write transaction")
}
_, _, table.deleteTrackers = table.deleteTrackers.Delete([]byte(dt.trackerName))
txn.Commit()
db.metrics.DeleteTrackerCount(dt.table.Name(), table.deleteTrackers.Len())
// Trigger garbage collection without this delete tracker to garbage
// collect any deleted objects that may not have been consumed.
select {
case db.gcTrigger <- struct{}{}:
default:
}
}
var closedWatchChannel = func() <-chan struct{} {
ch := make(chan struct{})
close(ch)
return ch
}()