-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(badger): add support for Badger version 4. The default remains Badger version 2 to ensure backward compatibility. #12316
base: feat/faster-datastore-with-badger
Are you sure you want to change the base?
Changes from 41 commits
30fbab6
7a278cf
bee642a
b077093
0e14b09
a4b7538
6c4f012
1690d3d
4adc087
a3c1c2d
da5f823
e4ffc68
b3ecb25
6aa56cc
94f3ce4
5aefc8e
d680f6f
0daa826
dafc880
395be91
91d5680
fd626f9
900d31f
d0143fb
1b1bbd0
55918db
a1d39a7
dd95024
279f2ba
9f82d83
c01521f
92c3f2d
ab99ef0
feac779
3fd2707
48f50a6
2cac6a6
df59d5d
2a65044
50cabd0
cab56a8
6c92ba1
c461160
6346cc6
52604e9
4de5f71
d2de85e
ee737c9
6d5d679
26610e7
c973d48
0dc3a85
624779f
c7e3a84
7b22c1c
59fe7e3
d415d9f
517c7ae
0225c91
dbef5de
a5cb674
8518d23
475139f
4a4ddaa
8f4299e
5ccbb3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,20 +10,23 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"github.com/dgraph-io/badger/v2" | ||
"github.com/dgraph-io/badger/v2/options" | ||
badgerstruct "github.com/dgraph-io/badger/v2/pb" | ||
blocks "github.com/ipfs/go-block-format" | ||
"github.com/ipfs/go-cid" | ||
ipld "github.com/ipfs/go-ipld-format" | ||
logger "github.com/ipfs/go-log/v2" | ||
pool "github.com/libp2p/go-buffer-pool" | ||
"github.com/multiformats/go-base32" | ||
"go.uber.org/multierr" | ||
"go.uber.org/zap" | ||
"golang.org/x/xerrors" | ||
|
||
"github.com/filecoin-project/lotus/blockstore" | ||
"github.com/filecoin-project/lotus/blockstore/badger/versions" | ||
badger "github.com/filecoin-project/lotus/blockstore/badger/versions" | ||
) | ||
|
||
// aliases to mask badger dependencies. | ||
const ( | ||
defaultGCThreshold = 0.125 | ||
) | ||
|
||
var ( | ||
|
@@ -39,46 +42,6 @@ var ( | |
log = logger.Logger("badgerbs") | ||
) | ||
|
||
// aliases to mask badger dependencies. | ||
const ( | ||
// FileIO is equivalent to badger/options.FileIO. | ||
FileIO = options.FileIO | ||
// MemoryMap is equivalent to badger/options.MemoryMap. | ||
MemoryMap = options.MemoryMap | ||
// LoadToRAM is equivalent to badger/options.LoadToRAM. | ||
LoadToRAM = options.LoadToRAM | ||
defaultGCThreshold = 0.125 | ||
) | ||
|
||
// Options embeds the badger options themselves, and augments them with | ||
// blockstore-specific options. | ||
type Options struct { | ||
badger.Options | ||
|
||
// Prefix is an optional prefix to prepend to keys. Default: "". | ||
Prefix string | ||
} | ||
|
||
func DefaultOptions(path string) Options { | ||
return Options{ | ||
Options: badger.DefaultOptions(path), | ||
Prefix: "", | ||
} | ||
} | ||
|
||
// badgerLogger is a local wrapper for go-log to make the interface | ||
// compatible with badger.Logger (namely, aliasing Warnf to Warningf) | ||
type badgerLogger struct { | ||
*zap.SugaredLogger // skips 1 caller to get useful line info, skipping over badger.Options. | ||
|
||
skip2 *zap.SugaredLogger // skips 2 callers, just like above + this logger. | ||
} | ||
|
||
// Warningf is required by the badger logger APIs. | ||
func (b *badgerLogger) Warningf(format string, args ...interface{}) { | ||
b.skip2.Warnf(format, args...) | ||
} | ||
|
||
// bsState is the current blockstore state | ||
type bsState int | ||
|
||
|
@@ -115,9 +78,9 @@ type Blockstore struct { | |
moveState bsMoveState | ||
rlock int | ||
|
||
db *badger.DB | ||
dbNext *badger.DB // when moving | ||
opts Options | ||
db badger.BadgerDB | ||
dbNext badger.BadgerDB // when moving | ||
opts badger.Options | ||
|
||
prefixing bool | ||
prefix []byte | ||
|
@@ -132,13 +95,9 @@ var _ blockstore.BlockstoreSize = (*Blockstore)(nil) | |
var _ io.Closer = (*Blockstore)(nil) | ||
|
||
// Open creates a new badger-backed blockstore, with the supplied options. | ||
func Open(opts Options) (*Blockstore, error) { | ||
opts.Logger = &badgerLogger{ | ||
SugaredLogger: log.Desugar().WithOptions(zap.AddCallerSkip(1)).Sugar(), | ||
skip2: log.Desugar().WithOptions(zap.AddCallerSkip(2)).Sugar(), | ||
} | ||
func Open(opts badger.Options) (*Blockstore, error) { | ||
|
||
db, err := badger.Open(opts.Options) | ||
db, err := badger.OpenBadgerDB(opts) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to open badger blockstore: %w", err) | ||
} | ||
|
@@ -315,10 +274,10 @@ func (b *Blockstore) movingGC(ctx context.Context) error { | |
log.Infof("moving blockstore from %s to %s", b.opts.Dir, newPath) | ||
|
||
opts := b.opts | ||
opts.Dir = newPath | ||
opts.ValueDir = newPath | ||
opts.SetDir(newPath) | ||
opts.SetValueDir(newPath) | ||
|
||
dbNew, err := badger.Open(opts.Options) | ||
dbNew, err := badger.OpenBadgerDB(opts) | ||
if err != nil { | ||
return fmt.Errorf("failed to open badger blockstore in %s: %w", newPath, err) | ||
} | ||
|
@@ -391,65 +350,8 @@ func symlink(path, linkTo string) error { | |
} | ||
|
||
// doCopy copies a badger blockstore to another | ||
func (b *Blockstore) doCopy(ctx context.Context, from, to *badger.DB) (defErr error) { | ||
batch := to.NewWriteBatch() | ||
defer func() { | ||
if defErr == nil { | ||
defErr = batch.Flush() | ||
} | ||
if defErr != nil { | ||
batch.Cancel() | ||
} | ||
}() | ||
|
||
return iterateBadger(ctx, from, func(kvs []*badgerstruct.KV) error { | ||
// check whether context is closed on every kv group | ||
if err := ctx.Err(); err != nil { | ||
return err | ||
} | ||
for _, kv := range kvs { | ||
if err := batch.Set(kv.Key, kv.Value); err != nil { | ||
return err | ||
} | ||
} | ||
return nil | ||
}) | ||
} | ||
|
||
var IterateLSMWorkers int // defaults to between( 2, 8, runtime.NumCPU/2 ) | ||
|
||
func iterateBadger(ctx context.Context, db *badger.DB, iter func([]*badgerstruct.KV) error) error { | ||
workers := IterateLSMWorkers | ||
if workers == 0 { | ||
workers = between(2, 8, runtime.NumCPU()/2) | ||
} | ||
|
||
stream := db.NewStream() | ||
stream.NumGo = workers | ||
stream.LogPrefix = "iterateBadgerKVs" | ||
stream.Send = func(kvl *badgerstruct.KVList) error { | ||
kvs := make([]*badgerstruct.KV, 0, len(kvl.Kv)) | ||
for _, kv := range kvl.Kv { | ||
if kv.Key != nil && kv.Value != nil { | ||
kvs = append(kvs, kv) | ||
} | ||
} | ||
if len(kvs) == 0 { | ||
return nil | ||
} | ||
return iter(kvs) | ||
} | ||
return stream.Orchestrate(ctx) | ||
} | ||
|
||
func between(min, max, val int) int { | ||
if val > max { | ||
val = max | ||
} | ||
if val < min { | ||
val = min | ||
} | ||
return val | ||
func (b *Blockstore) doCopy(ctx context.Context, from versions.BadgerDB, to versions.BadgerDB) error { | ||
return from.Copy(ctx, to) | ||
} | ||
|
||
func (b *Blockstore) deleteDB(path string) { | ||
|
@@ -505,7 +407,7 @@ func (b *Blockstore) onlineGC(ctx context.Context, threshold float64, checkFreq | |
} | ||
} | ||
|
||
if err == badger.ErrNoRewrite { | ||
if err == b.db.GetErrNoRewrite() { | ||
// not really an error in this case, it signals the end of GC | ||
return nil | ||
} | ||
|
@@ -578,7 +480,7 @@ func (b *Blockstore) GCOnce(ctx context.Context, opts ...blockstore.BlockstoreGC | |
|
||
// Note no compaction needed before single GC as we will hit at most one vlog anyway | ||
err := b.db.RunValueLogGC(threshold) | ||
if err == badger.ErrNoRewrite { | ||
if err == b.db.GetErrNoRewrite() { | ||
// not really an error in this case, it signals the end of GC | ||
return nil | ||
} | ||
|
@@ -636,11 +538,14 @@ func (b *Blockstore) View(ctx context.Context, cid cid.Cid, fn func([]byte) erro | |
defer KeyPool.Put(k) | ||
} | ||
|
||
return b.db.View(func(txn *badger.Txn) error { | ||
return b.db.View(func(txn badger.Txn) error { | ||
|
||
errKeyNotFound := b.db.GetErrKeyNotFound() | ||
|
||
switch item, err := txn.Get(k); err { | ||
case nil: | ||
return item.Value(fn) | ||
case badger.ErrKeyNotFound: | ||
case errKeyNotFound: | ||
return ipld.ErrNotFound{Cid: cid} | ||
default: | ||
return fmt.Errorf("failed to view block from badger blockstore: %w", err) | ||
|
@@ -683,13 +588,14 @@ func (b *Blockstore) Has(ctx context.Context, cid cid.Cid) (bool, error) { | |
defer KeyPool.Put(k) | ||
} | ||
|
||
err := b.db.View(func(txn *badger.Txn) error { | ||
err := b.db.View(func(txn badger.Txn) error { | ||
_, err := txn.Get(k) | ||
return err | ||
}) | ||
|
||
errKeyNotFound := b.db.GetErrKeyNotFound() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't great. Other options:
I can't think of a great solution, tbh. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I looked into |
||
switch err { | ||
case badger.ErrKeyNotFound: | ||
case errKeyNotFound: | ||
return false, nil | ||
case nil: | ||
return true, nil | ||
|
@@ -718,12 +624,13 @@ func (b *Blockstore) Get(ctx context.Context, cid cid.Cid) (blocks.Block, error) | |
} | ||
|
||
var val []byte | ||
err := b.db.View(func(txn *badger.Txn) error { | ||
err := b.db.View(func(txn badger.Txn) error { | ||
errKeyNotFound := b.db.GetErrKeyNotFound() | ||
switch item, err := txn.Get(k); err { | ||
case nil: | ||
val, err = item.ValueCopy(nil) | ||
return err | ||
case badger.ErrKeyNotFound: | ||
case errKeyNotFound: | ||
return ipld.ErrNotFound{Cid: cid} | ||
default: | ||
return fmt.Errorf("failed to get block from badger blockstore: %w", err) | ||
|
@@ -751,11 +658,12 @@ func (b *Blockstore) GetSize(ctx context.Context, cid cid.Cid) (int, error) { | |
} | ||
|
||
var size int | ||
err := b.db.View(func(txn *badger.Txn) error { | ||
err := b.db.View(func(txn badger.Txn) error { | ||
errKeyNotFound := b.db.GetErrKeyNotFound() | ||
switch item, err := txn.Get(k); err { | ||
case nil: | ||
size = int(item.ValueSize()) | ||
case badger.ErrKeyNotFound: | ||
case errKeyNotFound: | ||
return ipld.ErrNotFound{Cid: cid} | ||
default: | ||
return fmt.Errorf("failed to get block size from badger blockstore: %w", err) | ||
|
@@ -805,10 +713,13 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { | |
keys = append(keys, k) | ||
} | ||
|
||
err := b.db.View(func(txn *badger.Txn) error { | ||
err := b.db.View(func(txn badger.Txn) error { | ||
|
||
errKeyNotFound := b.db.GetErrKeyNotFound() | ||
|
||
for i, k := range keys { | ||
switch _, err := txn.Get(k); err { | ||
case badger.ErrKeyNotFound: | ||
case errKeyNotFound: | ||
case nil: | ||
keys[i] = nil | ||
default: | ||
|
@@ -822,7 +733,7 @@ func (b *Blockstore) PutMany(ctx context.Context, blocks []blocks.Block) error { | |
return err | ||
} | ||
|
||
put := func(db *badger.DB) error { | ||
put := func(db badger.BadgerDB) error { | ||
batch := db.NewWriteBatch() | ||
defer batch.Cancel() | ||
|
||
|
@@ -1070,6 +981,6 @@ func (b *Blockstore) StorageKey(dst []byte, cid cid.Cid) []byte { | |
|
||
// DB is added for lotus-shed needs | ||
// WARNING: THIS IS COMPLETELY UNSAFE; DONT USE THIS IN PRODUCTION CODE | ||
func (b *Blockstore) DB() *badger.DB { | ||
func (b *Blockstore) DB() badger.BadgerDB { | ||
return b.db | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this needs expansion. Maybe badger v4 should should be its own section? Maybe something like:
Experimental Badger v4 Support
With filecoin-project/lotus#12316, users can now opt-in to using Badger v4 instead of v2 for the datastore.
Why upgrade?
How to upgrade?
The v2 and v4 datastores are incompatible. Badger directories are directories, it's advised to first copy your v2 datastore. Then enable v4 with
LOTUS_CHAINSTORE_BADGERVERSION=4
. Download a recent mainnet snapshot (link to snapshot directory) to import usinglotus command
.If you run into any problems please report them by opening an issue and you can also rollback with
LOTUS_CHAINSTORE_BADGERVERSION=2
and copying back v2 directory.