diff --git a/index/batch.go b/index/batch.go index b17d484..e762a15 100644 --- a/index/batch.go +++ b/index/batch.go @@ -19,6 +19,7 @@ import segment "github.com/blugelabs/bluge_segment_api" type Batch struct { documents []segment.Document ids []segment.Term + insertIfAbsent bool persistedCallback func(error) } @@ -30,6 +31,12 @@ func (b *Batch) Insert(doc segment.Document) { b.documents = append(b.documents, doc) } +func (b *Batch) InsertIfAbsent(id segment.Term, doc segment.Document) { + b.documents = append(b.documents, doc) + b.ids = append(b.ids, id) + b.insertIfAbsent = true +} + func (b *Batch) Update(id segment.Term, doc segment.Document) { b.documents = append(b.documents, doc) b.ids = append(b.ids, id) diff --git a/index/writer.go b/index/writer.go index c850dc2..abf19a5 100644 --- a/index/writer.go +++ b/index/writer.go @@ -224,6 +224,15 @@ func (s *Writer) close() (err error) { // Batch applies a batch of changes to the index atomically func (s *Writer) Batch(batch *Batch) (err error) { + if batch.insertIfAbsent { + if err := s.removeExistingDocuments(batch); err != nil { + return err + } + if len(batch.documents) == 0 { + return nil + } + } + start := time.Now() defer func() { @@ -287,6 +296,31 @@ func (s *Writer) Batch(batch *Batch) (err error) { return err } +func (s *Writer) removeExistingDocuments(batch *Batch) error { + root := s.currentSnapshot() + defer func() { _ = root.Close() }() + + for _, seg := range root.segment { + dict, err := seg.segment.Dictionary(batch.ids[0].Field()) + if err != nil { + return err + } + + for i := 0; i < len(batch.ids); i++ { + if ok, _ := dict.Contains(batch.ids[i].Term()); ok { + batch.documents = append(batch.documents[:i], batch.documents[i+1:]...) + batch.ids = append(batch.ids[:i], batch.ids[i+1:]...) + i-- + if len(batch.documents) == 0 { + return nil + } + } + } + } + batch.ids = batch.ids[:0] + return nil +} + func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Term, internalOps map[string][]byte, persistedCallback func(error)) error { // new introduction @@ -304,16 +338,18 @@ func (s *Writer) prepareSegment(newSegment *segmentWrapper, idTerms []segment.Te introduction.persisted = make(chan error, 1) } - // optimistically prepare obsoletes outside of rootLock - root := s.currentSnapshot() - defer func() { _ = root.Close() }() + if len(idTerms) > 0 { + // optimistically prepare obsoletes outside of rootLock + root := s.currentSnapshot() + defer func() { _ = root.Close() }() - for _, seg := range root.segment { - delta, err := seg.segment.DocsMatchingTerms(idTerms) - if err != nil { - return err + for _, seg := range root.segment { + delta, err := seg.segment.DocsMatchingTerms(idTerms) + if err != nil { + return err + } + introduction.obsoletes[seg.id] = delta } - introduction.obsoletes[seg.id] = delta } introStartTime := time.Now() diff --git a/index/writer_test.go b/index/writer_test.go index c52f308..dd42cd7 100644 --- a/index/writer_test.go +++ b/index/writer_test.go @@ -1631,3 +1631,139 @@ func TestIndexSeekBackwardsStats(t *testing.T) { idx.stats.TotTermSearchersFinished) } } + +func TestBatch_InsertIfAbsent(t *testing.T) { + cfg, cleanup := CreateConfig("TestBatch_InsertIfAbsent") + defer func() { + err := cleanup() + if err != nil { + t.Log(err) + } + }() + + idx, err := OpenWriter(cfg) + if err != nil { + t.Fatal(err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + + // Verify initial document count is zero + reader, err := idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err := reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + // Insert a document using InsertIfAbsent + docID := "doc-1" + doc := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister", false, false, true), + } + batch := NewBatch() + batch.InsertIfAbsent(testIdentifier(docID), doc) + + // Apply the batch + if err := idx.Batch(batch); err != nil { + t.Fatalf("failed to apply batch: %v", err) + } + expectedCount++ + + // Verify document count after insertion + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } + + // Attempt to InsertIfAbsent with the same ID + docDuplicate := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister2", true, false, true), + } + batchDuplicate := NewBatch() + batchDuplicate.InsertIfAbsent(testIdentifier(docID), docDuplicate) + + // Apply the duplicate batch + if err := idx.Batch(batchDuplicate); err != nil { + t.Fatalf("failed to apply duplicate batch: %v", err) + } + + // Since it's InsertIfAbsent, the document should not be duplicated + // Verify document count remains the same + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d after duplicate insert, got %d", expectedCount, docCount) + } + + docNum1, err := findNumberByID(reader, docID) + if err != nil { + t.Fatal(err) + } + + dvr, err := reader.DocumentValueReader([]string{"title"}) + if err != nil { + t.Fatal(err) + } + err = dvr.VisitDocumentValues(docNum1, func(field string, term []byte) { + if field == "title" { + if string(term) != "mister" { + t.Errorf("expected title to be 'First Document', got '%s'", string(term)) + } + } + }) + if err != nil { + t.Fatal(err) + } + + err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool { + if field == "title" { + if string(value) != "mister" { + t.Errorf("expected title to be 'mister', got '%s'", string(value)) + } + } + return true + }) + if err != nil { + t.Fatal(err) + } + + err = reader.Close() + if err != nil { + t.Fatal(err) + } +}