Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async processing #1225

Merged
merged 30 commits into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
390a974
fix pre download
Hitenjain14 Aug 23, 2023
18d3427
rmv hash
Hitenjain14 Aug 23, 2023
365093a
Merge branch 'sprint-1.10' of https://github.com/0chain/blobber into …
Hitenjain14 Aug 26, 2023
13def25
add async processing
Hitenjain14 Aug 26, 2023
0ca8109
use range
Hitenjain14 Aug 26, 2023
dc720eb
fix unit test
Hitenjain14 Aug 26, 2023
879915e
fix delete change
Hitenjain14 Aug 26, 2023
a11840f
add logs
Hitenjain14 Aug 26, 2023
b19f899
fix unit test
Hitenjain14 Aug 26, 2023
db15d6e
fix delete cmd
Hitenjain14 Aug 27, 2023
dab13ad
save file ref
Hitenjain14 Aug 27, 2023
7dbd988
add conn timing logs
Hitenjain14 Aug 27, 2023
36e2e36
set default to 32MB
Hitenjain14 Aug 27, 2023
db5bd8a
fix timing log
Hitenjain14 Aug 29, 2023
9ab7487
merge sprint-1.10
Hitenjain14 Aug 30, 2023
4f800e9
add inner lock
Hitenjain14 Aug 30, 2023
e115ebc
fix test
Hitenjain14 Aug 30, 2023
cc0dba9
add ctx to cancel go routine
Hitenjain14 Sep 1, 2023
3fe0347
merge sprint-1
Hitenjain14 Sep 1, 2023
6701ae7
Merge branch 'staging' of https://github.com/0chain/blobber into feat…
Hitenjain14 Sep 2, 2023
6afa6f2
parallel write to file
Hitenjain14 Sep 2, 2023
4a41030
fix connection
Hitenjain14 Sep 2, 2023
b7ad54a
revert storage changes
Hitenjain14 Sep 2, 2023
35e44e4
empty commit
Hitenjain14 Sep 2, 2023
46015ce
Fix phase locking (#1230)
Hitenjain14 Sep 3, 2023
087d6bb
Merge branch 'staging' of https://github.com/0chain/blobber into feat…
Hitenjain14 Sep 4, 2023
6bd5bf6
Merge branch 'sprint-1.11' into feat/async-processing
Hitenjain14 Sep 16, 2023
895ec86
Merge branch 'sprint-1.11' into feat/async-processing
Hitenjain14 Sep 17, 2023
4ea19d8
rmv allocObj from connObj
Hitenjain14 Sep 17, 2023
1c45399
merge sprint-1.11
Hitenjain14 Sep 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
344 changes: 315 additions & 29 deletions code/go/0chain.net/blobbercore/allocation/connection.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/file_changer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package allocation

import (
"context"
"net/http"
"sync"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)
Expand Down Expand Up @@ -57,6 +59,42 @@ type BaseFileChanger struct {
ChunkEndIndex int `json:"chunk_end_index,omitempty"` // end index of chunks. all chunks MUST be uploaded one by one because of CompactMerkleTree
ChunkHash string `json:"chunk_hash,omitempty"`
UploadOffset int64 `json:"upload_offset,omitempty"` // It is next position that new incoming chunk should be append to
PathHash string `json:"-"` // hash of path
}

// swagger:model UploadResult
type UploadResult struct {
Filename string `json:"filename"`
Size int64 `json:"size"`
Hash string `json:"hash"`
ValidationRoot string `json:"validation_root"`
FixedMerkleRoot string `json:"fixed_merkle_root"`

// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64 `json:"upload_length"`
// Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64 `json:"upload_offset"`
IsFinal bool `json:"-"`
}

type FileCommand interface {

// GetExistingFileRef get file ref if it exists
GetExistingFileRef() *reference.Ref

GetPath() string

// IsValidated validate request, and try build ChangeProcesser instance
IsValidated(ctx context.Context, req *http.Request, allocationObj *Allocation, clientID string) error

// ProcessContent flush file to FileStorage
ProcessContent(allocationObj *Allocation) (UploadResult, error)

// ProcessThumbnail flush thumbnail file to FileStorage if it has.
ProcessThumbnail(allocationObj *Allocation) error

// UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation
UpdateChange(ctx context.Context, connectionObj *AllocationChangeCollector) error
}

func (fc *BaseFileChanger) DeleteTempFile() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
fPath := "/new"
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(fPath)
CreateConnectionChange("connection_id", pathHash, alloc)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)
change := &UploadFileChanger{
BaseFileChanger: BaseFileChanger{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
ctx := datastore.GetStore().CreateTransaction(context.TODO())
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(tc.path)
CreateConnectionChange("connection_id", pathHash, alloc)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)

change := &UpdateFileChanger{
Expand Down
24 changes: 4 additions & 20 deletions code/go/0chain.net/blobbercore/blobberhttp/response.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
package blobberhttp

import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
)

// swagger:model UploadResult
type UploadResult struct {
Filename string `json:"filename"`
Size int64 `json:"size"`
Hash string `json:"hash"`
ValidationRoot string `json:"validation_root"`
FixedMerkleRoot string `json:"fixed_merkle_root"`

// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64 `json:"upload_length"`
// Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64 `json:"upload_offset"`
}

// swagger:model ConnectionResult
type ConnectionResult struct {
AllocationRoot string `json:"allocation_root"`
Expand All @@ -30,11 +15,10 @@ type ConnectionResult struct {

// swagger:model CommitResult
type CommitResult struct {
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarker `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
Changes []*allocation.AllocationChange `json:"-"`
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarker `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
//Result []*UploadResult `json:"result"`
}

Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func CopyObjectResponseCreator(r interface{}) *blobbergrpc.CopyObjectResponse {
return nil
}

httpResp, _ := r.(*blobberhttp.UploadResult)
httpResp, _ := r.(*allocation.UploadResult)
return &blobbergrpc.CopyObjectResponse{
Filename: httpResp.Filename,
Size: httpResp.Size,
Expand All @@ -171,7 +171,7 @@ func RenameObjectResponseCreator(r interface{}) *blobbergrpc.RenameObjectRespons
return nil
}

httpResp, _ := r.(*blobberhttp.UploadResult)
httpResp, _ := r.(*allocation.UploadResult)
return &blobbergrpc.RenameObjectResponse{
Filename: httpResp.Filename,
Size: httpResp.Size,
Expand Down Expand Up @@ -209,7 +209,7 @@ func UploadFileResponseCreator(r interface{}) *blobbergrpc.UploadFileResponse {
return nil
}

httpResp, _ := r.(*blobberhttp.UploadResult)
httpResp, _ := r.(*allocation.UploadResult)
return &blobbergrpc.UploadFileResponse{
Filename: httpResp.Filename,
Size: httpResp.Size,
Expand Down
4 changes: 2 additions & 2 deletions code/go/0chain.net/blobbercore/convert/response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func GetCommitMetaTxnHandlerResponse(response *blobbergrpc.CommitMetaTxnResponse
return result
}

func CopyObjectResponseHandler(copyObjectResponse *blobbergrpc.CopyObjectResponse) *blobberhttp.UploadResult {
return &blobberhttp.UploadResult{
func CopyObjectResponseHandler(copyObjectResponse *blobbergrpc.CopyObjectResponse) *allocation.UploadResult {
return &allocation.UploadResult{
Filename: copyObjectResponse.Filename,
Size: copyObjectResponse.Size,
ValidationRoot: copyObjectResponse.ValidationRoot,
Expand Down
1 change: 0 additions & 1 deletion code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

_, err = io.CopyBuffer(fileData.Hasher, f, buf)
if err != nil {
return nil, common.NewError("file_read_error", err.Error())
Expand Down
22 changes: 1 addition & 21 deletions code/go/0chain.net/blobbercore/handler/file_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,15 @@ import (
"net/http"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
)

// FileCommand execute command for a file operation
type FileCommand interface {

// GetExistingFileRef get file ref if it exists
GetExistingFileRef() *reference.Ref

GetPath() string

// IsValidated validate request, and try build ChangeProcesser instance
IsValidated(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, clientID string) error

// ProcessContent flush file to FileStorage
ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error)

// ProcessThumbnail flush thumbnail file to FileStorage if it has.
ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error

// UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation
UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error
}

// createFileCommand create file command for UPLOAD,UPDATE and DELETE
func createFileCommand(req *http.Request) FileCommand {
func createFileCommand(req *http.Request) allocation.FileCommand {
switch req.Method {
case http.MethodPost:
return &UploadFileCommand{}
Expand Down
38 changes: 26 additions & 12 deletions code/go/0chain.net/blobbercore/handler/file_command_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"gorm.io/gorm"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)

// DeleteFileCommand command for deleting file
Expand All @@ -20,6 +20,7 @@ type DeleteFileCommand struct {
changeProcessor *allocation.DeleteFileChange
allocationChange *allocation.AllocationChange
path string
connectionID string
}

func (cmd *DeleteFileCommand) GetExistingFileRef() *reference.Ref {
Expand All @@ -43,15 +44,28 @@ func (cmd *DeleteFileCommand) IsValidated(ctx context.Context, req *http.Request

cmd.path = path

connectionID, ok := common.GetField(req, "connection_id")
if !ok {
return common.NewError("invalid_parameters", "Invalid connection id passed")
}
cmd.connectionID = connectionID
var err error
cmd.existingFileRef, err = reference.GetLimitedRefFieldsByPath(ctx, allocationObj.ID, path, []string{"path", "name", "size", "hash", "fixed_merkle_root"})
pathHash := encryption.Hash(path)
err = allocation.GetError(connectionID, pathHash)
if err != nil {
return err
}
lookUpHash := reference.GetReferenceLookup(allocationObj.ID, path)
cmd.existingFileRef, err = reference.GetLimitedRefFieldsByLookupHashWith(ctx, allocationObj.ID, lookUpHash, []string{"path", "name", "size", "hash", "fixed_merkle_root"})
if err != nil {
if errors.Is(gorm.ErrRecordNotFound, err) {
return common.ErrFileWasDeleted
}
return common.NewError("bad_db_operation", err.Error())
}
return nil
allocation.CreateConnectionChange(connectionID, pathHash, allocationObj)

return allocation.SetFinalized(connectionID, pathHash, cmd)
}

// UpdateChange add DeleteFileChange in db
Expand All @@ -62,32 +76,32 @@ func (cmd *DeleteFileCommand) UpdateChange(ctx context.Context, connectionObj *a
}

// ProcessContent flush file to FileStorage
func (cmd *DeleteFileCommand) ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error) {
func (cmd *DeleteFileCommand) ProcessContent(allocationObj *allocation.Allocation) (allocation.UploadResult, error) {
deleteSize := cmd.existingFileRef.Size

cmd.changeProcessor = &allocation.DeleteFileChange{ConnectionID: connectionObj.ID,
AllocationID: connectionObj.AllocationID, Name: cmd.existingFileRef.Name,
connectionID := cmd.connectionID
cmd.changeProcessor = &allocation.DeleteFileChange{ConnectionID: connectionID,
AllocationID: allocationObj.ID, Name: cmd.existingFileRef.Name,
Hash: cmd.existingFileRef.Hash, Path: cmd.existingFileRef.Path, Size: deleteSize}

result := blobberhttp.UploadResult{}
result := allocation.UploadResult{}
result.Filename = cmd.existingFileRef.Name
result.ValidationRoot = cmd.existingFileRef.ValidationRoot
result.FixedMerkleRoot = cmd.existingFileRef.FixedMerkleRoot
result.Size = cmd.existingFileRef.Size
result.IsFinal = true

cmd.allocationChange = &allocation.AllocationChange{}
cmd.allocationChange.ConnectionID = connectionObj.ID
cmd.allocationChange.ConnectionID = connectionID
cmd.allocationChange.Size = 0 - deleteSize
cmd.allocationChange.Operation = constants.FileOperationDelete

connectionObj.Size += cmd.allocationChange.Size
allocation.UpdateConnectionObjSize(connectionObj.ID, cmd.allocationChange.Size)
allocation.UpdateConnectionObjSize(connectionID, cmd.allocationChange.Size)

return result, nil
}

// ProcessThumbnail no thumbnail should be processed for delete. A deffered delete command has been added on ProcessContent
func (cmd *DeleteFileCommand) ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error {
func (cmd *DeleteFileCommand) ProcessThumbnail(allocationObj *allocation.Allocation) error {
//DO NOTHING
return nil
}
Loading
Loading