From c317df1af201eb070287fbe1819f75f8182a9dd3 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Sun, 10 Nov 2024 08:38:06 +0800 Subject: [PATCH] feat(writer): support InsertIfAbsent functionality Ensures documents are only inserted if their docIDs are not already present in the current index Signed-off-by: Gao Hongtao --- index/batch.go | 9 +++ index/writer.go | 55 ++++++++++++++--- index/writer_test.go | 136 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 192 insertions(+), 8 deletions(-) diff --git a/index/batch.go b/index/batch.go index b17d484..ab26188 100644 --- a/index/batch.go +++ b/index/batch.go @@ -19,6 +19,8 @@ import segment "github.com/blugelabs/bluge_segment_api" type Batch struct { documents []segment.Document ids []segment.Term + unparsedDocuments []segment.Document + unparsedIDs []segment.Term persistedCallback func(error) } @@ -30,6 +32,11 @@ func (b *Batch) Insert(doc segment.Document) { b.documents = append(b.documents, doc) } +func (b *Batch) InsertIfAbsent(id segment.Term, doc segment.Document) { + b.unparsedDocuments = append(b.unparsedDocuments, doc) + b.unparsedIDs = append(b.unparsedIDs, id) +} + func (b *Batch) Update(id segment.Term, doc segment.Document) { b.documents = append(b.documents, doc) b.ids = append(b.ids, id) @@ -43,6 +50,8 @@ func (b *Batch) Reset() { b.documents = b.documents[:0] b.ids = b.ids[:0] b.persistedCallback = nil + b.unparsedDocuments = b.unparsedDocuments[:0] + b.unparsedIDs = b.unparsedIDs[:0] } func (b *Batch) SetPersistedCallback(f func(error)) { diff --git a/index/writer.go b/index/writer.go index c850dc2..cc82d51 100644 --- a/index/writer.go +++ b/index/writer.go @@ -224,6 +224,15 @@ func (s *Writer) close() (err error) { // Batch applies a batch of changes to the index atomically func (s *Writer) Batch(batch *Batch) (err error) { + if len(batch.unparsedIDs) > 0 { + if err := s.removeExistingDocuments(batch); err != nil { + return err + } + if len(batch.documents) == 0 { + return nil + } + } + start := time.Now() defer func() { @@ -287,6 +296,34 @@ func (s *Writer) Batch(batch *Batch) (err error) { return err } +func (s *Writer) removeExistingDocuments(batch *Batch) error { + root := s.currentSnapshot() + defer func() { _ = root.Close() }() + + for _, seg := range root.segment { + dict, err := seg.segment.Dictionary(batch.unparsedIDs[0].Field()) + if err != nil { + return err + } + + for i := 0; i < len(batch.unparsedIDs); i++ { + if ok, _ := dict.Contains(batch.unparsedIDs[i].Term()); ok { + batch.unparsedDocuments = append(batch.unparsedDocuments[:i], batch.unparsedDocuments[i+1:]...) + batch.unparsedIDs = append(batch.unparsedIDs[:i], batch.unparsedIDs[i+1:]...) + i-- + if len(batch.unparsedDocuments) == 0 { + return nil + } + } + } + } + if len(batch.unparsedDocuments) > 0 { + batch.documents = append(batch.documents, batch.unparsedDocuments...) + batch.ids = append(batch.ids, batch.unparsedIDs...) + } + return nil +} + func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Term, internalOps map[string][]byte, persistedCallback func(error)) error { // new introduction @@ -304,16 +341,18 @@ func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Te introduction.persisted = make(chan error, 1) } - // optimistically prepare obsoletes outside of rootLock - root := s.currentSnapshot() - defer func() { _ = root.Close() }() + if len(idTerms) > 0 { + // optimistically prepare obsoletes outside of rootLock + root := s.currentSnapshot() + defer func() { _ = root.Close() }() - for _, seg := range root.segment { - delta, err := seg.segment.DocsMatchingTerms(idTerms) - if err != nil { - return err + for _, seg := range root.segment { + delta, err := seg.segment.DocsMatchingTerms(idTerms) + if err != nil { + return err + } + introduction.obsoletes[seg.id] = delta } - introduction.obsoletes[seg.id] = delta } introStartTime := time.Now() diff --git a/index/writer_test.go b/index/writer_test.go index c52f308..dd42cd7 100644 --- a/index/writer_test.go +++ b/index/writer_test.go @@ -1631,3 +1631,139 @@ func TestIndexSeekBackwardsStats(t *testing.T) { idx.stats.TotTermSearchersFinished) } } + +func TestBatch_InsertIfAbsent(t *testing.T) { + cfg, cleanup := CreateConfig("TestBatch_InsertIfAbsent") + defer func() { + err := cleanup() + if err != nil { + t.Log(err) + } + }() + + idx, err := OpenWriter(cfg) + if err != nil { + t.Fatal(err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + + // Verify initial document count is zero + reader, err := idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err := reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + // Insert a document using InsertIfAbsent + docID := "doc-1" + doc := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister", false, false, true), + } + batch := NewBatch() + batch.InsertIfAbsent(testIdentifier(docID), doc) + + // Apply the batch + if err := idx.Batch(batch); err != nil { + t.Fatalf("failed to apply batch: %v", err) + } + expectedCount++ + + // Verify document count after insertion + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + // Attempt to InsertIfAbsent with the same ID + docDuplicate := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister2", true, false, true), + } + batchDuplicate := NewBatch() + batchDuplicate.InsertIfAbsent(testIdentifier(docID), docDuplicate) + + // Apply the duplicate batch + if err := idx.Batch(batchDuplicate); err != nil { + t.Fatalf("failed to apply duplicate batch: %v", err) + } + + // Since it's InsertIfAbsent, the document should not be duplicated + // Verify document count remains the same + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d after duplicate insert, got %d", expectedCount, docCount) + } + + docNum1, err := findNumberByID(reader, docID) + if err != nil { + t.Fatal(err) + } + + dvr, err := reader.DocumentValueReader([]string{"title"}) + if err != nil { + t.Fatal(err) + } + err = dvr.VisitDocumentValues(docNum1, func(field string, term []byte) { + if field == "title" { + if string(term) != "mister" { + t.Errorf("expected title to be 'First Document', got '%s'", string(term)) + } + } + }) + if err != nil { + t.Fatal(err) + } + + err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool { + if field == "title" { + if string(value) != "mister" { + t.Errorf("expected title to be 'mister', got '%s'", string(value)) + } + } + return true + }) + if err != nil { + t.Fatal(err) + } + + err = reader.Close() + if err != nil { + t.Fatal(err) + } +}