Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(i): Rewrite document fetcher #3279

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *collection) newFetcher() fetcher.Fetcher {
if c.fetcherFactory != nil {
innerFetcher = c.fetcherFactory()
} else {
innerFetcher = new(fetcher.DocumentFetcher)
innerFetcher = fetcher.NewDocumentFetcher()
}

return lens.NewFetcher(innerFetcher, c.db.LensRegistry())
Expand Down
2 changes: 1 addition & 1 deletion internal/db/collection_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (c *collection) get(
// create a new document fetcher
df := c.newFetcher()
// initialize it with the primary index
err := df.Init(ctx, identity.FromContext(ctx), txn, c.db.acp, c, fields, nil, nil, false, showDeleted)
err := df.Init(ctx, identity.FromContext(ctx), txn, c.db.acp, c, fields, nil, nil, showDeleted)
if err != nil {
_ = df.Close()
return nil, err
Expand Down
1 change: 0 additions & 1 deletion internal/db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ func (c *collection) iterateAllDocs(
nil,
nil,
false,
false,
)
if err != nil {
return errors.Join(err, df.Close())
Expand Down
95 changes: 95 additions & 0 deletions internal/db/fetcher/deleted.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"errors"

"github.com/sourcenetwork/immutable"
)

// deleted is a fetcher that orchastrates the fetching of deleted and active documents.
type deleted struct {
Comment on lines +19 to +20
Copy link
Member

@shahzadlone shahzadlone Nov 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Why not rename to deletedFetcher, I know the package name is fetcher but still prefer the more descriptive name especially as you also have newDeletedFetcher function.

Suggested change
// deleted is a fetcher that orchastrates the fetching of deleted and active documents.
type deleted struct {
// deleted is a fetcher that orchastrates the fetching of deleted and active documents.
type deletedFetcher struct {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike fetcher.deletedFetcher now, and I would have expected @fredcarle to complain if I had named it so. Happy to rename if the majority want to though, otherwise leaving as-is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fred and I have rather similar views on this kind of naming stuff and although I'm not speaking for him, I do prefer the explicitness of deletedFetcher here. deleted is just too generic of a term, same with document which given the context of the package could just as easily refer to an actual document.

As for the import/package full naming (ie: fetcher.deletedFetcher) this is also OK and not against any language idioms or best practises, because the alternative (fetcher.deleted) is not a clear enough naming scheme (ignoring the fact that this isnt a public type).

Copy link
Member

@shahzadlone shahzadlone Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dislike fetcher.deletedFetcher now, and I would have expected @fredcarle to complain if I had named it so. Happy to rename if the majority want to though, otherwise leaving as-is.

Why newDeletedFetcher over newDeleted then also haha? The only reason deleted is easy to understand for us is because we have context. A new developer just stumbling on this file even though the package is called fetcher might not grasp what a general term like deleted is referring to here, let alone what a "deleted fetcher" is.

I do understand that this is subjective so happy to follow consensus. I do think I am bias to longer and descriptive names haha my ideal would definitely be deletedAndActiveDocumentFetcher or activeDocumentAndDeletedFetcher for this and activeDocumentFetecher for the other one that is currently document

activeFetcher fetcher
activeDocID immutable.Option[string]

deletedFetcher fetcher
deletedDocID immutable.Option[string]

currentFetcher fetcher
}

var _ fetcher = (*deleted)(nil)

func newDeletedFetcher(
activeFetcher fetcher,
deletedFetcher fetcher,
) *deleted {
return &deleted{
activeFetcher: activeFetcher,
deletedFetcher: deletedFetcher,
}
}

func (f *deleted) NextDoc() (immutable.Option[string], error) {
if !f.activeDocID.HasValue() {
var err error
f.activeDocID, err = f.activeFetcher.NextDoc()
if err != nil {
return immutable.None[string](), err
}

Check warning on line 48 in internal/db/fetcher/deleted.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/deleted.go#L47-L48

Added lines #L47 - L48 were not covered by tests
}

if !f.deletedDocID.HasValue() {
var err error
f.deletedDocID, err = f.deletedFetcher.NextDoc()
if err != nil {
return immutable.None[string](), err
}

Check warning on line 56 in internal/db/fetcher/deleted.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/deleted.go#L55-L56

Added lines #L55 - L56 were not covered by tests
}

if !f.activeDocID.HasValue() || (f.deletedDocID.HasValue() && f.deletedDocID.Value() < f.activeDocID.Value()) {
f.currentFetcher = f.deletedFetcher
return f.deletedDocID, nil
}

f.currentFetcher = f.activeFetcher
return f.activeDocID, nil
}

func (f *deleted) GetFields() (immutable.Option[EncodedDocument], error) {
doc, err := f.currentFetcher.GetFields()
if err != nil {
return immutable.None[EncodedDocument](), err
}

Check warning on line 72 in internal/db/fetcher/deleted.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/deleted.go#L71-L72

Added lines #L71 - L72 were not covered by tests

if f.activeFetcher == f.currentFetcher {
f.activeDocID = immutable.None[string]()
} else {
f.deletedDocID = immutable.None[string]()
}

return doc, nil
}

func (f *deleted) Close() error {
activeErr := f.activeFetcher.Close()
if activeErr != nil {
deletedErr := f.deletedFetcher.Close()
if deletedErr != nil {
return errors.Join(activeErr, deletedErr)
}

Check warning on line 89 in internal/db/fetcher/deleted.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/deleted.go#L86-L89

Added lines #L86 - L89 were not covered by tests

return activeErr

Check warning on line 91 in internal/db/fetcher/deleted.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/deleted.go#L91

Added line #L91 was not covered by tests
}

return f.deletedFetcher.Close()
}
197 changes: 197 additions & 0 deletions internal/db/fetcher/document.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"bytes"
"context"

dsq "github.com/ipfs/go-datastore/query"
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore/iterable"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
)

// document is the type responsible for fetching documents from the datastore.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question/suggestion: Is this only for active documents? or can work for both?

Perhaps this suggestion if only for active:

Suggested change
// document is the type responsible for fetching documents from the datastore.
// document is the type responsible for fetching only active documents from the datastore.

//
// It does not filter the data in any way.
type document struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Similar renaming suggestion as above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same answer as above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as above, I don't think document is the best naming here, too generic/general and could be interpreted in a myriad of ways (which I initially did).

In addition to my above comment, from a purely gramatical POV, the fetcher is a verb indicating action of some kind, which makes sense since the fetchers are a system of pulling and processing key values into a consistent output form. Where as document is a noun and indicates a static object. (This grammar argument isn't my primary concern with the naming, just an observation)

// The set of fields to fetch, mapped by field ID.
fieldsByID map[uint32]client.FieldDefinition
// The status to assign fetched documents.
status client.DocumentStatus
// Statistics on the actions of this instance.
execInfo *ExecInfo
// The iterable results that documents will be fetched from.
kvResultsIter dsq.Results

// The most recently yielded item from kvResultsIter.
currentKV keyValue
// nextKV may hold a datastore key value retrieved from kvResultsIter
// that was not yet ready to be yielded from the instance.
//
// When the next document is requested, this value should be yielded
// before resuming iteration through the kvResultsIter.
nextKV immutable.Option[keyValue]
}

var _ fetcher = (*document)(nil)

func newDocumentFetcher(
ctx context.Context,
fieldsByID map[uint32]client.FieldDefinition,
kvIter iterable.Iterator,
prefix keys.DataStoreKey,
status client.DocumentStatus,
execInfo *ExecInfo,
) (*document, error) {
if status == client.Active {
prefix = prefix.WithValueFlag()
} else if status == client.Deleted {
prefix = prefix.WithDeletedFlag()
}

kvResultsIter, err := kvIter.IteratePrefix(ctx, prefix.ToDS(), prefix.PrefixEnd().ToDS())
if err != nil {
return nil, err
}

Check warning on line 68 in internal/db/fetcher/document.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/document.go#L67-L68

Added lines #L67 - L68 were not covered by tests

return &document{
fieldsByID: fieldsByID,
kvResultsIter: kvResultsIter,
status: status,
execInfo: execInfo,
}, nil
}

// keyValue is a KV store response containing the resulting core.DataStoreKey and byte array value.
type keyValue struct {
Key keys.DataStoreKey
Value []byte
}

func (f *document) NextDoc() (immutable.Option[string], error) {
if f.nextKV.HasValue() {
docID := f.nextKV.Value().Key.DocID
f.currentKV = f.nextKV.Value()

f.nextKV = immutable.None[keyValue]()
f.execInfo.DocsFetched++

return immutable.Some(docID), nil
}

for {
res, ok := f.kvResultsIter.NextSync()
if res.Error != nil {
return immutable.None[string](), res.Error
}

Check warning on line 99 in internal/db/fetcher/document.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/document.go#L98-L99

Added lines #L98 - L99 were not covered by tests
if !ok {
return immutable.None[string](), nil
}

dsKey, err := keys.NewDataStoreKey(res.Key)
if err != nil {
return immutable.None[string](), err
}

Check warning on line 107 in internal/db/fetcher/document.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/document.go#L106-L107

Added lines #L106 - L107 were not covered by tests

if dsKey.DocID != f.currentKV.Key.DocID {
f.currentKV = keyValue{
Key: dsKey,
Value: res.Value,
}
break
}
}

f.execInfo.DocsFetched++

return immutable.Some(f.currentKV.Key.DocID), nil
}

func (f *document) GetFields() (immutable.Option[EncodedDocument], error) {
doc := encodedDocument{}
doc.id = []byte(f.currentKV.Key.DocID)
doc.status = f.status
doc.properties = map[client.FieldDefinition]*encProperty{}

err := f.appendKv(&doc, f.currentKV)
if err != nil {
return immutable.None[EncodedDocument](), err
}

Check warning on line 132 in internal/db/fetcher/document.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/document.go#L131-L132

Added lines #L131 - L132 were not covered by tests

for {
res, ok := f.kvResultsIter.NextSync()
if !ok {
break
}

dsKey, err := keys.NewDataStoreKey(res.Key)
if err != nil {
return immutable.None[EncodedDocument](), err
}

Check warning on line 143 in internal/db/fetcher/document.go

View check run for this annotation

Codecov / codecov/patch

internal/db/fetcher/document.go#L142-L143

Added lines #L142 - L143 were not covered by tests

kv := keyValue{
Key: dsKey,
Value: res.Value,
}

if dsKey.DocID != f.currentKV.Key.DocID {
f.nextKV = immutable.Some(kv)
break
}

err = f.appendKv(&doc, kv)
if err != nil {
return immutable.None[EncodedDocument](), err
}
}

return immutable.Some[EncodedDocument](&doc), nil
}

func (f *document) appendKv(doc *encodedDocument, kv keyValue) error {
if kv.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID {
doc.schemaVersionID = string(kv.Value)
return nil
}

// we have to skip the object marker
if bytes.Equal(kv.Value, []byte{base.ObjectMarker}) {
return nil
}

fieldID, err := kv.Key.FieldIDAsUint()
if err != nil {
return err
}

fieldDesc, ok := f.fieldsByID[fieldID]
if !ok {
return nil
}

f.execInfo.FieldsFetched++

doc.properties[fieldDesc] = &encProperty{
Desc: fieldDesc,
Raw: kv.Value,
}

return nil
}

func (f *document) Close() error {
return f.kvResultsIter.Close()
}
Loading
Loading