-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(i): Rewrite document fetcher #3279
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
// Copyright 2024 Democratized Data Foundation | ||
// | ||
// Use of this software is governed by the Business Source License | ||
// included in the file licenses/BSL.txt. | ||
// | ||
// As of the Change Date specified in that file, in accordance with | ||
// the Business Source License, use of this software will be governed | ||
// by the Apache License, Version 2.0, included in the file | ||
// licenses/APL.txt. | ||
|
||
package fetcher | ||
|
||
import ( | ||
"errors" | ||
|
||
"github.com/sourcenetwork/immutable" | ||
) | ||
|
||
// deleted is a fetcher that orchastrates the fetching of deleted and active documents. | ||
type deleted struct { | ||
activeFetcher fetcher | ||
activeDocID immutable.Option[string] | ||
|
||
deletedFetcher fetcher | ||
deletedDocID immutable.Option[string] | ||
|
||
currentFetcher fetcher | ||
} | ||
|
||
var _ fetcher = (*deleted)(nil) | ||
|
||
func newDeletedFetcher( | ||
activeFetcher fetcher, | ||
deletedFetcher fetcher, | ||
) *deleted { | ||
return &deleted{ | ||
activeFetcher: activeFetcher, | ||
deletedFetcher: deletedFetcher, | ||
} | ||
} | ||
|
||
func (f *deleted) NextDoc() (immutable.Option[string], error) { | ||
if !f.activeDocID.HasValue() { | ||
var err error | ||
f.activeDocID, err = f.activeFetcher.NextDoc() | ||
if err != nil { | ||
return immutable.None[string](), err | ||
} | ||
} | ||
|
||
if !f.deletedDocID.HasValue() { | ||
var err error | ||
f.deletedDocID, err = f.deletedFetcher.NextDoc() | ||
if err != nil { | ||
return immutable.None[string](), err | ||
} | ||
} | ||
|
||
if !f.activeDocID.HasValue() || (f.deletedDocID.HasValue() && f.deletedDocID.Value() < f.activeDocID.Value()) { | ||
f.currentFetcher = f.deletedFetcher | ||
return f.deletedDocID, nil | ||
} | ||
|
||
f.currentFetcher = f.activeFetcher | ||
return f.activeDocID, nil | ||
} | ||
|
||
func (f *deleted) GetFields() (immutable.Option[EncodedDocument], error) { | ||
doc, err := f.currentFetcher.GetFields() | ||
if err != nil { | ||
return immutable.None[EncodedDocument](), err | ||
} | ||
|
||
if f.activeFetcher == f.currentFetcher { | ||
f.activeDocID = immutable.None[string]() | ||
} else { | ||
f.deletedDocID = immutable.None[string]() | ||
} | ||
|
||
return doc, nil | ||
} | ||
|
||
func (f *deleted) Close() error { | ||
activeErr := f.activeFetcher.Close() | ||
if activeErr != nil { | ||
deletedErr := f.deletedFetcher.Close() | ||
if deletedErr != nil { | ||
return errors.Join(activeErr, deletedErr) | ||
} | ||
|
||
return activeErr | ||
} | ||
|
||
return f.deletedFetcher.Close() | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,197 @@ | ||||||
// Copyright 2024 Democratized Data Foundation | ||||||
// | ||||||
// Use of this software is governed by the Business Source License | ||||||
// included in the file licenses/BSL.txt. | ||||||
// | ||||||
// As of the Change Date specified in that file, in accordance with | ||||||
// the Business Source License, use of this software will be governed | ||||||
// by the Apache License, Version 2.0, included in the file | ||||||
// licenses/APL.txt. | ||||||
|
||||||
package fetcher | ||||||
|
||||||
import ( | ||||||
"bytes" | ||||||
"context" | ||||||
|
||||||
dsq "github.com/ipfs/go-datastore/query" | ||||||
"github.com/sourcenetwork/immutable" | ||||||
|
||||||
"github.com/sourcenetwork/defradb/client" | ||||||
"github.com/sourcenetwork/defradb/datastore/iterable" | ||||||
"github.com/sourcenetwork/defradb/internal/db/base" | ||||||
"github.com/sourcenetwork/defradb/internal/keys" | ||||||
) | ||||||
|
||||||
// document is the type responsible for fetching documents from the datastore. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. question/suggestion: Is this only for active documents, or can it work for both? Perhaps this suggestion, if it is only for active documents:
Suggested change
|
||||||
// | ||||||
// It does not filter the data in any way. | ||||||
type document struct { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. suggestion: Similar renaming suggestion as above. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same answer as above. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Same comment as above, I don't think In addition to my above comment, from a purely grammatical POV, the |
||||||
// The set of fields to fetch, mapped by field ID. | ||||||
fieldsByID map[uint32]client.FieldDefinition | ||||||
// The status to assign fetched documents. | ||||||
status client.DocumentStatus | ||||||
// Statistics on the actions of this instance. | ||||||
execInfo *ExecInfo | ||||||
// The iterable results that documents will be fetched from. | ||||||
kvResultsIter dsq.Results | ||||||
|
||||||
// The most recently yielded item from kvResultsIter. | ||||||
currentKV keyValue | ||||||
// nextKV may hold a datastore key value retrieved from kvResultsIter | ||||||
// that was not yet ready to be yielded from the instance. | ||||||
// | ||||||
// When the next document is requested, this value should be yielded | ||||||
// before resuming iteration through the kvResultsIter. | ||||||
nextKV immutable.Option[keyValue] | ||||||
} | ||||||
|
||||||
var _ fetcher = (*document)(nil) | ||||||
|
||||||
func newDocumentFetcher( | ||||||
ctx context.Context, | ||||||
fieldsByID map[uint32]client.FieldDefinition, | ||||||
kvIter iterable.Iterator, | ||||||
prefix keys.DataStoreKey, | ||||||
status client.DocumentStatus, | ||||||
execInfo *ExecInfo, | ||||||
) (*document, error) { | ||||||
if status == client.Active { | ||||||
prefix = prefix.WithValueFlag() | ||||||
} else if status == client.Deleted { | ||||||
prefix = prefix.WithDeletedFlag() | ||||||
} | ||||||
|
||||||
kvResultsIter, err := kvIter.IteratePrefix(ctx, prefix.ToDS(), prefix.PrefixEnd().ToDS()) | ||||||
if err != nil { | ||||||
return nil, err | ||||||
} | ||||||
|
||||||
return &document{ | ||||||
fieldsByID: fieldsByID, | ||||||
kvResultsIter: kvResultsIter, | ||||||
status: status, | ||||||
execInfo: execInfo, | ||||||
}, nil | ||||||
} | ||||||
|
||||||
// keyValue is a KV store response containing the resulting core.DataStoreKey and byte array value. | ||||||
type keyValue struct { | ||||||
Key keys.DataStoreKey | ||||||
Value []byte | ||||||
} | ||||||
|
||||||
func (f *document) NextDoc() (immutable.Option[string], error) { | ||||||
if f.nextKV.HasValue() { | ||||||
docID := f.nextKV.Value().Key.DocID | ||||||
f.currentKV = f.nextKV.Value() | ||||||
|
||||||
f.nextKV = immutable.None[keyValue]() | ||||||
f.execInfo.DocsFetched++ | ||||||
|
||||||
return immutable.Some(docID), nil | ||||||
} | ||||||
|
||||||
for { | ||||||
res, ok := f.kvResultsIter.NextSync() | ||||||
if res.Error != nil { | ||||||
return immutable.None[string](), res.Error | ||||||
} | ||||||
if !ok { | ||||||
return immutable.None[string](), nil | ||||||
} | ||||||
|
||||||
dsKey, err := keys.NewDataStoreKey(res.Key) | ||||||
if err != nil { | ||||||
return immutable.None[string](), err | ||||||
} | ||||||
|
||||||
if dsKey.DocID != f.currentKV.Key.DocID { | ||||||
f.currentKV = keyValue{ | ||||||
Key: dsKey, | ||||||
Value: res.Value, | ||||||
} | ||||||
break | ||||||
} | ||||||
} | ||||||
|
||||||
f.execInfo.DocsFetched++ | ||||||
|
||||||
return immutable.Some(f.currentKV.Key.DocID), nil | ||||||
} | ||||||
|
||||||
func (f *document) GetFields() (immutable.Option[EncodedDocument], error) { | ||||||
doc := encodedDocument{} | ||||||
doc.id = []byte(f.currentKV.Key.DocID) | ||||||
doc.status = f.status | ||||||
doc.properties = map[client.FieldDefinition]*encProperty{} | ||||||
|
||||||
err := f.appendKv(&doc, f.currentKV) | ||||||
if err != nil { | ||||||
return immutable.None[EncodedDocument](), err | ||||||
} | ||||||
|
||||||
for { | ||||||
res, ok := f.kvResultsIter.NextSync() | ||||||
if !ok { | ||||||
break | ||||||
} | ||||||
|
||||||
dsKey, err := keys.NewDataStoreKey(res.Key) | ||||||
if err != nil { | ||||||
return immutable.None[EncodedDocument](), err | ||||||
} | ||||||
|
||||||
kv := keyValue{ | ||||||
Key: dsKey, | ||||||
Value: res.Value, | ||||||
} | ||||||
|
||||||
if dsKey.DocID != f.currentKV.Key.DocID { | ||||||
f.nextKV = immutable.Some(kv) | ||||||
break | ||||||
} | ||||||
|
||||||
err = f.appendKv(&doc, kv) | ||||||
if err != nil { | ||||||
return immutable.None[EncodedDocument](), err | ||||||
} | ||||||
} | ||||||
|
||||||
return immutable.Some[EncodedDocument](&doc), nil | ||||||
} | ||||||
|
||||||
func (f *document) appendKv(doc *encodedDocument, kv keyValue) error { | ||||||
if kv.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID { | ||||||
doc.schemaVersionID = string(kv.Value) | ||||||
return nil | ||||||
} | ||||||
|
||||||
// we have to skip the object marker | ||||||
if bytes.Equal(kv.Value, []byte{base.ObjectMarker}) { | ||||||
return nil | ||||||
} | ||||||
|
||||||
fieldID, err := kv.Key.FieldIDAsUint() | ||||||
if err != nil { | ||||||
return err | ||||||
} | ||||||
|
||||||
fieldDesc, ok := f.fieldsByID[fieldID] | ||||||
if !ok { | ||||||
return nil | ||||||
} | ||||||
|
||||||
f.execInfo.FieldsFetched++ | ||||||
|
||||||
doc.properties[fieldDesc] = &encProperty{ | ||||||
Desc: fieldDesc, | ||||||
Raw: kv.Value, | ||||||
} | ||||||
|
||||||
return nil | ||||||
} | ||||||
|
||||||
func (f *document) Close() error { | ||||||
return f.kvResultsIter.Close() | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: Why not rename to `deletedFetcher`? I know the package name is `fetcher`, but I still prefer the more descriptive name, especially as you also have the `newDeletedFetcher` function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dislike `fetcher.deletedFetcher` now, and I would have expected @fredcarle to complain if I had named it so. Happy to rename if the majority want to though, otherwise leaving as-is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fred and I have rather similar views on this kind of naming stuff, and although I'm not speaking for him, I do prefer the explicitness of `deletedFetcher` here. `deleted` is just too generic a term; the same goes for `document`, which given the context of the package could just as easily refer to an actual document.
As for the full import/package naming (i.e. `fetcher.deletedFetcher`), this is also OK and not against any language idioms or best practices, because the alternative (`fetcher.deleted`) is not a clear enough naming scheme (ignoring the fact that this isn't a public type).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why `newDeletedFetcher` over `newDeleted` then also, haha? The only reason `deleted` is easy to understand for us is because we have context. A new developer just stumbling on this file, even though the package is called `fetcher`, might not grasp what a general term like `deleted` is referring to here, let alone what a "deleted fetcher" is.
I do understand that this is subjective, so I'm happy to follow consensus. I do think I am biased towards longer and more descriptive names, haha; my ideal would definitely be `deletedAndActiveDocumentFetcher` or `activeDocumentAndDeletedFetcher` for this, and `activeDocumentFetcher` for the other one that is currently `document`.