Skip to content

Commit 27450c1

Browse files
authored
fix(core): fix duplicate mutation entries for count index (#9208) (#9209)
Due to duplicate entries in the count index, sometimes we had wrong count being reported. This wrong count was causing the transcation is too old issue. This diff fixes the duplicate entries fixing the issue.
1 parent 2d1ccb6 commit 27450c1

File tree

10 files changed

+637
-404
lines changed

10 files changed

+637
-404
lines changed

dgraph/cmd/debug/run.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,10 @@ func lookup(db *badger.DB) {
533533
if err != nil {
534534
log.Fatal(err)
535535
}
536-
fmt.Fprintf(&buf, " Length: %d", pl.Length(math.MaxUint64, 0))
536+
pl.RLock()
537+
c, _, _ := pl.GetLength(math.MaxUint64)
538+
pl.RUnlock()
539+
fmt.Fprintf(&buf, " Length: %d", c)
537540

538541
splits := pl.PartSplits()
539542
isMultiPart := len(splits) > 0
@@ -611,6 +614,13 @@ func printKeys(db *badger.DB) {
611614
}
612615

613616
var sz, deltaCount int64
617+
pl, err := posting.GetNew(key, db, opt.readTs)
618+
if err == nil {
619+
pl.RLock()
620+
c, _, _ := pl.GetLength(math.MaxUint64)
621+
fmt.Fprintf(&buf, " countValue: [%d]", c)
622+
pl.RUnlock()
623+
}
614624
LOOP:
615625
for ; itr.ValidForPrefix(prefix); itr.Next() {
616626
item := itr.Item()

posting/index.go

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,18 @@ type countParams struct {
233233
reverse bool
234234
}
235235

236+
// When we want to update count edges, we should set them with OVR instead of SET as SET will mess with count
237+
func shouldAddCountEdge(found bool, edge *pb.DirectedEdge) bool {
238+
if found {
239+
if edge.Op != pb.DirectedEdge_DEL {
240+
edge.Op = pb.DirectedEdge_OVR
241+
}
242+
return true
243+
} else {
244+
return edge.Op != pb.DirectedEdge_DEL
245+
}
246+
}
247+
236248
func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
237249
hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) {
238250
countBefore, countAfter := 0, 0
@@ -242,12 +254,14 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
242254
defer plist.Unlock()
243255
if hasCountIndex {
244256
countBefore, found, _ = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId)
245-
if countBefore == -1 {
257+
if countBefore < 0 {
246258
return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count")
247259
}
248260
}
249-
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
250-
return emptyCountParams, err
261+
if !(hasCountIndex && !shouldAddCountEdge(found, edge)) {
262+
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
263+
return emptyCountParams, err
264+
}
251265
}
252266
if hasCountIndex {
253267
countAfter = countAfterMutation(countBefore, found, edge.Op)
@@ -311,7 +325,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
311325
// entries for this key in the index are removed.
312326
pred, ok := schema.State().Get(ctx, t.Attr)
313327
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
314-
t.Op == pb.DirectedEdge_SET && t.ValueId != 0
328+
t.Op != pb.DirectedEdge_DEL && t.ValueId != 0
315329
if isSingleUidUpdate {
316330
dataKey := x.DataKey(t.Attr, t.Entity)
317331
dataList, err := getFn(dataKey)
@@ -458,7 +472,7 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
458472
}
459473

460474
func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int {
461-
if !found && op == pb.DirectedEdge_SET {
475+
if !found && op != pb.DirectedEdge_DEL {
462476
return countBefore + 1
463477
} else if found && op == pb.DirectedEdge_DEL {
464478
return countBefore - 1
@@ -531,8 +545,10 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
531545
}
532546
}
533547

534-
if err = l.addMutationInternal(ctx, txn, t); err != nil {
535-
return val, found, emptyCountParams, err
548+
if !(hasCountIndex && !shouldAddCountEdge(found && currPost.Op != Del, t)) {
549+
if err = l.addMutationInternal(ctx, txn, t); err != nil {
550+
return val, found, emptyCountParams, err
551+
}
536552
}
537553

538554
if found && doUpdateIndex {
@@ -596,7 +612,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,
596612
return err
597613
}
598614
}
599-
if edge.Op == pb.DirectedEdge_SET {
615+
if edge.Op != pb.DirectedEdge_DEL {
600616
val = types.Val{
601617
Tid: types.TypeID(edge.ValueType),
602618
Value: edge.Value,
@@ -895,15 +911,13 @@ func (r *rebuilder) Run(ctx context.Context) error {
895911
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
896912
var counter uint64 = 1
897913

898-
var txn *Txn
899-
900914
tmpWriter := tmpDB.NewManagedWriteBatch()
901915
stream := pstore.NewStreamAt(r.startTs)
902916
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
903917
stream.Prefix = r.prefix
904918
//TODO We need to create a single transaction irrespective of the type of the predicate
905919
if pred.ValueType == pb.Posting_VFLOAT {
906-
txn = NewTxn(r.startTs)
920+
x.AssertTrue(false)
907921
}
908922
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
909923
// We should return quickly if the context is no longer valid.
@@ -923,44 +937,25 @@ func (r *rebuilder) Run(ctx context.Context) error {
923937
return nil, errors.Wrapf(err, "error reading posting list from disk")
924938
}
925939

926-
// We are using different transactions in each call to KeyToList function. This could
927-
// be a problem for computing reverse count indexes if deltas for same key are added
928-
// in different transactions. Such a case doesn't occur for now.
929-
// TODO: Maybe we can always use txn initialized in rebuilder.Run().
930-
streamTxn := txn
931-
if streamTxn == nil {
932-
streamTxn = NewTxn(r.startTs)
933-
}
934-
edges, err := r.fn(pk.Uid, l, streamTxn)
940+
kvs, err := l.Rollup(nil, r.startTs)
935941
if err != nil {
936942
return nil, err
937943
}
938944

939-
if txn != nil {
940-
kvs := make([]*bpb.KV, 0, len(edges))
941-
for _, edge := range edges {
942-
version := atomic.AddUint64(&counter, 1)
943-
key := x.DataKey(edge.Attr, edge.Entity)
944-
pl, err := txn.GetFromDelta(key)
945-
if err != nil {
946-
return &bpb.KVList{}, nil
947-
}
948-
data := pl.getMutation(r.startTs)
949-
kv := bpb.KV{
950-
Key: x.DataKey(edge.Attr, edge.Entity),
951-
Value: data,
952-
UserMeta: []byte{BitDeltaPosting},
953-
Version: version,
954-
}
955-
kvs = append(kvs, &kv)
956-
}
957-
return &bpb.KVList{Kv: kvs}, nil
945+
for _, kv := range kvs {
946+
version := atomic.AddUint64(&counter, 1)
947+
kv.Version = version
948+
}
949+
950+
streamTxn := NewTxn(r.startTs)
951+
_, err = r.fn(pk.Uid, l, streamTxn)
952+
if err != nil {
953+
return nil, err
958954
}
959955

960956
// Convert data into deltas.
961957
streamTxn.Update()
962958
// txn.cache.Lock() is not required because we are the only one making changes to txn.
963-
kvs := make([]*bpb.KV, 0, len(streamTxn.cache.deltas))
964959
for key, data := range streamTxn.cache.deltas {
965960
version := atomic.AddUint64(&counter, 1)
966961
kv := bpb.KV{

posting/list.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,12 @@ var (
5555
)
5656

5757
const (
58-
// Set means overwrite in mutation layer. It contributes 0 in Length.
58+
// Set means set in mutation layer. It contributes 1 in Length.
5959
Set uint32 = 0x01
6060
// Del means delete in mutation layer. It contributes -1 in Length.
6161
Del uint32 = 0x02
62+
// Ovr means overwrite in mutation layer. It contributes 0 in Length.
63+
Ovr uint32 = 0x03
6264

6365
// BitSchemaPosting signals that the value stores a schema or type.
6466
BitSchemaPosting byte = 0x01
@@ -305,6 +307,8 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting {
305307
switch t.Op {
306308
case pb.DirectedEdge_SET:
307309
op = Set
310+
case pb.DirectedEdge_OVR:
311+
op = Set
308312
case pb.DirectedEdge_DEL:
309313
op = Del
310314
default:
@@ -340,7 +344,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
340344
// Ensure that you either abort the uncommitted postings or commit them before calling me.
341345
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
342346
l.AssertLock()
343-
x.AssertTrue(mpost.Op == Set || mpost.Op == Del)
347+
x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr)
344348

345349
// If we have a delete all, then we replace the map entry with just one.
346350
if hasDeleteAll(mpost) {
@@ -529,7 +533,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
529533
}
530534
pred, ok := schema.State().Get(ctx, t.Attr)
531535
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
532-
pk.IsData() && mpost.Op == Set && mpost.PostingType == pb.Posting_REF
536+
pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF
533537

534538
if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
535539
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
@@ -555,6 +559,10 @@ func (l *List) getPosting(startTs uint64) *pb.PostingList {
555559
return nil
556560
}
557561

562+
func (l *List) GetPosting(startTs uint64) *pb.PostingList {
563+
return l.getPosting(startTs)
564+
}
565+
558566
// getMutation returns a marshaled version of posting list mutation stored internally.
559567
func (l *List) getMutation(startTs uint64) []byte {
560568
l.RLock()
@@ -817,31 +825,40 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
817825
return count == 0, nil
818826
}
819827

828+
func (l *List) GetLength(readTs uint64) (int, bool, *pb.Posting) {
829+
return l.getPostingAndLengthNoSort(readTs, 0, 0)
830+
}
831+
820832
func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
821833
l.AssertRLock()
822834

823835
dec := codec.Decoder{Pack: l.plist.Pack}
824836
uids := dec.Seek(uid, codec.SeekStart)
825837
length := codec.ExactLen(l.plist.Pack)
826838
found := len(uids) > 0 && uids[0] == uid
839+
found_ts := uint64(0)
827840

828841
for _, plist := range l.mutationMap {
829842
for _, mpost := range plist.Postings {
843+
ts := mpost.CommitTs
844+
if mpost.StartTs == readTs {
845+
ts = mpost.StartTs
846+
}
830847
if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
831848
if hasDeleteAll(mpost) {
832849
found = false
833850
length = 0
834851
continue
835852
}
836-
if mpost.Uid == uid {
837-
found = (mpost.Op == Set)
853+
if mpost.Uid == uid && found_ts < ts {
854+
found = (mpost.Op != Del)
855+
found_ts = ts
838856
}
839857
if mpost.Op == Set {
840858
length += 1
841-
} else {
859+
} else if mpost.Op == Del {
842860
length -= 1
843861
}
844-
845862
}
846863
}
847864
}

posting/mvcc.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,10 @@ func PostingListCacheEnabled() bool {
555555
return lCache != nil
556556
}
557557

558+
func GetNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
559+
return getNew(key, pstore, readTs)
560+
}
561+
558562
func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
559563
if PostingListCacheEnabled() {
560564
l, ok := lCache.Get(key)

protos/pb.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ message DirectedEdge {
237237
enum Op {
238238
SET = 0;
239239
DEL = 1;
240+
OVR = 2;
240241
}
241242
Op op = 8;
242243
repeated api.Facet facets = 9;

0 commit comments

Comments
 (0)