Skip to content

Commit

Permalink
Async processing (#1225)
Browse files Browse the repository at this point in the history
* fix pre download

* rmv hash

* add async processing

* use range

* fix unit test

* fix delete change

* add logs

* fix unit test

* fix delete cmd

* save file ref

* add conn timing logs

* set default to 32MB

* fix timing log

* add inner lock

* fix test

* add ctx to cancel go routine

* parallel write to file

* fix connection

* revert storage changes

* empty commit

* Fix phase locking (#1230)

* add logs for 2 phase lock

* add update log

* add root in log

* add update log

* use update lock

* add log in update repo

* use save

* use exec

* log rows affected

* use no key update

* use repo update

* cleanup

* add defer

* add defer

* rmv allocObj from connObj
  • Loading branch information
Hitenjain14 authored Sep 17, 2023
1 parent aebec77 commit 233112a
Show file tree
Hide file tree
Showing 15 changed files with 572 additions and 332 deletions.
344 changes: 315 additions & 29 deletions code/go/0chain.net/blobbercore/allocation/connection.go

Large diffs are not rendered by default.

38 changes: 38 additions & 0 deletions code/go/0chain.net/blobbercore/allocation/file_changer_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package allocation

import (
"context"
"net/http"
"sync"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)
Expand Down Expand Up @@ -57,6 +59,42 @@ type BaseFileChanger struct {
ChunkEndIndex int `json:"chunk_end_index,omitempty"` // end index of chunks. all chunks MUST be uploaded one by one because of CompactMerkleTree
ChunkHash string `json:"chunk_hash,omitempty"`
UploadOffset int64 `json:"upload_offset,omitempty"` // It is next position that new incoming chunk should be append to
PathHash string `json:"-"` // hash of path
}

// swagger:model UploadResult
type UploadResult struct {
Filename string `json:"filename"`
Size int64 `json:"size"`
Hash string `json:"hash"`
ValidationRoot string `json:"validation_root"`
FixedMerkleRoot string `json:"fixed_merkle_root"`

// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64 `json:"upload_length"`
// Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64 `json:"upload_offset"`
IsFinal bool `json:"-"`
}

type FileCommand interface {

// GetExistingFileRef get file ref if it exists
GetExistingFileRef() *reference.Ref

GetPath() string

// IsValidated validate request, and try build ChangeProcesser instance
IsValidated(ctx context.Context, req *http.Request, allocationObj *Allocation, clientID string) error

// ProcessContent flush file to FileStorage
ProcessContent(allocationObj *Allocation) (UploadResult, error)

// ProcessThumbnail flush thumbnail file to FileStorage if it has.
ProcessThumbnail(allocationObj *Allocation) error

// UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation
UpdateChange(ctx context.Context, connectionObj *AllocationChangeCollector) error
}

func (fc *BaseFileChanger) DeleteTempFile() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
fPath := "/new"
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(fPath)
CreateConnectionChange("connection_id", pathHash, alloc)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)
change := &UploadFileChanger{
BaseFileChanger: BaseFileChanger{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
ctx := datastore.GetStore().CreateTransaction(context.TODO())
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(tc.path)
CreateConnectionChange("connection_id", pathHash, alloc)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)

change := &UpdateFileChanger{
Expand Down
24 changes: 4 additions & 20 deletions code/go/0chain.net/blobbercore/blobberhttp/response.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,12 @@
package blobberhttp

import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
)

// swagger:model UploadResult
type UploadResult struct {
Filename string `json:"filename"`
Size int64 `json:"size"`
Hash string `json:"hash"`
ValidationRoot string `json:"validation_root"`
FixedMerkleRoot string `json:"fixed_merkle_root"`

// UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer.
UploadLength int64 `json:"upload_length"`
// Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64 `json:"upload_offset"`
}

// swagger:model ConnectionResult
type ConnectionResult struct {
AllocationRoot string `json:"allocation_root"`
Expand All @@ -30,11 +15,10 @@ type ConnectionResult struct {

// swagger:model CommitResult
type CommitResult struct {
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarker `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
Changes []*allocation.AllocationChange `json:"-"`
AllocationRoot string `json:"allocation_root"`
WriteMarker *writemarker.WriteMarker `json:"write_marker"`
Success bool `json:"success"`
ErrorMessage string `json:"error_msg,omitempty"`
//Result []*UploadResult `json:"result"`
}

Expand Down
6 changes: 3 additions & 3 deletions code/go/0chain.net/blobbercore/convert/response_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func CopyObjectResponseCreator(r interface{}) *blobbergrpc.CopyObjectResponse {
return nil
}

httpResp, _ := r.(*blobberhttp.UploadResult)
httpResp, _ := r.(*allocation.UploadResult)
return &blobbergrpc.CopyObjectResponse{
Filename: httpResp.Filename,
Size: httpResp.Size,
Expand All @@ -171,7 +171,7 @@ func RenameObjectResponseCreator(r interface{}) *blobbergrpc.RenameObjectRespons
return nil
}

httpResp, _ := r.(*blobberhttp.UploadResult)
httpResp, _ := r.(*allocation.UploadResult)
return &blobbergrpc.RenameObjectResponse{
Filename: httpResp.Filename,
Size: httpResp.Size,
Expand Down Expand Up @@ -209,7 +209,7 @@ func UploadFileResponseCreator(r interface{}) *blobbergrpc.UploadFileResponse {
return nil
}

httpResp, _ := r.(*blobberhttp.UploadResult)
httpResp, _ := r.(*allocation.UploadResult)
return &blobbergrpc.UploadFileResponse{
Filename: httpResp.Filename,
Size: httpResp.Size,
Expand Down
4 changes: 2 additions & 2 deletions code/go/0chain.net/blobbercore/convert/response_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func GetCommitMetaTxnHandlerResponse(response *blobbergrpc.CommitMetaTxnResponse
return result
}

func CopyObjectResponseHandler(copyObjectResponse *blobbergrpc.CopyObjectResponse) *blobberhttp.UploadResult {
return &blobberhttp.UploadResult{
func CopyObjectResponseHandler(copyObjectResponse *blobbergrpc.CopyObjectResponse) *allocation.UploadResult {
return &allocation.UploadResult{
Filename: copyObjectResponse.Filename,
Size: copyObjectResponse.Size,
ValidationRoot: copyObjectResponse.ValidationRoot,
Expand Down
1 change: 0 additions & 1 deletion code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

_, err = io.CopyBuffer(fileData.Hasher, f, buf)
if err != nil {
return nil, common.NewError("file_read_error", err.Error())
Expand Down
22 changes: 1 addition & 21 deletions code/go/0chain.net/blobbercore/handler/file_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,15 @@ import (
"net/http"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
)

// FileCommand execute command for a file operation
type FileCommand interface {

// GetExistingFileRef get file ref if it exists
GetExistingFileRef() *reference.Ref

GetPath() string

// IsValidated validate request, and try build ChangeProcesser instance
IsValidated(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, clientID string) error

// ProcessContent flush file to FileStorage
ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error)

// ProcessThumbnail flush thumbnail file to FileStorage if it has.
ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error

// UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation
UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error
}

// createFileCommand create file command for UPLOAD,UPDATE and DELETE
func createFileCommand(req *http.Request) FileCommand {
func createFileCommand(req *http.Request) allocation.FileCommand {
switch req.Method {
case http.MethodPost:
return &UploadFileCommand{}
Expand Down
38 changes: 26 additions & 12 deletions code/go/0chain.net/blobbercore/handler/file_command_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"gorm.io/gorm"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)

// DeleteFileCommand command for deleting file
Expand All @@ -20,6 +20,7 @@ type DeleteFileCommand struct {
changeProcessor *allocation.DeleteFileChange
allocationChange *allocation.AllocationChange
path string
connectionID string
}

func (cmd *DeleteFileCommand) GetExistingFileRef() *reference.Ref {
Expand All @@ -43,15 +44,28 @@ func (cmd *DeleteFileCommand) IsValidated(ctx context.Context, req *http.Request

cmd.path = path

connectionID, ok := common.GetField(req, "connection_id")
if !ok {
return common.NewError("invalid_parameters", "Invalid connection id passed")
}
cmd.connectionID = connectionID
var err error
cmd.existingFileRef, err = reference.GetLimitedRefFieldsByPath(ctx, allocationObj.ID, path, []string{"path", "name", "size", "hash", "fixed_merkle_root"})
pathHash := encryption.Hash(path)
err = allocation.GetError(connectionID, pathHash)
if err != nil {
return err
}
lookUpHash := reference.GetReferenceLookup(allocationObj.ID, path)
cmd.existingFileRef, err = reference.GetLimitedRefFieldsByLookupHashWith(ctx, allocationObj.ID, lookUpHash, []string{"path", "name", "size", "hash", "fixed_merkle_root"})
if err != nil {
if errors.Is(gorm.ErrRecordNotFound, err) {
return common.ErrFileWasDeleted
}
return common.NewError("bad_db_operation", err.Error())
}
return nil
allocation.CreateConnectionChange(connectionID, pathHash, allocationObj)

return allocation.SetFinalized(connectionID, pathHash, cmd)
}

// UpdateChange add DeleteFileChange in db
Expand All @@ -62,32 +76,32 @@ func (cmd *DeleteFileCommand) UpdateChange(ctx context.Context, connectionObj *a
}

// ProcessContent flush file to FileStorage
func (cmd *DeleteFileCommand) ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error) {
func (cmd *DeleteFileCommand) ProcessContent(allocationObj *allocation.Allocation) (allocation.UploadResult, error) {
deleteSize := cmd.existingFileRef.Size

cmd.changeProcessor = &allocation.DeleteFileChange{ConnectionID: connectionObj.ID,
AllocationID: connectionObj.AllocationID, Name: cmd.existingFileRef.Name,
connectionID := cmd.connectionID
cmd.changeProcessor = &allocation.DeleteFileChange{ConnectionID: connectionID,
AllocationID: allocationObj.ID, Name: cmd.existingFileRef.Name,
Hash: cmd.existingFileRef.Hash, Path: cmd.existingFileRef.Path, Size: deleteSize}

result := blobberhttp.UploadResult{}
result := allocation.UploadResult{}
result.Filename = cmd.existingFileRef.Name
result.ValidationRoot = cmd.existingFileRef.ValidationRoot
result.FixedMerkleRoot = cmd.existingFileRef.FixedMerkleRoot
result.Size = cmd.existingFileRef.Size
result.IsFinal = true

cmd.allocationChange = &allocation.AllocationChange{}
cmd.allocationChange.ConnectionID = connectionObj.ID
cmd.allocationChange.ConnectionID = connectionID
cmd.allocationChange.Size = 0 - deleteSize
cmd.allocationChange.Operation = constants.FileOperationDelete

connectionObj.Size += cmd.allocationChange.Size
allocation.UpdateConnectionObjSize(connectionObj.ID, cmd.allocationChange.Size)
allocation.UpdateConnectionObjSize(connectionID, cmd.allocationChange.Size)

return result, nil
}

// ProcessThumbnail no thumbnail should be processed for delete. A deffered delete command has been added on ProcessContent
func (cmd *DeleteFileCommand) ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error {
func (cmd *DeleteFileCommand) ProcessThumbnail(allocationObj *allocation.Allocation) error {
//DO NOTHING
return nil
}
Loading

0 comments on commit 233112a

Please sign in to comment.