Skip to content

Commit

Permalink
TTL Snapshot Bug Fixes (#75)
Browse files Browse the repository at this point in the history
* badger snapshot ttl fix

* change iter to *storage.KVEntry

* CreateTempFile signature change

* rocksdb snapshot ttl fix1

* use archiever

* rewrite test dkv cluster using goreman

* fmt changes

* minor typo changes

* refactor constants

* rocksdb snapshot remove zipping (#80)

* badger snapshot ttl fix

* change iter to *storage.KVEntry

* CreateTempFile signature change

* rocksdb snapshot ttl fix1

* use archiever

* rewrite test dkv cluster using goreman

* fmt changes

* minor typo changes

* refactor constants

* Update main.yml

* rocksdb snapshot remove zipping

* Update tar_test.go

* tidy

* Update tar.go
  • Loading branch information
kingster authored Jul 15, 2021
1 parent b352f98 commit 4cf243a
Show file tree
Hide file tree
Showing 18 changed files with 552 additions and 95 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ on:
branches:
- master
pull_request:
branches:
- master
# branches:
# - master

jobs:
dkv_server_job:
Expand Down
12 changes: 7 additions & 5 deletions cmd/dkvsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,13 +345,14 @@ func newKVStore() (storage.KVStore, storage.ChangePropagator, storage.ChangeAppl

dataDir := path.Join(dbFolder, "data")
slg.Infof("Using %s as data directory", dataDir)

sstDir := path.Join(dbFolder, "sst")
if err := os.MkdirAll(sstDir, 0777); err != nil {
slg.Fatalf("Unable to create sst folder at %s. Error: %v.", dbFolder, err)
}

switch dbEngine {
case "rocksdb":
sstDir := path.Join(dbFolder, "sst")
if err := os.MkdirAll(sstDir, 0777); err != nil {
slg.Fatalf("Unable to create sst folder at %s. Error: %v.", dbFolder, err)
}

rocksDb, err := rocksdb.OpenDB(dataDir,
rocksdb.WithSSTDir(sstDir),
rocksdb.WithSyncWrites(),
Expand All @@ -367,6 +368,7 @@ func newKVStore() (storage.KVStore, storage.ChangePropagator, storage.ChangeAppl
var badgerDb badger.DB
var err error
bdbOpts := []badger.DBOption{
badger.WithSSTDir(sstDir),
badger.WithSyncWrites(),
badger.WithCacheSize(blockCacheSize),
badger.WithBadgerConfig(dbEngineIni),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/golang/snappy v0.0.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/kpango/fastime v1.0.16
github.com/matttproud/golang_protobuf_extensions v1.0.1
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.5.1 // indirect
github.com/prometheus/procfs v0.0.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/master/ds_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func newDistributedDKVNode(id int, nodeURL, clusURL string) (DKVService, *grpc.S
dkvRepl := newReplicator(kvs, nodeURL, clusURL)
dkvRepl.Start()
lgr, _ := zap.NewDevelopment()
distSrv := NewDistributedService(kvs, cp, br, dkvRepl, lgr , stats.NewNoOpClient())
distSrv := NewDistributedService(kvs, cp, br, dkvRepl, lgr, stats.NewNoOpClient())
grpcSrv := grpc.NewServer()
serverpb.RegisterDKVServer(grpcSrv, distSrv)
serverpb.RegisterDKVClusterServer(grpcSrv, distSrv)
Expand Down
8 changes: 4 additions & 4 deletions internal/master/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ func (ss *standaloneService) GetReplicas(ctx context.Context, req *serverpb.GetR

var replicas []*serverpb.Replica
for iter.HasNext() {
key, val := iter.Next()
replicaKey, replicaVal := string(key), string(val)
entry := iter.Next()
replicaKey, replicaVal := string(entry.Key), string(entry.Value)
replicaAddr := strings.TrimPrefix(replicaKey, dkvMetaReplicaPrefix)

// checking for valid replicas and not the removed ones whose values are empty
Expand Down Expand Up @@ -265,8 +265,8 @@ func (ss *standaloneService) Iterate(iterReq *serverpb.IterateRequest, dkvIterSr
defer ss.rwl.RUnlock()

iteration := storage.NewIteration(ss.store, iterReq)
err := iteration.ForEach(func(k, v []byte) error {
itRes := &serverpb.IterateResponse{Status: newEmptyStatus(), Key: k, Value: v}
err := iteration.ForEach(func(e *storage.KVEntry) error {
itRes := &serverpb.IterateResponse{Status: newEmptyStatus(), Key: e.Key, Value: e.Value}
return dkvIterSrvr.Send(itRes)
})
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/slave/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ func (ss *slaveService) MultiGet(ctx context.Context, multiGetReq *serverpb.Mult

func (ss *slaveService) Iterate(iterReq *serverpb.IterateRequest, dkvIterSrvr serverpb.DKV_IterateServer) error {
iteration := storage.NewIteration(ss.store, iterReq)
err := iteration.ForEach(func(k, v []byte) error {
itRes := &serverpb.IterateResponse{Status: newEmptyStatus(), Key: k, Value: v}
err := iteration.ForEach(func(e *storage.KVEntry) error {
itRes := &serverpb.IterateResponse{Status: newEmptyStatus(), Key: e.Key, Value: e.Value}
return dkvIterSrvr.Send(itRes)
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/stats/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func statsdFlushInterval(s *bufio.Scanner, w *bufio.Writer) time.Duration {
return 10 * time.Millisecond
}

func connectToStatsD(t *testing.T, statsdAdminURL string) (net.Conn , error){
func connectToStatsD(t *testing.T, statsdAdminURL string) (net.Conn, error) {
conn, err := net.Dial("tcp", statsdAdminURL)
if err != nil {
t.Logf("Unable to connect to StatsD admin endpoint: %s.", statsdAdminURL)
Expand Down
73 changes: 55 additions & 18 deletions internal/storage/badger/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"bytes"
"context"
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
"github.com/matttproud/golang_protobuf_extensions/pbutil"
"io"
"io/ioutil"
"os"
"path"
"strconv"
Expand Down Expand Up @@ -44,9 +46,10 @@ type badgerDB struct {
}

type bdgrOpts struct {
opts badger.Options
lgr *zap.Logger
statsCli stats.Client
opts badger.Options
lgr *zap.Logger
statsCli stats.Client
sstDirectory string
}

// DBOption is used to configure the Badger
Expand Down Expand Up @@ -142,6 +145,14 @@ func WithDBDir(dir string) DBOption {
}
}

// WithSSTDir configures the directory to be used
// for SST Operation on Badger.
func WithSSTDir(sstDir string) DBOption {
return func(opts *bdgrOpts) {
opts.sstDirectory = sstDir
}
}

// WithInMemory sets Badger storage to operate entirely
// in memory. No files are created on disk whatsoever.
func WithInMemory() DBOption {
Expand Down Expand Up @@ -271,38 +282,64 @@ func (bdb *badgerDB) CompareAndSet(key, expect, update []byte) (bool, error) {
return err == nil, err
}

const (
badgerSSTPrefix = "badger-snapshot-"
)

func (bdb *badgerDB) GetSnapshot() ([]byte, error) {
defer bdb.opts.statsCli.Timing("badger.snapshot.get.latency.ms", time.Now())

sstFile, err := storage.CreateTempFile(bdb.opts.sstDirectory, badgerSSTPrefix)
if err != nil {
return nil, err
}
defer os.Remove(sstFile.Name())

// TODO: Check if any options need to be set on stream
strm := bdb.db.NewStream()
snap := make(map[string][]byte)
w := bufio.NewWriter(sstFile)

strm.Send = func(list *badger_pb.KVList) error {
for _, kv := range list.Kv {
snap[string(kv.Key)] = kv.Value
entry := serverpb.PutRequest{Key: kv.Key, Value: kv.Value, ExpireTS: kv.ExpiresAt}
_, err := pbutil.WriteDelimited(w, &entry)
if err != nil {
return err
}
}
return nil
}
if err := strm.Orchestrate(context.Background()); err != nil {
return nil, err
}

var buf bytes.Buffer
err := gob.NewEncoder(&buf).Encode(snap)
return buf.Bytes(), err
w.Flush()
sstFile.Close()

return ioutil.ReadFile(sstFile.Name())
}

func (bdb *badgerDB) PutSnapshot(snap []byte) error {
defer bdb.opts.statsCli.Timing("badger.snapshot.put.latency.ms", time.Now())
buf := bytes.NewBuffer(snap)
data := make(map[string][]byte)
if err := gob.NewDecoder(buf).Decode(&data); err != nil {
return err
}

in := bytes.NewReader(snap)
wb := bdb.db.NewWriteBatch()
defer wb.Cancel()
for key, val := range data {
if err := wb.Set([]byte(key), val); err != nil {

entry := &serverpb.PutRequest{}
for {
entry.Reset()
if _, err := pbutil.ReadDelimited(in, entry); err != nil {
if err == io.EOF {
break
}
}

kv := badger.NewEntry(entry.Key, entry.Value)
if entry.ExpireTS > 0 {
kv.ExpiresAt = entry.ExpireTS
}
if err := wb.SetEntry(kv); err != nil {
return err
}
}
Expand Down Expand Up @@ -540,15 +577,15 @@ func (bdbIter *iter) HasNext() bool {
return bdbIter.it.Valid()
}

func (bdbIter *iter) Next() ([]byte, []byte) {
func (bdbIter *iter) Next() *storage.KVEntry {
defer bdbIter.it.Next()
item := bdbIter.it.Item()
key := item.KeyCopy(nil)
val, err := item.ValueCopy(nil)
if err != nil {
bdbIter.iterErr = err
}
return key, val
return &storage.KVEntry{Key: key, Value: val, ExpireTS: item.ExpiresAt()}
}

func (bdbIter *iter) Err() error {
Expand Down
16 changes: 8 additions & 8 deletions internal/storage/badger/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,12 +482,12 @@ func TestIteratorPrefixScan(t *testing.T) {

actCount := 0
for it.HasNext() {
key, val := it.Next()
entry := it.Next()
actCount++
if strings.HasPrefix(string(key), string(prefix)) {
t.Logf("Key: %s Value: %s\n", key, val)
if strings.HasPrefix(string(entry.Key), string(prefix)) {
t.Logf("Key: %s Value: %s\n", entry.Key, entry.Value)
} else {
t.Errorf("Expected key %s to have prefix %s", key, prefix)
t.Errorf("Expected key %s to have prefix %s", entry.Key, prefix)
}
}

Expand Down Expand Up @@ -518,12 +518,12 @@ func TestIteratorFromStartKey(t *testing.T) {

actCount := 0
for it.HasNext() {
key, val := it.Next()
entry := it.Next()
actCount++
if strings.HasPrefix(string(key), string(prefix)) {
t.Logf("Key: %s Value: %s\n", key, val)
if strings.HasPrefix(string(entry.Key), string(prefix)) {
t.Logf("Key: %s Value: %s\n", entry.Key, entry.Value)
} else {
t.Errorf("Expected key %s to have prefix %s", key, prefix)
t.Errorf("Expected key %s to have prefix %s", entry.Key, prefix)
}
}

Expand Down
6 changes: 3 additions & 3 deletions internal/storage/iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,23 +76,23 @@ func IterationStartKey(start []byte) IterationOption {
type Iterator interface {
io.Closer
HasNext() bool
Next() ([]byte, []byte)
Next() *KVEntry
Err() error
}

// Iteration is a convenience wrapper around `Iterator`
// that allows for a given handler to be invoked exactly
// once for every key value pair iterated.
type Iteration interface {
ForEach(func([]byte, []byte) error) error
ForEach(func(*KVEntry) error) error
}

type iteration struct {
kvs KVStore
opts *iterOpts
}

func (iter *iteration) ForEach(hndlr func([]byte, []byte) error) error {
func (iter *iteration) ForEach(hndlr func(*KVEntry) error) error {
if err := iter.opts.validate(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion internal/storage/iterators/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (ci *concatenatedIterator) HasNext() bool {
return valid
}

func (ci *concatenatedIterator) Next() ([]byte, []byte) {
func (ci *concatenatedIterator) Next() *storage.KVEntry {
return ci.iterators[ci.currentIter].Next()
}

Expand Down
9 changes: 5 additions & 4 deletions internal/storage/iterators/iterator_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package iterators

import (
"github.com/flipkart-incubator/dkv/internal/storage"
"testing"
)

Expand All @@ -14,10 +15,10 @@ func (si *simpleIterator) HasNext() bool {
return si.currentPos < len(si.data)
}

func (si *simpleIterator) Next() ([]byte, []byte) {
func (si *simpleIterator) Next() *storage.KVEntry {
d := si.data[si.currentPos]
si.currentPos++
return []byte(d), []byte(d)
return &storage.KVEntry{Key: []byte(d), Value: []byte(d)}
}

func (si *simpleIterator) Err() error {
Expand Down Expand Up @@ -45,8 +46,8 @@ func TestIterationConcat(t *testing.T) {
count := 0

for iter3.HasNext() {
k, _ := iter3.Next()
kS := string(k)
entry := iter3.Next()
kS := string(entry.Key)
aI := all[count]

if aI != kS {
Expand Down
Loading

0 comments on commit 4cf243a

Please sign in to comment.