Skip to content

Commit

Permalink
feat(writer): support InsertIfAbsent functionality
Browse files Browse the repository at this point in the history
Ensures documents are only inserted if their docIDs are not already present in the current index

Signed-off-by: Gao Hongtao <[email protected]>
  • Loading branch information
hanahmily committed Nov 11, 2024
1 parent 04f5c70 commit c317df1
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 8 deletions.
9 changes: 9 additions & 0 deletions index/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import segment "github.com/blugelabs/bluge_segment_api"
// Batch collects the changes that Writer.Batch applies to the index.
type Batch struct {
// documents holds the documents queued by Insert and Update, plus the
// InsertIfAbsent documents that Writer.removeExistingDocuments lets through.
documents []segment.Document
// ids holds the id terms queued by Update, plus the id terms of the
// InsertIfAbsent entries that Writer.removeExistingDocuments lets through.
ids []segment.Term
// unparsedDocuments holds documents queued by InsertIfAbsent that have not
// yet been checked against the index. Entry i belongs to unparsedIDs[i].
unparsedDocuments []segment.Document
// unparsedIDs holds the id terms queued by InsertIfAbsent, in the same order
// as unparsedDocuments.
unparsedIDs []segment.Term
// persistedCallback is an optional function taking an error; Reset sets it
// back to nil.
// NOTE(review): presumably invoked once the batch has been persisted —
// confirm against the writer.
persistedCallback func(error)
}

Expand All @@ -30,6 +32,11 @@ func (b *Batch) Insert(doc segment.Document) {
b.documents = append(b.documents, doc)
}

// InsertIfAbsent queues doc for a conditional insert keyed by id.
//
// No lookup happens here: the pair is only recorded in the batch's pending
// (unparsed) lists, at the same position in both. When the batch is applied,
// Writer.Batch drops every pending pair whose id term is already present in
// the index and moves the remaining pairs into the regular document and id
// lists.
func (b *Batch) InsertIfAbsent(id segment.Term, doc segment.Document) {
	// The two lists are parallel, so they must always grow together.
	b.unparsedIDs = append(b.unparsedIDs, id)
	b.unparsedDocuments = append(b.unparsedDocuments, doc)
}

func (b *Batch) Update(id segment.Term, doc segment.Document) {
b.documents = append(b.documents, doc)
b.ids = append(b.ids, id)
Expand All @@ -43,6 +50,8 @@ func (b *Batch) Reset() {
b.documents = b.documents[:0]
b.ids = b.ids[:0]
b.persistedCallback = nil
b.unparsedDocuments = b.unparsedDocuments[:0]
b.unparsedIDs = b.unparsedIDs[:0]
}

func (b *Batch) SetPersistedCallback(f func(error)) {
Expand Down
55 changes: 47 additions & 8 deletions index/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,15 @@ func (s *Writer) close() (err error) {

// Batch applies a batch of changes to the index atomically
func (s *Writer) Batch(batch *Batch) (err error) {
if len(batch.unparsedIDs) > 0 {
if err := s.removeExistingDocuments(batch); err != nil {
return err
}
if len(batch.documents) == 0 {
return nil
}
}

start := time.Now()

defer func() {
Expand Down Expand Up @@ -287,6 +296,34 @@ func (s *Writer) Batch(batch *Batch) (err error) {
return err
}

// removeExistingDocuments resolves the batch's pending InsertIfAbsent entries
// against the current index snapshot.
//
// Every pending id whose term is found in the term dictionary of any segment
// of the snapshot is dropped together with its document. The entries that
// survive are moved to batch.documents and batch.ids, and the pending lists
// are emptied so that applying the same batch again cannot queue them twice.
//
// The dictionary is selected by the field of the first pending id, so all
// pending ids are assumed to share a single field.
//
// It returns the first error reported while opening or querying a dictionary.
// In that case the pending lists still hold, in order, every entry that has
// not been found in the index so far, so the batch stays consistent.
//
// NOTE(review): the dictionaries obtained here are never closed; confirm
// whether the segment API requires closing them.
func (s *Writer) removeExistingDocuments(batch *Batch) error {
	if len(batch.unparsedIDs) == 0 {
		// Nothing pending; also protects the unparsedIDs[0] access below.
		return nil
	}

	root := s.currentSnapshot()
	defer func() { _ = root.Close() }()

	field := batch.unparsedIDs[0].Field()
	for _, seg := range root.segment {
		dict, err := seg.segment.Dictionary(field)
		if err != nil {
			return err
		}

		// Filter both parallel lists in place: the kept slices reuse the
		// backing arrays and never write past the index being read.
		docs, ids := batch.unparsedDocuments, batch.unparsedIDs
		keptDocs, keptIDs := docs[:0], ids[:0]
		for i, id := range ids {
			found, err := dict.Contains(id.Term())
			if err != nil {
				// Keep the unchecked tail so no pending entry is lost or
				// duplicated by the partially completed compaction.
				batch.unparsedDocuments = append(keptDocs, docs[i:]...)
				batch.unparsedIDs = append(keptIDs, ids[i:]...)
				return err
			}
			if found {
				continue
			}
			keptDocs = append(keptDocs, docs[i])
			keptIDs = append(keptIDs, id)
		}
		batch.unparsedDocuments, batch.unparsedIDs = keptDocs, keptIDs

		if len(keptIDs) == 0 {
			// Every pending id already exists; no need to scan more segments.
			return nil
		}
	}

	// Promote the survivors. Their ids go into batch.ids as well, alongside
	// the ids queued by Update.
	batch.documents = append(batch.documents, batch.unparsedDocuments...)
	batch.ids = append(batch.ids, batch.unparsedIDs...)
	batch.unparsedDocuments = batch.unparsedDocuments[:0]
	batch.unparsedIDs = batch.unparsedIDs[:0]
	return nil
}

func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Term,
internalOps map[string][]byte, persistedCallback func(error)) error {
// new introduction
Expand All @@ -304,16 +341,18 @@ func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Te
introduction.persisted = make(chan error, 1)
}

// optimistically prepare obsoletes outside of rootLock
root := s.currentSnapshot()
defer func() { _ = root.Close() }()
if len(idTerms) > 0 {
// optimistically prepare obsoletes outside of rootLock
root := s.currentSnapshot()
defer func() { _ = root.Close() }()

for _, seg := range root.segment {
delta, err := seg.segment.DocsMatchingTerms(idTerms)
if err != nil {
return err
for _, seg := range root.segment {
delta, err := seg.segment.DocsMatchingTerms(idTerms)
if err != nil {
return err
}
introduction.obsoletes[seg.id] = delta
}
introduction.obsoletes[seg.id] = delta
}

introStartTime := time.Now()
Expand Down
136 changes: 136 additions & 0 deletions index/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1631,3 +1631,139 @@ func TestIndexSeekBackwardsStats(t *testing.T) {
idx.stats.TotTermSearchersFinished)
}
}

// TestBatch_InsertIfAbsent checks that a document queued with
// Batch.InsertIfAbsent is written when its id is new, and that a second
// InsertIfAbsent for the same id neither adds a document nor replaces the
// "title" value of the first one.
func TestBatch_InsertIfAbsent(t *testing.T) {
	cfg, cleanup := CreateConfig("TestBatch_InsertIfAbsent")
	defer func() {
		err := cleanup()
		if err != nil {
			t.Log(err)
		}
	}()

	idx, err := OpenWriter(cfg)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		err := idx.Close()
		if err != nil {
			t.Fatal(err)
		}
	}()

	var expectedCount uint64

	// assertCount opens a fresh reader, compares its document count with the
	// current value of expectedCount, and closes the reader again.
	assertCount := func() {
		t.Helper()
		reader, err := idx.Reader()
		if err != nil {
			t.Fatal(err)
		}
		docCount, err := reader.Count()
		if err != nil {
			t.Error(err)
		}
		if docCount != expectedCount {
			t.Errorf("Expected document count to be %d got %d", expectedCount, docCount)
		}
		err = reader.Close()
		if err != nil {
			t.Fatal(err)
		}
	}

	// Verify initial document count is zero
	assertCount()

	// Insert a document using InsertIfAbsent
	docID := "doc-1"
	doc := &FakeDocument{
		NewFakeField("_id", docID, true, false, false),
		NewFakeField("title", "mister", false, false, true),
	}
	batch := NewBatch()
	batch.InsertIfAbsent(testIdentifier(docID), doc)

	// Apply the batch
	if err := idx.Batch(batch); err != nil {
		t.Fatalf("failed to apply batch: %v", err)
	}
	expectedCount++

	// Verify document count after insertion
	assertCount()

	// Attempt to InsertIfAbsent with the same ID
	docDuplicate := &FakeDocument{
		NewFakeField("_id", docID, true, false, false),
		NewFakeField("title", "mister2", true, false, true),
	}
	batchDuplicate := NewBatch()
	batchDuplicate.InsertIfAbsent(testIdentifier(docID), docDuplicate)

	// Apply the duplicate batch
	if err := idx.Batch(batchDuplicate); err != nil {
		t.Fatalf("failed to apply duplicate batch: %v", err)
	}

	// Since it's InsertIfAbsent, the document should not be duplicated.
	// Verify document count remains the same; this reader stays open because
	// the value checks below reuse it.
	reader, err := idx.Reader()
	if err != nil {
		t.Fatal(err)
	}
	docCount, err := reader.Count()
	if err != nil {
		t.Error(err)
	}
	if docCount != expectedCount {
		t.Errorf("Expected document count to be %d after duplicate insert, got %d", expectedCount, docCount)
	}

	docNum1, err := findNumberByID(reader, docID)
	if err != nil {
		t.Fatal(err)
	}

	// Any "title" doc value reported for the document must still be the one
	// from the first insert.
	dvr, err := reader.DocumentValueReader([]string{"title"})
	if err != nil {
		t.Fatal(err)
	}
	err = dvr.VisitDocumentValues(docNum1, func(field string, term []byte) {
		if field == "title" {
			if string(term) != "mister" {
				t.Errorf("expected title to be 'mister', got '%s'", string(term))
			}
		}
	})
	if err != nil {
		t.Fatal(err)
	}

	// Likewise, any stored "title" value must not come from the duplicate.
	err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool {
		if field == "title" {
			if string(value) != "mister" {
				t.Errorf("expected title to be 'mister', got '%s'", string(value))
			}
		}
		return true
	})
	if err != nil {
		t.Fatal(err)
	}

	err = reader.Close()
	if err != nil {
		t.Fatal(err)
	}
}

0 comments on commit c317df1

Please sign in to comment.