From 233112a90022901e5a2f08cbd477b55526925afc Mon Sep 17 00:00:00 2001 From: Hitenjain14 <57557631+Hitenjain14@users.noreply.github.com> Date: Mon, 18 Sep 2023 02:22:17 +0530 Subject: [PATCH] Async processing (#1225) * fix pre download * rmv hash * add async processing * use range * fix unit test * fix delete change * add logs * fix unit test * fix delete cmd * save file ref * add conn timing logs * set default to 32MB * fix timing log * add inner lock * fix test * add ctx to cancel go routine * parallel write to file * fix connection * revert storage changes * empty commit * Fix phase locking (#1230) * add logs for 2 phase lock * add update log * add root in log * add update log * use update lock * add log in update repo * use save * use exec * log rows affected * use no key update * use repo update * cleanup * add defer * add defer * rmv allocObj from connObj --- .../blobbercore/allocation/connection.go | 344 ++++++++++++++++-- .../allocation/file_changer_base.go | 38 ++ .../allocation/file_changer_upload_test.go | 1 + .../allocation/updatefilechange_test.go | 1 + .../blobbercore/blobberhttp/response.go | 24 +- .../blobbercore/convert/response_creator.go | 6 +- .../blobbercore/convert/response_handler.go | 4 +- .../blobbercore/filestore/storage.go | 1 - .../blobbercore/handler/file_command.go | 22 +- .../handler/file_command_delete.go | 38 +- .../handler/file_command_update.go | 159 ++++---- .../handler/file_command_upload.go | 150 ++++---- .../blobbercore/handler/handler_test.go | 4 +- .../handler/object_operation_handler.go | 110 ++---- .../go/0chain.net/core/common/request_form.go | 2 +- 15 files changed, 572 insertions(+), 332 deletions(-) diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index ae366b768..bc4052de3 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -5,7 +5,12 @@ import ( "sync" "time" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" + "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/logging" + "go.uber.org/zap" ) var ( @@ -16,74 +21,348 @@ var ( ) var ( - connectionObjSizeMap = make(map[string]*ConnectionObjSize) - connectionObjMutex sync.RWMutex + connectionProcessor = make(map[string]*ConnectionProcessor) + connectionObjMutex sync.RWMutex ) -type ConnectionObjSize struct { +type ConnectionProcessor struct { Size int64 UpdatedAt time.Time - Changes map[string]*ConnectionChanges + lock sync.RWMutex + changes map[string]*ConnectionChange + ClientID string + ctx context.Context + ctxCancel context.CancelFunc } -type ConnectionChanges struct { - Hasher *filestore.CommitHasher +type ConnectionChange struct { + hasher *filestore.CommitHasher + baseChanger *BaseFileChanger + existingRef *reference.Ref + processError error + ProcessChan chan FileCommand + wg sync.WaitGroup + isFinalized bool +} + +func CreateConnectionChange(connectionID, pathHash string, allocationObj *Allocation) *ConnectionChange { + connectionObjMutex.Lock() + connectionObj := connectionProcessor[connectionID] + if connectionObj == nil { + ctx, cancel := context.WithCancel(context.Background()) + connectionObj = &ConnectionProcessor{ + UpdatedAt: time.Now(), + changes: make(map[string]*ConnectionChange), + ctx: ctx, + ctxCancel: cancel, + } + connectionProcessor[connectionID] = connectionObj + } + connectionObjMutex.Unlock() + connectionObj.lock.Lock() + connChange := &ConnectionChange{ + ProcessChan: make(chan FileCommand, 2), + wg: sync.WaitGroup{}, + } + connectionObj.changes[pathHash] = connChange + connectionObj.lock.Unlock() + connChange.wg.Add(1) + go func() { + processCommand(connectionObj.ctx, connChange.ProcessChan, allocationObj, connectionID, connectionObj.ClientID, pathHash) + connChange.wg.Done() + }() + return connChange +} + +func GetConnectionChange(connectionID, pathHash string) *ConnectionChange { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return nil + } + return connectionObj.changes[pathHash] +} + +func GetFileChanger(connectionID, pathHash string) *BaseFileChanger { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return nil + } + connectionObj.lock.RLock() + defer connectionObj.lock.RUnlock() + if connectionObj.changes[pathHash] == nil { + return nil + } + return connectionObj.changes[pathHash].baseChanger +} + +func SaveFileChanger(connectionID string, fileChanger *BaseFileChanger) error { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return common.NewError("connection_not_found", "connection not found") + } + connectionObj.lock.Lock() + if connectionObj.changes[fileChanger.PathHash] == nil { + return common.NewError("connection_change_not_found", "connection change not found") + } + connectionObj.changes[fileChanger.PathHash].baseChanger = fileChanger + connectionObj.lock.Unlock() + return nil +} + +func SaveExistingRef(connectionID, pathHash string, existingRef *reference.Ref) error { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return common.NewError("connection_not_found", "connection not found") + } + connectionObj.lock.Lock() + defer connectionObj.lock.Unlock() + if connectionObj.changes[pathHash] == nil { + return common.NewError("connection_change_not_found", "connection change not found") + } + connectionObj.changes[pathHash].existingRef = existingRef + return nil +} + +func GetExistingRef(connectionID, pathHash string) *reference.Ref { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return nil + } + connectionObj.lock.RLock() + defer connectionObj.lock.RUnlock() + if connectionObj.changes[pathHash] == nil { + return nil + } + return connectionObj.changes[pathHash].existingRef +} + +func SetFinalized(connectionID, pathHash string, cmd FileCommand) error { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return common.NewError("connection_not_found", "connection not found") + } + connectionObj.lock.Lock() + connChange := connectionObj.changes[pathHash] + // Can happen due to resume or redundant call + if connChange.isFinalized { + connectionObj.lock.Unlock() + connChange.wg.Wait() + return nil + } + connChange.isFinalized = true + connectionObj.lock.Unlock() + connChange.ProcessChan <- cmd + close(connChange.ProcessChan) + connChange.wg.Wait() + return GetError(connectionID, pathHash) +} + +func SendCommand(connectionID, pathHash string, cmd FileCommand) error { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return common.NewError("connection_not_found", "connection not found") + } + connectionObj.lock.RLock() + defer connectionObj.lock.RUnlock() + connChange := connectionObj.changes[pathHash] + if connChange == nil { + return common.NewError("connection_change_not_found", "connection change not found") + } + if connChange.processError != nil { + return connChange.processError + } + if connChange.isFinalized { + return common.NewError("connection_change_finalized", "connection change finalized") + } + connChange.ProcessChan <- cmd + return nil +} + +func GetConnectionProcessor(connectionID string) *ConnectionProcessor { + connectionObjMutex.RLock() + defer connectionObjMutex.RUnlock() + return connectionProcessor[connectionID] +} + +func CreateConnectionProcessor(connectionID string) *ConnectionProcessor { + connectionObjMutex.Lock() + defer connectionObjMutex.Unlock() + connectionObj := connectionProcessor[connectionID] + if connectionObj == nil { + ctx, cancel := context.WithCancel(context.Background()) + connectionObj = &ConnectionProcessor{ + UpdatedAt: time.Now(), + changes: make(map[string]*ConnectionChange), + ctx: ctx, + ctxCancel: cancel, + } + connectionProcessor[connectionID] = connectionObj + } + return connectionObj +} + +func SetError(connectionID, pathHash string, err error) { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return + } + connectionObj.lock.Lock() + connChange := connectionObj.changes[pathHash] + connChange.processError = err + connectionObj.lock.Unlock() + drainChan(connChange.ProcessChan) // drain the channel so that the no commands are blocked +} + +func GetError(connectionID, pathHash string) error { + connectionObjMutex.RLock() + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() + if connectionObj == nil { + return nil + } + connectionObj.lock.RLock() + defer connectionObj.lock.RUnlock() + connChange := connectionObj.changes[pathHash] + if connChange == nil { + return nil + } + return connChange.processError } // GetConnectionObjSize gets the connection size from the memory func GetConnectionObjSize(connectionID string) int64 { connectionObjMutex.RLock() defer connectionObjMutex.RUnlock() - connectionObjSize := connectionObjSizeMap[connectionID] - if connectionObjSize == nil { + connectionObj := connectionProcessor[connectionID] + if connectionObj == nil { return 0 } - return connectionObjSizeMap[connectionID].Size + return connectionObj.Size } // UpdateConnectionObjSize updates the connection size by addSize in memory func UpdateConnectionObjSize(connectionID string, addSize int64) { connectionObjMutex.Lock() defer connectionObjMutex.Unlock() - connectionObjSize := connectionObjSizeMap[connectionID] - if connectionObjSize == nil { - connectionObjSizeMap[connectionID] = &ConnectionObjSize{ + connectionObj := connectionProcessor[connectionID] + if connectionObj == nil { + connectionProcessor[connectionID] = &ConnectionProcessor{ Size: addSize, UpdatedAt: time.Now(), - Changes: make(map[string]*ConnectionChanges), + changes: make(map[string]*ConnectionChange), } return } - connectionObjSize.Size = connectionObjSize.Size + addSize - connectionObjSize.UpdatedAt = time.Now() + connectionObj.Size = connectionObj.Size + addSize + connectionObj.UpdatedAt = time.Now() } func GetHasher(connectionID, pathHash string) *filestore.CommitHasher { connectionObjMutex.RLock() - defer connectionObjMutex.RUnlock() - connectionObj := connectionObjSizeMap[connectionID] + connectionObj := connectionProcessor[connectionID] + connectionObjMutex.RUnlock() if connectionObj == nil { return nil } - if connectionObj.Changes[pathHash] == nil { + connectionObj.lock.RLock() + defer connectionObj.lock.RUnlock() + if connectionObj.changes[pathHash] == nil { return nil } - return connectionObj.Changes[pathHash].Hasher + return connectionObj.changes[pathHash].hasher } func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filestore.CommitHasher) { connectionObjMutex.Lock() - defer connectionObjMutex.Unlock() - connectionObj := connectionObjSizeMap[connectionID] + connectionObj := connectionProcessor[connectionID] if connectionObj == nil { - connectionObjSizeMap[connectionID] = &ConnectionObjSize{ + connectionObj = &ConnectionProcessor{ UpdatedAt: time.Now(), - Changes: make(map[string]*ConnectionChanges), + changes: make(map[string]*ConnectionChange), } + connectionProcessor[connectionID] = connectionObj } - connectionObjSizeMap[connectionID].Changes[pathHash] = &ConnectionChanges{ - Hasher: hasher, + connectionObjMutex.Unlock() + connectionObj.lock.Lock() + connectionObj.changes[pathHash].hasher = hasher + connectionObj.lock.Unlock() +} + +func processCommand(ctx context.Context, processorChan chan FileCommand, allocationObj *Allocation, connectionID, clientID, pathHash string) { + + defer func() { + if r := recover(); r != nil { + logging.Logger.Error("Recovered panic", zap.String("connection_id", connectionID), zap.Any("error", r)) + SetError(connectionID, pathHash, common.NewError("panic", "Recovered panic")) + } + }() + + for { + select { + case <-ctx.Done(): + return + case cmd, ok := <-processorChan: + if cmd == nil || !ok { + return + } + res, err := cmd.ProcessContent(allocationObj) + if err != nil { + logging.Logger.Error("Error processing command", zap.String("connection_id", connectionID), zap.String("path", cmd.GetPath()), zap.Error(err)) + SetError(connectionID, pathHash, err) + return + } + err = cmd.ProcessThumbnail(allocationObj) + if err != nil { + logging.Logger.Error("Error processing command", zap.String("connection_id", connectionID), zap.String("path", cmd.GetPath()), zap.Error(err)) + SetError(connectionID, pathHash, err) + return + } + if res.IsFinal { + err = datastore.GetStore().WithNewTransaction(func(ctx context.Context) error { + connectionObj, err := GetAllocationChanges(ctx, connectionID, allocationObj.ID, clientID) + if err != nil { + return err + } + return cmd.UpdateChange(ctx, connectionObj) + }) + if err != nil { + logging.Logger.Error("Error processing command", zap.String("connection_id", connectionID), zap.String("path", cmd.GetPath()), zap.Error(err)) + SetError(connectionID, pathHash, err) + } + return + } + } + } + +} + +func drainChan(processorChan chan FileCommand) { + for { + select { + case _, ok := <-processorChan: + if !ok { + return + } + default: + return + } } } @@ -91,7 +370,11 @@ func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filest // If the given connectionID is not present, then it is no-op. func DeleteConnectionObjEntry(connectionID string) { connectionObjMutex.Lock() - delete(connectionObjSizeMap, connectionID) + connectionObj := connectionProcessor[connectionID] + if connectionObj != nil && connectionObj.ctxCancel != nil { + connectionObj.ctxCancel() + } + delete(connectionProcessor, connectionID) connectionObjMutex.Unlock() } @@ -99,10 +382,13 @@ func DeleteConnectionObjEntry(connectionID string) { // for which deadline is exceeded. func cleanConnectionObj() { connectionObjMutex.Lock() - for connectionID, connectionObjSize := range connectionObjSizeMap { - diff := time.Since(connectionObjSize.UpdatedAt) + for connectionID, connectionObj := range connectionProcessor { + diff := time.Since(connectionObj.UpdatedAt) if diff >= ConnectionObjTimeout { - delete(connectionObjSizeMap, connectionID) + if connectionObj.ctxCancel != nil { + connectionObj.ctxCancel() + } + delete(connectionProcessor, connectionID) } } connectionObjMutex.Unlock() diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go index be437ce84..8f9afdb5a 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go @@ -2,9 +2,11 @@ package allocation import ( "context" + "net/http" "sync" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/encryption" ) @@ -57,6 +59,42 @@ type BaseFileChanger struct { ChunkEndIndex int `json:"chunk_end_index,omitempty"` // end index of chunks. all chunks MUST be uploaded one by one because of CompactMerkleTree ChunkHash string `json:"chunk_hash,omitempty"` UploadOffset int64 `json:"upload_offset,omitempty"` // It is next position that new incoming chunk should be append to + PathHash string `json:"-"` // hash of path +} + +// swagger:model UploadResult +type UploadResult struct { + Filename string `json:"filename"` + Size int64 `json:"size"` + Hash string `json:"hash"` + ValidationRoot string `json:"validation_root"` + FixedMerkleRoot string `json:"fixed_merkle_root"` + + // UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. + UploadLength int64 `json:"upload_length"` + // Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. + UploadOffset int64 `json:"upload_offset"` + IsFinal bool `json:"-"` +} + +type FileCommand interface { + + // GetExistingFileRef get file ref if it exists + GetExistingFileRef() *reference.Ref + + GetPath() string + + // IsValidated validate request, and try build ChangeProcesser instance + IsValidated(ctx context.Context, req *http.Request, allocationObj *Allocation, clientID string) error + + // ProcessContent flush file to FileStorage + ProcessContent(allocationObj *Allocation) (UploadResult, error) + + // ProcessThumbnail flush thumbnail file to FileStorage if it has. + ProcessThumbnail(allocationObj *Allocation) error + + // UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation + UpdateChange(ctx context.Context, connectionObj *AllocationChangeCollector) error } func (fc *BaseFileChanger) DeleteTempFile() error { diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go b/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go index 3994650f2..a5449c2da 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go @@ -102,6 +102,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) { fPath := "/new" hasher := filestore.GetNewCommitHasher(2310) pathHash := encryption.Hash(fPath) + CreateConnectionChange("connection_id", pathHash, alloc) UpdateConnectionObjWithHasher("connection_id", pathHash, hasher) change := &UploadFileChanger{ BaseFileChanger: BaseFileChanger{ diff --git a/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go b/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go index 452f1dfb7..02f2113d8 100644 --- a/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go +++ b/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go @@ -341,6 +341,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) { ctx := datastore.GetStore().CreateTransaction(context.TODO()) hasher := filestore.GetNewCommitHasher(2310) pathHash := encryption.Hash(tc.path) + CreateConnectionChange("connection_id", pathHash, alloc) UpdateConnectionObjWithHasher("connection_id", pathHash, hasher) change := &UpdateFileChanger{ diff --git a/code/go/0chain.net/blobbercore/blobberhttp/response.go b/code/go/0chain.net/blobbercore/blobberhttp/response.go index dc0ecd069..b48fcdbd6 100644 --- a/code/go/0chain.net/blobbercore/blobberhttp/response.go +++ b/code/go/0chain.net/blobbercore/blobberhttp/response.go @@ -1,27 +1,12 @@ package blobberhttp import ( - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/readmarker" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/writemarker" "github.com/0chain/blobber/code/go/0chain.net/core/common" ) -// swagger:model UploadResult -type UploadResult struct { - Filename string `json:"filename"` - Size int64 `json:"size"` - Hash string `json:"hash"` - ValidationRoot string `json:"validation_root"` - FixedMerkleRoot string `json:"fixed_merkle_root"` - - // UploadLength indicates the size of the entire upload in bytes. The value MUST be a non-negative integer. - UploadLength int64 `json:"upload_length"` - // Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. - UploadOffset int64 `json:"upload_offset"` -} - // swagger:model ConnectionResult type ConnectionResult struct { AllocationRoot string `json:"allocation_root"` @@ -30,11 +15,10 @@ type ConnectionResult struct { // swagger:model CommitResult type CommitResult struct { - AllocationRoot string `json:"allocation_root"` - WriteMarker *writemarker.WriteMarker `json:"write_marker"` - Success bool `json:"success"` - ErrorMessage string `json:"error_msg,omitempty"` - Changes []*allocation.AllocationChange `json:"-"` + AllocationRoot string `json:"allocation_root"` + WriteMarker *writemarker.WriteMarker `json:"write_marker"` + Success bool `json:"success"` + ErrorMessage string `json:"error_msg,omitempty"` //Result []*UploadResult `json:"result"` } diff --git a/code/go/0chain.net/blobbercore/convert/response_creator.go b/code/go/0chain.net/blobbercore/convert/response_creator.go index 6def34499..39748dbdd 100644 --- a/code/go/0chain.net/blobbercore/convert/response_creator.go +++ b/code/go/0chain.net/blobbercore/convert/response_creator.go @@ -155,7 +155,7 @@ func CopyObjectResponseCreator(r interface{}) *blobbergrpc.CopyObjectResponse { return nil } - httpResp, _ := r.(*blobberhttp.UploadResult) + httpResp, _ := r.(*allocation.UploadResult) return &blobbergrpc.CopyObjectResponse{ Filename: httpResp.Filename, Size: httpResp.Size, @@ -171,7 +171,7 @@ func RenameObjectResponseCreator(r interface{}) *blobbergrpc.RenameObjectRespons return nil } - httpResp, _ := r.(*blobberhttp.UploadResult) + httpResp, _ := r.(*allocation.UploadResult) return &blobbergrpc.RenameObjectResponse{ Filename: httpResp.Filename, Size: httpResp.Size, @@ -209,7 +209,7 @@ func UploadFileResponseCreator(r interface{}) *blobbergrpc.UploadFileResponse { return nil } - httpResp, _ := r.(*blobberhttp.UploadResult) + httpResp, _ := r.(*allocation.UploadResult) return &blobbergrpc.UploadFileResponse{ Filename: httpResp.Filename, Size: httpResp.Size, diff --git a/code/go/0chain.net/blobbercore/convert/response_handler.go b/code/go/0chain.net/blobbercore/convert/response_handler.go index d7c0fc444..ba4dc171e 100644 --- a/code/go/0chain.net/blobbercore/convert/response_handler.go +++ b/code/go/0chain.net/blobbercore/convert/response_handler.go @@ -117,8 +117,8 @@ func GetCommitMetaTxnHandlerResponse(response *blobbergrpc.CommitMetaTxnResponse return result } -func CopyObjectResponseHandler(copyObjectResponse *blobbergrpc.CopyObjectResponse) *blobberhttp.UploadResult { - return &blobberhttp.UploadResult{ +func CopyObjectResponseHandler(copyObjectResponse *blobbergrpc.CopyObjectResponse) *allocation.UploadResult { + return &allocation.UploadResult{ Filename: copyObjectResponse.Filename, Size: copyObjectResponse.Size, ValidationRoot: copyObjectResponse.ValidationRoot, diff --git a/code/go/0chain.net/blobbercore/filestore/storage.go b/code/go/0chain.net/blobbercore/filestore/storage.go index 2b686b7ab..820cfd6ba 100644 --- a/code/go/0chain.net/blobbercore/filestore/storage.go +++ b/code/go/0chain.net/blobbercore/filestore/storage.go @@ -103,7 +103,6 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i if err != nil { return nil, common.NewError("file_seek_error", err.Error()) } - _, err = io.CopyBuffer(fileData.Hasher, f, buf) if err != nil { return nil, common.NewError("file_read_error", err.Error()) diff --git a/code/go/0chain.net/blobbercore/handler/file_command.go b/code/go/0chain.net/blobbercore/handler/file_command.go index 2035f98b0..078d4241c 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command.go +++ b/code/go/0chain.net/blobbercore/handler/file_command.go @@ -6,35 +6,15 @@ import ( "net/http" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/logging" ) // FileCommand execute command for a file operation -type FileCommand interface { - - // GetExistingFileRef get file ref if it exists - GetExistingFileRef() *reference.Ref - - GetPath() string - - // IsValidated validate request, and try build ChangeProcesser instance - IsValidated(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, clientID string) error - - // ProcessContent flush file to FileStorage - ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error) - - // ProcessThumbnail flush thumbnail file to FileStorage if it has. - ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error - - // UpdateChange update AllocationChangeProcessor. It will be president in db for committing transcation - UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error -} // createFileCommand create file command for UPLOAD,UPDATE and DELETE -func createFileCommand(req *http.Request) FileCommand { +func createFileCommand(req *http.Request) allocation.FileCommand { switch req.Method { case http.MethodPost: return &UploadFileCommand{} diff --git a/code/go/0chain.net/blobbercore/handler/file_command_delete.go b/code/go/0chain.net/blobbercore/handler/file_command_delete.go index 8c51bbdb3..a3f1e5b5a 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_delete.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_delete.go @@ -9,9 +9,9 @@ import ( "gorm.io/gorm" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/encryption" ) // DeleteFileCommand command for deleting file @@ -20,6 +20,7 @@ type DeleteFileCommand struct { changeProcessor *allocation.DeleteFileChange allocationChange *allocation.AllocationChange path string + connectionID string } func (cmd *DeleteFileCommand) GetExistingFileRef() *reference.Ref { @@ -43,15 +44,28 @@ func (cmd *DeleteFileCommand) IsValidated(ctx context.Context, req *http.Request cmd.path = path + connectionID, ok := common.GetField(req, "connection_id") + if !ok { + return common.NewError("invalid_parameters", "Invalid connection id passed") + } + cmd.connectionID = connectionID var err error - cmd.existingFileRef, err = reference.GetLimitedRefFieldsByPath(ctx, allocationObj.ID, path, []string{"path", "name", "size", "hash", "fixed_merkle_root"}) + pathHash := encryption.Hash(path) + err = allocation.GetError(connectionID, pathHash) + if err != nil { + return err + } + lookUpHash := reference.GetReferenceLookup(allocationObj.ID, path) + cmd.existingFileRef, err = reference.GetLimitedRefFieldsByLookupHashWith(ctx, allocationObj.ID, lookUpHash, []string{"path", "name", "size", "hash", "fixed_merkle_root"}) if err != nil { if errors.Is(gorm.ErrRecordNotFound, err) { return common.ErrFileWasDeleted } return common.NewError("bad_db_operation", err.Error()) } - return nil + allocation.CreateConnectionChange(connectionID, pathHash, allocationObj) + + return allocation.SetFinalized(connectionID, pathHash, cmd) } // UpdateChange add DeleteFileChange in db @@ -62,32 +76,32 @@ func (cmd *DeleteFileCommand) UpdateChange(ctx context.Context, connectionObj *a } // ProcessContent flush file to FileStorage -func (cmd *DeleteFileCommand) ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error) { +func (cmd *DeleteFileCommand) ProcessContent(allocationObj *allocation.Allocation) (allocation.UploadResult, error) { deleteSize := cmd.existingFileRef.Size - - cmd.changeProcessor = &allocation.DeleteFileChange{ConnectionID: connectionObj.ID, - AllocationID: connectionObj.AllocationID, Name: cmd.existingFileRef.Name, + connectionID := cmd.connectionID + cmd.changeProcessor = &allocation.DeleteFileChange{ConnectionID: connectionID, + AllocationID: allocationObj.ID, Name: cmd.existingFileRef.Name, Hash: cmd.existingFileRef.Hash, Path: cmd.existingFileRef.Path, Size: deleteSize} - result := blobberhttp.UploadResult{} + result := allocation.UploadResult{} result.Filename = cmd.existingFileRef.Name result.ValidationRoot = cmd.existingFileRef.ValidationRoot result.FixedMerkleRoot = cmd.existingFileRef.FixedMerkleRoot result.Size = cmd.existingFileRef.Size + result.IsFinal = true cmd.allocationChange = &allocation.AllocationChange{} - cmd.allocationChange.ConnectionID = connectionObj.ID + cmd.allocationChange.ConnectionID = connectionID cmd.allocationChange.Size = 0 - deleteSize cmd.allocationChange.Operation = constants.FileOperationDelete - connectionObj.Size += cmd.allocationChange.Size - allocation.UpdateConnectionObjSize(connectionObj.ID, cmd.allocationChange.Size) + allocation.UpdateConnectionObjSize(connectionID, cmd.allocationChange.Size) return result, nil } // ProcessThumbnail no thumbnail should be processed for delete. A deffered delete command has been added on ProcessContent -func (cmd *DeleteFileCommand) ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error { +func (cmd *DeleteFileCommand) ProcessThumbnail(allocationObj *allocation.Allocation) error { //DO NOTHING return nil } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_update.go b/code/go/0chain.net/blobbercore/handler/file_command_update.go index b5fd08714..b1f806b60 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_update.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_update.go @@ -4,10 +4,10 @@ import ( "context" "encoding/json" "fmt" + "mime/multipart" "net/http" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" @@ -27,6 +27,9 @@ type UpdateFileCommand struct { existingFileRef *reference.Ref fileChanger *allocation.UpdateFileChanger allocationChange *allocation.AllocationChange + contentFile multipart.File + thumbFile multipart.File + thumbHeader *multipart.FileHeader } func (cmd *UpdateFileCommand) GetExistingFileRef() *reference.Ref { @@ -42,6 +45,10 @@ func (cmd *UpdateFileCommand) GetPath() string { // IsValidated validate request. func (cmd *UpdateFileCommand) IsValidated(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, clientID string) error { + if allocationObj.OwnerID != clientID && allocationObj.RepairerID != clientID { + return common.NewError("invalid_operation", "Operation needs to be performed by the owner or the payer of the allocation") + } + uploadMetaString := req.FormValue(UploadMeta) if uploadMetaString == "" { @@ -54,64 +61,85 @@ func (cmd *UpdateFileCommand) IsValidated(ctx context.Context, req *http.Request return common.NewError("invalid_parameters", "Invalid parameters. Error parsing the meta data for upload."+err.Error()) } - logging.Logger.Info("UpdateFileCommand", zap.Any("allocation_id", allocationObj.ID), zap.Any("validation_rooot", cmd.fileChanger.ValidationRoot), zap.Any("thumb_hash", cmd.fileChanger.ThumbnailHash)) + + if cmd.fileChanger.ConnectionID == "" { + return common.NewError("invalid_connection", "Invalid connection id") + } + + cmd.fileChanger.PathHash = encryption.Hash(cmd.fileChanger.Path) if cmd.fileChanger.ChunkSize <= 0 { cmd.fileChanger.ChunkSize = fileref.CHUNK_SIZE } - cmd.existingFileRef, _ = reference.GetReference(ctx, allocationObj.ID, cmd.fileChanger.Path) - if cmd.existingFileRef == nil { - return common.NewError("invalid_file_update", "File at path does not exist for update") + err = allocation.GetError(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash) + if err != nil { + return err } - if allocationObj.OwnerID != clientID && - allocationObj.RepairerID != clientID { - return common.NewError("invalid_operation", "Operation needs to be performed by the owner or the payer of the allocation") + // Check if ref exists at start of update or get existing ref + if cmd.fileChanger.UploadOffset == 0 { + logging.Logger.Info("UpdateFile ref exists check") + cmd.existingFileRef, _ = reference.GetReference(ctx, allocationObj.ID, cmd.fileChanger.Path) + if cmd.existingFileRef == nil { + return common.NewError("invalid_file_update", "File at path does not exist for update") + } + logging.Logger.Info("UpdateFile ref exists check done", zap.Any("ref", cmd.existingFileRef)) + allocation.CreateConnectionChange(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash, allocationObj) + err = allocation.SaveExistingRef(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash, cmd.existingFileRef) + if err != nil { + return common.NewError("invalid_file_update", "Error saving existing ref") + } + } else { + cmd.existingFileRef = allocation.GetExistingRef(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash) + if cmd.existingFileRef == nil { + return common.NewError("invalid_file_update", "Existing file reference is nil") + } } - _, thumbHeader, _ := req.FormFile(UploadThumbnailFile) + thumbFile, thumbHeader, _ := req.FormFile(UploadThumbnailFile) if thumbHeader != nil { if thumbHeader.Size > MaxThumbnailSize { return common.NewError("max_thumbnail_size", fmt.Sprintf("thumbnail size %d should not be greater than %d", thumbHeader.Size, MaxThumbnailSize)) } + cmd.thumbFile = thumbFile + cmd.thumbHeader = thumbHeader } - return nil + origfile, _, err := req.FormFile(UploadFile) + if err != nil { + return common.NewError("invalid_parameters", "Error Reading multi parts for file."+err.Error()) + } + cmd.contentFile = origfile + if cmd.fileChanger.IsFinal { + return allocation.SetFinalized(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash, cmd) + } + return allocation.SendCommand(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash, cmd) } // ProcessContent flush file to FileStorage -func (cmd *UpdateFileCommand) ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error) { - result := blobberhttp.UploadResult{} +func (cmd *UpdateFileCommand) ProcessContent(allocationObj *allocation.Allocation) (allocation.UploadResult, error) { + result := allocation.UploadResult{} result.Filename = cmd.fileChanger.Filename - - isFinal := cmd.fileChanger.IsFinal - cmd.fileChanger.IsFinal = false - cmd.reloadChange(connectionObj) + defer cmd.contentFile.Close() if cmd.fileChanger.IsFinal { - return result, nil - } - cmd.fileChanger.IsFinal = isFinal - - origfile, _, err := req.FormFile(UploadFile) - if err != nil { - return result, common.NewError("invalid_parameters", "Error Reading multi parts for file."+err.Error()) + cmd.reloadChange() } - defer origfile.Close() if cmd.fileChanger.Size == 0 { return result, common.NewError("invalid_parameters", "Invalid parameters. Size cannot be zero") } var hasher *filestore.CommitHasher - filePathHash := encryption.Hash(cmd.fileChanger.Path) + filePathHash := cmd.fileChanger.PathHash + connID := cmd.fileChanger.ConnectionID if cmd.fileChanger.UploadOffset == 0 { hasher = filestore.GetNewCommitHasher(cmd.fileChanger.Size) - allocation.UpdateConnectionObjWithHasher(connectionObj.ID, filePathHash, hasher) + allocation.UpdateConnectionObjWithHasher(connID, filePathHash, hasher) } else { - hasher = allocation.GetHasher(connectionObj.ID, filePathHash) + hasher = allocation.GetHasher(connID, filePathHash) if hasher == nil { return result, common.NewError("invalid_parameters", "Invalid parameters. Error getting hasher for upload.") } @@ -125,7 +153,7 @@ func (cmd *UpdateFileCommand) ProcessContent(ctx context.Context, req *http.Requ FilePathHash: filePathHash, Hasher: hasher, } - fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, fileInputData, origfile) + fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connID, fileInputData, cmd.contentFile) if err != nil { return result, common.NewError("upload_error", "Failed to upload the file. "+err.Error()) } @@ -135,17 +163,18 @@ func (cmd *UpdateFileCommand) ProcessContent(ctx context.Context, req *http.Requ if err != nil { return result, common.NewError("upload_error", "Failed to upload the file. "+err.Error()) } + result.IsFinal = true } result.ValidationRoot = fileOutputData.ValidationRoot result.FixedMerkleRoot = fileOutputData.FixedMerkleRoot result.Size = fileOutputData.Size - allocationSize := connectionObj.Size + allocationSize := allocation.GetConnectionObjSize(connID) if fileOutputData.ChunkUploaded { allocationSize += fileOutputData.Size - allocation.UpdateConnectionObjSize(connectionObj.ID, fileOutputData.Size) + allocation.UpdateConnectionObjSize(connID, fileOutputData.Size) } if allocationObj.BlobberSizeUsed+(allocationSize-cmd.existingFileRef.Size) > allocationObj.BlobberSize { @@ -153,88 +182,50 @@ func (cmd *UpdateFileCommand) ProcessContent(ctx context.Context, req *http.Requ } cmd.fileChanger.AllocationID = allocationObj.ID - // cmd.fileChanger.Size += fileOutputData.Size cmd.allocationChange = &allocation.AllocationChange{} - cmd.allocationChange.ConnectionID = connectionObj.ID + cmd.allocationChange.ConnectionID = connID cmd.allocationChange.Size = cmd.fileChanger.Size - cmd.existingFileRef.Size cmd.allocationChange.Operation = sdkConst.FileOperationUpdate if cmd.fileChanger.IsFinal { - connectionObj.Size = allocationSize - cmd.existingFileRef.Size - allocation.UpdateConnectionObjSize(connectionObj.ID, -cmd.existingFileRef.Size) - } else { - connectionObj.Size = allocationSize + allocation.UpdateConnectionObjSize(connID, -cmd.existingFileRef.Size) } return result, nil } // ProcessThumbnail flush thumbnail file to FileStorage if it has. -func (cmd *UpdateFileCommand) ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error { - thumbfile, thumbHeader, _ := req.FormFile(UploadThumbnailFile) - - if thumbHeader != nil { - defer thumbfile.Close() +func (cmd *UpdateFileCommand) ProcessThumbnail(allocationObj *allocation.Allocation) error { + connectionID := cmd.fileChanger.ConnectionID + if cmd.thumbHeader != nil { + defer cmd.thumbFile.Close() - thumbInputData := &filestore.FileInputData{Name: thumbHeader.Filename, Path: cmd.fileChanger.Path, IsThumbnail: true, FilePathHash: encryption.Hash(cmd.fileChanger.Path)} - thumbOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, thumbInputData, thumbfile) + thumbInputData := &filestore.FileInputData{Name: cmd.thumbHeader.Filename, Path: cmd.fileChanger.Path, IsThumbnail: true, FilePathHash: cmd.fileChanger.PathHash} + thumbOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionID, thumbInputData, cmd.thumbFile) if err != nil { return common.NewError("upload_error", "Failed to upload the thumbnail. "+err.Error()) } cmd.fileChanger.ThumbnailSize = thumbOutputData.Size cmd.fileChanger.ThumbnailFilename = thumbInputData.Name + err = allocation.SaveFileChanger(connectionID, &cmd.fileChanger.BaseFileChanger) + return err } - return nil } -func (cmd *UpdateFileCommand) reloadChange(connectionObj *allocation.AllocationChangeCollector) { - for _, c := range connectionObj.Changes { - filePath, _ := c.GetOrParseAffectedFilePath() - if c.Operation != sdkConst.FileOperationUpdate || cmd.fileChanger.Path != filePath { - continue - } - - dbFileChanger := &allocation.UpdateFileChanger{} - - err := dbFileChanger.Unmarshal(c.Input) - if err != nil { - logging.Logger.Error("reloadChange", zap.Error(err)) - } - - // reload uploaded size from db, it was chunk size from client - cmd.fileChanger.ThumbnailFilename = dbFileChanger.ThumbnailFilename - cmd.fileChanger.ThumbnailSize = dbFileChanger.ThumbnailSize - cmd.fileChanger.ThumbnailHash = dbFileChanger.ThumbnailHash - cmd.fileChanger.IsFinal = dbFileChanger.IsFinal - return +func (cmd *UpdateFileCommand) reloadChange() { + changer := allocation.GetFileChanger(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash) + if changer != nil { + cmd.fileChanger.ThumbnailFilename = changer.ThumbnailFilename + cmd.fileChanger.ThumbnailSize = changer.ThumbnailSize + cmd.fileChanger.ThumbnailHash = changer.ThumbnailHash } } // UpdateChange add UpdateFileChanger in db func (cmd *UpdateFileCommand) UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error { - for _, c := range connectionObj.Changes { - filePath, _ := c.GetOrParseAffectedFilePath() - if c.Operation != sdkConst.FileOperationUpdate || cmd.fileChanger.Path != filePath { - continue - } - - c.Size = connectionObj.Size - c.Input, _ = cmd.fileChanger.Marshal() - - //c.ModelWithTS.UpdatedAt = time.Now() - err := connectionObj.Save(ctx) - if err != nil { - return err - } - - return c.Save(ctx) - } - - //NOT FOUND connectionObj.AddChange(cmd.allocationChange, cmd.fileChanger) - return connectionObj.Save(ctx) } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index b319d53c1..bcdba7899 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -4,20 +4,20 @@ import ( "context" "encoding/json" "fmt" + "mime/multipart" "net/http" "path/filepath" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" + "go.uber.org/zap" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/allocation" - "github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/core/common" "github.com/0chain/blobber/code/go/0chain.net/core/encryption" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/gosdk/constants" "github.com/0chain/gosdk/zboxcore/fileref" - "go.uber.org/zap" ) const ( @@ -31,6 +31,9 @@ const ( type UploadFileCommand struct { allocationChange *allocation.AllocationChange fileChanger *allocation.UploadFileChanger + contentFile multipart.File + thumbFile multipart.File + thumbHeader *multipart.FileHeader } func (cmd *UploadFileCommand) GetExistingFileRef() *reference.Ref { @@ -66,63 +69,76 @@ func (cmd *UploadFileCommand) IsValidated(ctx context.Context, req *http.Request return common.NewError("invalid_path", fmt.Sprintf("%v is not absolute path", fileChanger.Path)) } - isExist, err := reference.IsRefExist(ctx, allocationObj.ID, fileChanger.Path) + if fileChanger.ConnectionID == "" { + return common.NewError("invalid_connection", "Invalid connection id") + } + fileChanger.PathHash = encryption.Hash(fileChanger.Path) + + err = allocation.GetError(fileChanger.ConnectionID, fileChanger.PathHash) if err != nil { - logging.Logger.Error(err.Error()) - return common.NewError("database_error", "Got db error while getting ref") + return err } - if isExist { - msg := fmt.Sprintf("File at path :%s: already exists", fileChanger.Path) - return common.NewError("duplicate_file", msg) + if fileChanger.UploadOffset == 0 { + isExist, err := reference.IsRefExist(ctx, allocationObj.ID, fileChanger.Path) + + if err != nil { + logging.Logger.Error(err.Error()) + return common.NewError("database_error", "Got db error while getting ref") + } + + if isExist { + msg := fmt.Sprintf("File at path :%s: already exists", fileChanger.Path) + return common.NewError("duplicate_file", msg) + } + allocation.CreateConnectionChange(fileChanger.ConnectionID, fileChanger.PathHash, allocationObj) } - _, thumbHeader, _ := req.FormFile(UploadThumbnailFile) + thumbFile, thumbHeader, _ := req.FormFile(UploadThumbnailFile) if thumbHeader != nil { if thumbHeader.Size > MaxThumbnailSize { return common.NewError("max_thumbnail_size", fmt.Sprintf("thumbnail size %d should not be greater than %d", thumbHeader.Size, MaxThumbnailSize)) } + cmd.thumbFile = thumbFile + cmd.thumbHeader = thumbHeader } if fileChanger.ChunkSize <= 0 { fileChanger.ChunkSize = fileref.CHUNK_SIZE } + origfile, _, err := req.FormFile(UploadFile) + if err != nil { + return common.NewError("invalid_parameters", "Error Reading multi parts for file."+err.Error()) + } + cmd.contentFile = origfile cmd.fileChanger = fileChanger - return nil + logging.Logger.Info("UploadFileCommand.IsValidated") + if fileChanger.IsFinal { + return allocation.SetFinalized(fileChanger.ConnectionID, fileChanger.PathHash, cmd) + } + return allocation.SendCommand(fileChanger.ConnectionID, fileChanger.PathHash, cmd) } // ProcessContent flush file to FileStorage -func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) (blobberhttp.UploadResult, error) { - result := blobberhttp.UploadResult{} - - origfile, _, err := req.FormFile(UploadFile) - if err != nil { - return result, common.NewError("invalid_parameters", "Error Reading multi parts for file."+err.Error()) - } - defer origfile.Close() - isFinal := cmd.fileChanger.IsFinal - cmd.fileChanger.IsFinal = false - cmd.reloadChange(connectionObj) +func (cmd *UploadFileCommand) ProcessContent(allocationObj *allocation.Allocation) (allocation.UploadResult, error) { + result := allocation.UploadResult{} + defer cmd.contentFile.Close() if cmd.fileChanger.IsFinal { - result.Filename = cmd.fileChanger.Filename - return result, nil + cmd.reloadChange() } - cmd.fileChanger.IsFinal = isFinal - + connectionID := cmd.fileChanger.ConnectionID var hasher *filestore.CommitHasher - filePathHash := encryption.Hash(cmd.fileChanger.Path) if cmd.fileChanger.Size == 0 { return result, common.NewError("invalid_parameters", "Invalid parameters. Size cannot be zero") } - if cmd.fileChanger.UploadOffset == 0 { hasher = filestore.GetNewCommitHasher(cmd.fileChanger.Size) - allocation.UpdateConnectionObjWithHasher(connectionObj.ID, filePathHash, hasher) + allocation.UpdateConnectionObjWithHasher(connectionID, cmd.fileChanger.PathHash, hasher) } else { - hasher = allocation.GetHasher(connectionObj.ID, filePathHash) + hasher = allocation.GetHasher(connectionID, cmd.fileChanger.PathHash) if hasher == nil { return result, common.NewError("invalid_parameters", "Error getting hasher for upload.") } @@ -135,11 +151,12 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, req *http.Requ ChunkSize: cmd.fileChanger.ChunkSize, UploadOffset: cmd.fileChanger.UploadOffset, IsFinal: cmd.fileChanger.IsFinal, - FilePathHash: filePathHash, + FilePathHash: cmd.fileChanger.PathHash, Hasher: hasher, } - fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, fileInputData, origfile) + fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionID, fileInputData, cmd.contentFile) if err != nil { + logging.Logger.Error("UploadFileCommand.ProcessContent", zap.Error(err)) return result, common.NewError("upload_error", "Failed to write file. "+err.Error()) } @@ -148,18 +165,19 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, req *http.Requ if err != nil { return result, common.NewError("upload_error", "Failed to finalize the hasher. "+err.Error()) } + result.IsFinal = true } result.Filename = cmd.fileChanger.Filename result.ValidationRoot = fileOutputData.ValidationRoot result.Size = fileOutputData.Size - allocationSize := connectionObj.Size + allocationSize := allocation.GetConnectionObjSize(connectionID) // only update connection size when the chunk is uploaded. if fileOutputData.ChunkUploaded { allocationSize += fileOutputData.Size - allocation.UpdateConnectionObjSize(connectionObj.ID, fileOutputData.Size) + allocation.UpdateConnectionObjSize(connectionID, fileOutputData.Size) } if allocationObj.BlobberSizeUsed+allocationSize > allocationObj.BlobberSize { @@ -167,81 +185,45 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, req *http.Requ } cmd.fileChanger.AllocationID = allocationObj.ID - // cmd.fileChanger.Size += fileOutputData.Size cmd.allocationChange = &allocation.AllocationChange{} - cmd.allocationChange.ConnectionID = connectionObj.ID + cmd.allocationChange.ConnectionID = connectionID cmd.allocationChange.Size = cmd.fileChanger.Size cmd.allocationChange.Operation = constants.FileOperationInsert - - connectionObj.Size = allocationSize - + logging.Logger.Info("Chunk processed") return result, nil } // ProcessThumbnail flush thumbnail file to FileStorage if it has. -func (cmd *UploadFileCommand) ProcessThumbnail(ctx context.Context, req *http.Request, allocationObj *allocation.Allocation, connectionObj *allocation.AllocationChangeCollector) error { - thumbfile, thumbHeader, _ := req.FormFile(UploadThumbnailFile) +func (cmd *UploadFileCommand) ProcessThumbnail(allocationObj *allocation.Allocation) error { + connectionID := cmd.fileChanger.ConnectionID + if cmd.thumbHeader != nil { + defer cmd.thumbFile.Close() - if thumbHeader != nil { - defer thumbfile.Close() - - thumbInputData := &filestore.FileInputData{Name: thumbHeader.Filename, Path: cmd.fileChanger.Path, IsThumbnail: true, FilePathHash: encryption.Hash(cmd.fileChanger.Path)} - thumbOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, thumbInputData, thumbfile) + thumbInputData := &filestore.FileInputData{Name: cmd.thumbHeader.Filename, Path: cmd.fileChanger.Path, IsThumbnail: true, FilePathHash: cmd.fileChanger.PathHash} + thumbOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionID, thumbInputData, cmd.thumbFile) if err != nil { return common.NewError("upload_error", "Failed to upload the thumbnail. "+err.Error()) } cmd.fileChanger.ThumbnailSize = thumbOutputData.Size cmd.fileChanger.ThumbnailFilename = thumbInputData.Name + return allocation.SaveFileChanger(connectionID, &cmd.fileChanger.BaseFileChanger) } - return nil } -func (cmd *UploadFileCommand) reloadChange(connectionObj *allocation.AllocationChangeCollector) { - for _, c := range connectionObj.Changes { - filePath, _ := c.GetOrParseAffectedFilePath() - if c.Operation != constants.FileOperationInsert || cmd.fileChanger.Path != filePath { - continue - } - - dbChangeProcessor := &allocation.UploadFileChanger{} - - err := dbChangeProcessor.Unmarshal(c.Input) - if err != nil { - logging.Logger.Error("reloadChange", zap.Error(err)) - } - - cmd.fileChanger.ThumbnailFilename = dbChangeProcessor.ThumbnailFilename - cmd.fileChanger.ThumbnailSize = dbChangeProcessor.ThumbnailSize - cmd.fileChanger.ThumbnailHash = dbChangeProcessor.ThumbnailHash - cmd.fileChanger.IsFinal = dbChangeProcessor.IsFinal - return +func (cmd *UploadFileCommand) reloadChange() { + changer := allocation.GetFileChanger(cmd.fileChanger.ConnectionID, cmd.fileChanger.PathHash) + if changer != nil { + cmd.fileChanger.ThumbnailFilename = changer.ThumbnailFilename + cmd.fileChanger.ThumbnailSize = changer.ThumbnailSize + cmd.fileChanger.ThumbnailHash = changer.ThumbnailHash } } // UpdateChange replace AddFileChange in db func (cmd *UploadFileCommand) UpdateChange(ctx context.Context, connectionObj *allocation.AllocationChangeCollector) error { - for _, c := range connectionObj.Changes { - filePath, _ := c.GetOrParseAffectedFilePath() - if c.Operation != constants.FileOperationInsert || cmd.fileChanger.Path != filePath { - continue - } - c.Size = connectionObj.Size - c.Input, _ = cmd.fileChanger.Marshal() - - //c.ModelWithTS.UpdatedAt = time.Now() - err := connectionObj.Save(ctx) - if err != nil { - return err - } - - return c.Save(ctx) - } - - //NOT FOUND connectionObj.AddChange(cmd.allocationChange, cmd.fileChanger) - return connectionObj.Save(ctx) } diff --git a/code/go/0chain.net/blobbercore/handler/handler_test.go b/code/go/0chain.net/blobbercore/handler/handler_test.go index fa84417db..49a7b712a 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_test.go +++ b/code/go/0chain.net/blobbercore/handler/handler_test.go @@ -354,7 +354,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { q := url.Query() formFieldByt, err := json.Marshal( &allocation.UploadFileChanger{ - BaseFileChanger: allocation.BaseFileChanger{Path: path}}) + BaseFileChanger: allocation.BaseFileChanger{Path: path, ConnectionID: connectionID}}) if err != nil { t.Fatal(err) } @@ -769,7 +769,7 @@ func TestHandlers_Requiring_Signature(t *testing.T) { q := url.Query() formFieldByt, err := json.Marshal( &allocation.UploadFileChanger{ - BaseFileChanger: allocation.BaseFileChanger{Path: path, Size: size}}) + BaseFileChanger: allocation.BaseFileChanger{Path: path, Size: size, ConnectionID: connectionID}}) if err != nil { t.Fatal(err) } diff --git a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go index 0e2cd57af..a9ae2c411 100644 --- a/code/go/0chain.net/blobbercore/handler/object_operation_handler.go +++ b/code/go/0chain.net/blobbercore/handler/object_operation_handler.go @@ -670,8 +670,6 @@ func (fsh *StorageHandler) CommitWrite(ctx context.Context, r *http.Request) (*b return nil, common.NewError("write_marker_error", "Error redeeming the write marker") } - result.Changes = connectionObj.Changes - connectionObj.DeleteChanges(ctx) db.Model(connectionObj).Updates(allocation.AllocationChangeCollector{Status: allocation.CommittedConnection}) @@ -784,7 +782,7 @@ func (fsh *StorageHandler) RenameObject(ctx context.Context, r *http.Request) (i return nil, common.NewError("connection_write_error", "Error writing the connection meta data") } - result := &blobberhttp.UploadResult{} + result := &allocation.UploadResult{} result.Filename = new_name result.Hash = objectRef.Hash result.ValidationRoot = objectRef.ValidationRoot @@ -894,7 +892,7 @@ func (fsh *StorageHandler) CopyObject(ctx context.Context, r *http.Request) (int return nil, common.NewError("connection_write_error", "Error writing the connection meta data") } - result := &blobberhttp.UploadResult{} + result := &allocation.UploadResult{} result.Filename = objectRef.Name result.Hash = objectRef.Hash result.ValidationRoot = objectRef.ValidationRoot @@ -1009,7 +1007,7 @@ func (fsh *StorageHandler) MoveObject(ctx context.Context, r *http.Request) (int return nil, common.NewError("connection_write_error", "Error writing the connection meta data") } - result := &blobberhttp.UploadResult{} + result := &allocation.UploadResult{} result.Filename = objectRef.Name result.Hash = objectRef.Hash result.ValidationRoot = objectRef.ValidationRoot @@ -1018,7 +1016,7 @@ func (fsh *StorageHandler) MoveObject(ctx context.Context, r *http.Request) (int return result, nil } -func (fsh *StorageHandler) DeleteFile(ctx context.Context, r *http.Request, connectionObj *allocation.AllocationChangeCollector) (*blobberhttp.UploadResult, error) { +func (fsh *StorageHandler) DeleteFile(ctx context.Context, r *http.Request, connectionObj *allocation.AllocationChangeCollector) (*allocation.UploadResult, error) { path := r.FormValue("path") if path == "" { @@ -1046,7 +1044,7 @@ func (fsh *StorageHandler) DeleteFile(ctx context.Context, r *http.Request, conn connectionObj.AddChange(allocationChange, dfc) - result := &blobberhttp.UploadResult{} + result := &allocation.UploadResult{} result.Filename = fileRef.Name result.Hash = fileRef.Hash result.ValidationRoot = fileRef.ValidationRoot @@ -1059,7 +1057,7 @@ func (fsh *StorageHandler) DeleteFile(ctx context.Context, r *http.Request, conn return nil, common.NewError("invalid_file", "File does not exist at path") } -func (fsh *StorageHandler) CreateDir(ctx context.Context, r *http.Request) (*blobberhttp.UploadResult, error) { +func (fsh *StorageHandler) CreateDir(ctx context.Context, r *http.Request) (*allocation.UploadResult, error) { allocationId := ctx.Value(constants.ContextKeyAllocationID).(string) allocationTx := ctx.Value(constants.ContextKeyAllocation).(string) clientID := ctx.Value(constants.ContextKeyClient).(string) @@ -1089,7 +1087,7 @@ func (fsh *StorageHandler) CreateDir(ctx context.Context, r *http.Request) (*blo Logger.Error("Error file reference", zap.Error(err)) } - result := &blobberhttp.UploadResult{ + result := &allocation.UploadResult{ Filename: dirPath, } @@ -1147,10 +1145,8 @@ func (fsh *StorageHandler) CreateDir(ctx context.Context, r *http.Request) (*blo } // WriteFile stores the file into the blobber files system from the HTTP request -func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*blobberhttp.UploadResult, error) { - +func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*allocation.UploadResult, error) { startTime := time.Now() - if r.Method == "GET" { return nil, common.NewError("invalid_method", "Invalid method used for the upload URL. Use multi-part form POST / PUT / DELETE / PATCH instead") } @@ -1158,13 +1154,27 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*blo allocationId := ctx.Value(constants.ContextKeyAllocationID).(string) allocationTx := ctx.Value(constants.ContextKeyAllocation).(string) clientID := ctx.Value(constants.ContextKeyClient).(string) + connectionID, ok := common.GetField(r, "connection_id") + if !ok { + return nil, common.NewError("invalid_parameters", "Invalid connection id passed") + } + elapsedParseForm := time.Since(startTime) + st := time.Now() + connectionProcessor := allocation.GetConnectionProcessor(connectionID) + if connectionProcessor == nil { + connectionProcessor = allocation.CreateConnectionProcessor(connectionID) + } + + elapsedGetConnectionProcessor := time.Since(st) + st = time.Now() allocationObj, err := fsh.verifyAllocation(ctx, allocationId, allocationTx, false) if err != nil { return nil, common.NewError("invalid_parameters", "Invalid allocation id passed."+err.Error()) } + connectionProcessor.ClientID = clientID - elapsedAllocation := time.Since(startTime) + elapsedAllocation := time.Since(st) if r.Method == http.MethodPost && !allocationObj.CanUpload() { return nil, common.NewError("prohibited_allocation_file_options", "Cannot upload data to this allocation.") @@ -1178,18 +1188,7 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*blo return nil, common.NewError("prohibited_allocation_file_options", "Cannot delete data in this allocation.") } - st := time.Now() - allocationID := allocationObj.ID - cmd := createFileCommand(r) - err = cmd.IsValidated(ctx, r, allocationObj, clientID) - - if err != nil { - return nil, err - } - - elapsedValidate := time.Since(st) st = time.Now() - publicKey := allocationObj.OwnerPublicKey valid, err := verifySignatureFromRequest(allocationTx, r.Header.Get(common.ClientSignatureHeader), publicKey) @@ -1198,66 +1197,31 @@ func (fsh *StorageHandler) WriteFile(ctx context.Context, r *http.Request) (*blo return nil, common.NewError("invalid_signature", "Invalid signature") } - connectionID, ok := common.GetField(r, "connection_id") - if !ok { - return nil, common.NewError("invalid_parameters", "Invalid connection id passed") - } - - elapsedRef := time.Since(st) - st = time.Now() - - connectionObj, err := allocation.GetAllocationChanges(ctx, connectionID, allocationID, clientID) - if err != nil { - return nil, common.NewError("meta_error", "Error reading metadata for connection") - } - - elapsedAllocationChanges := time.Since(st) - - Logger.Info("[upload] Processing content for allocation and connection", - zap.String("allocationID", allocationID), - zap.String("connectionID", connectionID), - ) - st = time.Now() - result, err := cmd.ProcessContent(ctx, r, allocationObj, connectionObj) - - if err != nil { - return nil, err + if clientID == "" { + return nil, common.NewError("invalid_operation", "Operation needs to be performed by the owner or the payer of the allocation") } - Logger.Info("[upload] Content processed for allocation and connection", - zap.String("allocationID", allocationID), - zap.String("connectionID", connectionID), - ) - - err = cmd.ProcessThumbnail(ctx, r, allocationObj, connectionObj) + elapsedVerifySig := time.Since(st) + allocationID := allocationObj.ID + cmd := createFileCommand(r) + err = cmd.IsValidated(ctx, r, allocationObj, clientID) if err != nil { return nil, err } - - elapsedProcess := time.Since(st) - st = time.Now() - err = cmd.UpdateChange(ctx, connectionObj) - - if err != nil { - Logger.Error("Error in writing the connection meta data", zap.Error(err)) - return nil, common.NewError("connection_write_error", err.Error()) //"Error writing the connection meta data") + result := allocation.UploadResult{ + Filename: cmd.GetPath(), } - - elapsedUpdateChange := time.Since(st) - Logger.Info("[upload]elapsed", zap.String("alloc_id", allocationID), zap.String("file", cmd.GetPath()), + zap.Duration("parse_form", elapsedParseForm), + zap.Duration("get_processor", elapsedGetConnectionProcessor), zap.Duration("get_alloc", elapsedAllocation), - zap.Duration("validate", elapsedValidate), - zap.Duration("ref", elapsedRef), - zap.Duration("load_changes", elapsedAllocationChanges), - zap.Duration("process", elapsedProcess), - zap.Duration("update_changes", elapsedUpdateChange), - zap.Duration("total", time.Since(startTime)), - ) - + zap.Duration("sig", elapsedVerifySig), + zap.Duration("validate", time.Since(st)), + zap.Duration("total", time.Since(startTime))) return &result, nil + } func sanitizeString(input string) string { diff --git a/code/go/0chain.net/core/common/request_form.go b/code/go/0chain.net/core/common/request_form.go index c5d91a867..4bdfe933a 100644 --- a/code/go/0chain.net/core/common/request_form.go +++ b/code/go/0chain.net/core/common/request_form.go @@ -3,7 +3,7 @@ package common import "net/http" const ( - FormFileParseMaxMemory = 10 * 1024 * 1024 + FormFileParseMaxMemory = 32 * 1024 * 1024 ) // TryParseForm try populates r.Form and r.PostForm.