diff --git a/code/go/0chain.net/blobbercore/allocation/connection.go b/code/go/0chain.net/blobbercore/allocation/connection.go index 612d7b141..ae366b768 100644 --- a/code/go/0chain.net/blobbercore/allocation/connection.go +++ b/code/go/0chain.net/blobbercore/allocation/connection.go @@ -4,6 +4,8 @@ import ( "context" "sync" "time" + + "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" ) var ( @@ -15,18 +17,23 @@ var ( var ( connectionObjSizeMap = make(map[string]*ConnectionObjSize) - connectionObjMutex sync.Mutex + connectionObjMutex sync.RWMutex ) type ConnectionObjSize struct { Size int64 UpdatedAt time.Time + Changes map[string]*ConnectionChanges +} + +type ConnectionChanges struct { + Hasher *filestore.CommitHasher } // GetConnectionObjSize gets the connection size from the memory func GetConnectionObjSize(connectionID string) int64 { - connectionObjMutex.Lock() - defer connectionObjMutex.Unlock() + connectionObjMutex.RLock() + defer connectionObjMutex.RUnlock() connectionObjSize := connectionObjSizeMap[connectionID] if connectionObjSize == nil { return 0 @@ -43,6 +50,7 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) { connectionObjSizeMap[connectionID] = &ConnectionObjSize{ Size: addSize, UpdatedAt: time.Now(), + Changes: make(map[string]*ConnectionChanges), } return } @@ -51,6 +59,34 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) { connectionObjSize.UpdatedAt = time.Now() } +func GetHasher(connectionID, pathHash string) *filestore.CommitHasher { + connectionObjMutex.RLock() + defer connectionObjMutex.RUnlock() + connectionObj := connectionObjSizeMap[connectionID] + if connectionObj == nil { + return nil + } + if connectionObj.Changes[pathHash] == nil { + return nil + } + return connectionObj.Changes[pathHash].Hasher +} + +func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filestore.CommitHasher) { + connectionObjMutex.Lock() + defer connectionObjMutex.Unlock() + connectionObj := connectionObjSizeMap[connectionID] + if connectionObj == nil { + connectionObjSizeMap[connectionID] = &ConnectionObjSize{ + UpdatedAt: time.Now(), + Changes: make(map[string]*ConnectionChanges), + } + } + connectionObjSizeMap[connectionID].Changes[pathHash] = &ConnectionChanges{ + Hasher: hasher, + } +} + // DeleteConnectionObjEntry remove the connectionID entry from map // If the given connectionID is not present, then it is no-op. func DeleteConnectionObjEntry(connectionID string) { diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go index 698b59974..c8e733dde 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_base.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_base.go @@ -5,6 +5,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/encryption" ) // BaseFileChanger base file change processor @@ -92,6 +93,10 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error { fileInputData.ValidationRoot = fc.ValidationRoot fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot fileInputData.ChunkSize = fc.ChunkSize + fileInputData.Hasher = GetHasher(fc.ConnectionID, encryption.Hash(fc.Path)) + if fileInputData.Hasher == nil { + return common.NewError("invalid_parameters", "Invalid parameters. Error getting hasher for commit.") + } _, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData) if err != nil { return common.NewError("file_store_error", "Error committing to file store. "+err.Error()) diff --git a/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go b/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go index f75c8624f..6ee435013 100644 --- a/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go +++ b/code/go/0chain.net/blobbercore/allocation/file_changer_upload_test.go @@ -14,6 +14,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/encryption" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/gosdk/constants" "github.com/0chain/gosdk/core/zcncrypto" @@ -98,6 +99,9 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) { ctx := datastore.GetStore().CreateTransaction(context.TODO()) fPath := "/new" + hasher := filestore.GetNewCommitHasher(2310) + pathHash := encryption.Hash(fPath) + UpdateConnectionObjWithHasher("connection_id", pathHash, hasher) change := &UploadFileChanger{ BaseFileChanger: BaseFileChanger{ Filename: filepath.Base(fPath), @@ -107,6 +111,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) { ValidationRoot: tc.validationRoot, Size: 2310, ChunkSize: 65536, + ConnectionID: "connection_id", }, } diff --git a/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go b/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go index d6e2238e6..4452299ef 100644 --- a/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go +++ b/code/go/0chain.net/blobbercore/allocation/updatefilechange_test.go @@ -9,6 +9,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/encryption" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/gosdk/core/zcncrypto" "github.com/0chain/gosdk/zboxcore/client" @@ -337,6 +338,9 @@ func TestBlobberCore_UpdateFile(t *testing.T) { tc.setupDbMock() ctx := datastore.GetStore().CreateTransaction(context.TODO()) + hasher := filestore.GetNewCommitHasher(2310) + pathHash := encryption.Hash(tc.path) + UpdateConnectionObjWithHasher("connection_id", pathHash, hasher) change := &UpdateFileChanger{ BaseFileChanger: BaseFileChanger{ @@ -352,6 +356,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) { ThumbnailSize: 92, ChunkSize: 65536, IsFinal: true, + ConnectionID: "connection_id", }, } diff --git a/code/go/0chain.net/blobbercore/filestore/storage.go b/code/go/0chain.net/blobbercore/filestore/storage.go index 56cc92979..0a1781d13 100644 --- a/code/go/0chain.net/blobbercore/filestore/storage.go +++ b/code/go/0chain.net/blobbercore/filestore/storage.go @@ -59,7 +59,7 @@ const ( ) func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) { - tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, encryption.Hash(fileData.Path), conID) + tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID) var initialSize int64 finfo, err := os.Stat(tempFilePath) if err != nil && !errors.Is(err, os.ErrNotExist) { @@ -73,7 +73,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i return nil, common.NewError("dir_creation_error", err.Error()) } - f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_WRONLY, 0644) + f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644) if err != nil { return nil, common.NewError("file_open_error", err.Error()) } @@ -88,6 +88,17 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i if err != nil { return nil, common.NewError("file_write_error", err.Error()) } + if !fileData.IsThumbnail { + _, err = f.Seek(fileData.UploadOffset, io.SeekStart) + if err != nil { + return nil, common.NewError("file_seek_error", err.Error()) + } + + _, err = io.CopyBuffer(fileData.Hasher, f, buf) + if err != nil { + return nil, common.NewError("file_read_error", err.Error()) + } + } finfo, err = f.Stat() if err != nil { @@ -243,27 +254,18 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData) } fileSize := rStat.Size() - hasher := GetNewCommitHasher(fileSize) bufSize := BufferSize if fileSize < BufferSize { bufSize = int(fileSize) } buffer := make([]byte, bufSize) - _, err = io.CopyBuffer(hasher, r, buffer) - if err != nil { - return false, common.NewError("read_write_error", err.Error()) - } - err = hasher.Finalize() - if err != nil { - return false, common.NewError("finalize_error", err.Error()) - } - fmtRootBytes, err := hasher.fmt.CalculateRootAndStoreNodes(f) + fmtRootBytes, err := fileData.Hasher.fmt.CalculateRootAndStoreNodes(f) if err != nil { return false, common.NewError("fmt_hash_calculation_error", err.Error()) } - validationRootBytes, err := hasher.vt.CalculateRootAndStoreNodes(f) + validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(f) if err != nil { return false, common.NewError("validation_hash_calculation_error", err.Error()) } @@ -279,12 +281,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData) "calculated validation root does not match with client's validation root") } - _, err = r.Seek(0, io.SeekStart) - if err != nil { - return false, common.NewError("seek_error", err.Error()) - } - - _, err = io.Copy(f, r) + _, err = io.CopyBuffer(f, r, buffer) if err != nil { return false, common.NewError("write_error", err.Error()) } diff --git a/code/go/0chain.net/blobbercore/filestore/store.go b/code/go/0chain.net/blobbercore/filestore/store.go index 481395fe8..9d4a614e7 100644 --- a/code/go/0chain.net/blobbercore/filestore/store.go +++ b/code/go/0chain.net/blobbercore/filestore/store.go @@ -20,8 +20,10 @@ type FileInputData struct { //Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer. UploadOffset int64 //IsFinal the request is final chunk - IsFinal bool - IsThumbnail bool + IsFinal bool + IsThumbnail bool + FilePathHash string + Hasher *CommitHasher } type FileOutputData struct { diff --git a/code/go/0chain.net/blobbercore/filestore/store_test.go b/code/go/0chain.net/blobbercore/filestore/store_test.go index 340da9581..bcef5f2ec 100644 --- a/code/go/0chain.net/blobbercore/filestore/store_test.go +++ b/code/go/0chain.net/blobbercore/filestore/store_test.go @@ -248,23 +248,26 @@ func TestStoreStorageWriteAndCommit(t *testing.T) { size := 640 * KB validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size)) require.Nil(t, err) - + pathHash := encryption.Hash(test.remotePath) + hasher := GetNewCommitHasher(int64(size)) fid := &FileInputData{ Name: test.fileName, Path: test.remotePath, ValidationRoot: validationRoot, FixedMerkleRoot: fixedMerkleRoot, ChunkSize: 64 * KB, + FilePathHash: pathHash, + Hasher: hasher, } f, err := os.Open(fPath) require.Nil(t, err) defer f.Close() - _, err = fs.WriteFile(test.allocID, test.connID, fid, f) require.Nil(t, err) + err = hasher.Finalize() + require.Nil(t, err) - pathHash := encryption.Hash(test.remotePath) tempFilePath := fs.getTempPathForFile(test.allocID, test.fileName, pathHash, test.connID) tF, err := os.Stat(tempFilePath) require.Nil(t, err) @@ -329,13 +332,16 @@ func TestDeletePreCommitDir(t *testing.T) { size := 640 * KB validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size)) require.Nil(t, err) - + pathHash := encryption.Hash(remotePath) + hasher := GetNewCommitHasher(int64(size)) fid := &FileInputData{ Name: fileName, Path: remotePath, ValidationRoot: validationRoot, FixedMerkleRoot: fixedMerkleRoot, ChunkSize: 64 * KB, + FilePathHash: pathHash, + Hasher: hasher, } // checkc if file to be uploaded exists f, err := os.Open(fPath) @@ -344,9 +350,9 @@ func TestDeletePreCommitDir(t *testing.T) { _, err = fs.WriteFile(allocID, connID, fid, f) require.Nil(t, err) f.Close() - + err = hasher.Finalize() + require.Nil(t, err) // check if file is written to temp location - pathHash := encryption.Hash(remotePath) tempFilePath := fs.getTempPathForFile(allocID, fileName, pathHash, connID) tF, err := os.Stat(tempFilePath) require.Nil(t, err) @@ -368,6 +374,8 @@ func TestDeletePreCommitDir(t *testing.T) { fid.ValidationRoot = validationRoot fid.FixedMerkleRoot = fixedMerkleRoot + hasher = GetNewCommitHasher(int64(size)) + fid.Hasher = hasher // Write file to temp location f, err = os.Open(fPath) @@ -379,6 +387,8 @@ func TestDeletePreCommitDir(t *testing.T) { tempFilePath = fs.getTempPathForFile(allocID, fileName, pathHash, connID) _, err = os.Stat(tempFilePath) require.Nil(t, err) + err = hasher.Finalize() + require.Nil(t, err) success, err = fs.CommitWrite(allocID, connID, fid) require.Nil(t, err) @@ -422,13 +432,16 @@ func TestStorageUploadUpdate(t *testing.T) { size := 640 * KB validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size)) require.Nil(t, err) - + pathHash := encryption.Hash(remotePath) + hasher := GetNewCommitHasher(int64(size)) fid := &FileInputData{ Name: fileName, Path: remotePath, ValidationRoot: validationRoot, FixedMerkleRoot: fixedMerkleRoot, ChunkSize: 64 * KB, + FilePathHash: pathHash, + Hasher: hasher, } // checkc if file to be uploaded exists f, err := os.Open(fPath) @@ -437,9 +450,9 @@ func TestStorageUploadUpdate(t *testing.T) { _, err = fs.WriteFile(allocID, connID, fid, f) require.Nil(t, err) f.Close() - + err = hasher.Finalize() + require.Nil(t, err) // check if file is written to temp location - pathHash := encryption.Hash(remotePath) tempFilePath := fs.getTempPathForFile(allocID, fileName, pathHash, connID) tF, err := os.Stat(tempFilePath) require.Nil(t, err) diff --git a/code/go/0chain.net/blobbercore/filestore/tree_validation.go b/code/go/0chain.net/blobbercore/filestore/tree_validation.go index 33ae0eafe..b44145707 100644 --- a/code/go/0chain.net/blobbercore/filestore/tree_validation.go +++ b/code/go/0chain.net/blobbercore/filestore/tree_validation.go @@ -400,21 +400,25 @@ func getNewValidationTree(dataSize int64) *validationTree { // commitHasher is used to calculate and store tree nodes for fixed merkle tree and // validation tree when client commits file with the writemarker. -type commitHasher struct { +type CommitHasher struct { fmt *fixedMerkleTree vt *validationTree isInitialized bool } -func GetNewCommitHasher(dataSize int64) *commitHasher { - c := new(commitHasher) +func GetNewCommitHasher(dataSize int64) *CommitHasher { + c := new(CommitHasher) c.fmt = getNewFixedMerkleTree() c.vt = getNewValidationTree(dataSize) c.isInitialized = true return c } -func (c *commitHasher) Write(b []byte) (int, error) { +func (c *CommitHasher) Write(b []byte) (int, error) { + if !c.isInitialized || c.fmt == nil || c.vt == nil { + return 0, errors.New("commit hasher is not initialized") + } + var ( wg sync.WaitGroup errChan = make(chan error, 2) @@ -445,7 +449,7 @@ func (c *commitHasher) Write(b []byte) (int, error) { return n, nil } -func (c *commitHasher) Finalize() error { +func (c *CommitHasher) Finalize() error { var ( wg sync.WaitGroup errChan = make(chan error, 2) @@ -473,10 +477,10 @@ func (c *commitHasher) Finalize() error { return nil } -func (c *commitHasher) GetFixedMerkleRoot() string { +func (c *CommitHasher) GetFixedMerkleRoot() string { return c.fmt.GetMerkleRoot() } -func (c *commitHasher) GetValidationMerkleRoot() string { +func (c *CommitHasher) GetValidationMerkleRoot() string { return hex.EncodeToString(c.vt.GetValidationRoot()) } diff --git a/code/go/0chain.net/blobbercore/handler/file_command_update.go b/code/go/0chain.net/blobbercore/handler/file_command_update.go index fd8b7c1d8..e62e2db4e 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_update.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_update.go @@ -11,6 +11,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/encryption" "github.com/0chain/blobber/code/go/0chain.net/core/logging" sdkConst "github.com/0chain/gosdk/constants" "github.com/0chain/gosdk/zboxcore/fileref" @@ -94,17 +95,42 @@ func (cmd *UpdateFileCommand) ProcessContent(ctx context.Context, req *http.Requ cmd.reloadChange(connectionObj) + if cmd.fileChanger.Size == 0 { + return result, common.NewError("invalid_parameters", "Invalid parameters. Size cannot be zero") + } + + var hasher *filestore.CommitHasher + filePathHash := encryption.Hash(cmd.fileChanger.Path) + if cmd.fileChanger.UploadOffset == 0 { + hasher = filestore.GetNewCommitHasher(cmd.fileChanger.Size) + allocation.UpdateConnectionObjWithHasher(connectionObj.ID, filePathHash, hasher) + } else { + hasher = allocation.GetHasher(connectionObj.ID, filePathHash) + if hasher == nil { + return result, common.NewError("invalid_parameters", "Invalid parameters. Error getting hasher for upload.") + } + } + fileInputData := &filestore.FileInputData{ Name: cmd.fileChanger.Filename, Path: cmd.fileChanger.Path, UploadOffset: cmd.fileChanger.UploadOffset, IsFinal: cmd.fileChanger.IsFinal, + FilePathHash: filePathHash, + Hasher: hasher, } fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, fileInputData, origfile) if err != nil { return result, common.NewError("upload_error", "Failed to upload the file. "+err.Error()) } + if cmd.fileChanger.IsFinal { + err = hasher.Finalize() + if err != nil { + return result, common.NewError("upload_error", "Failed to upload the file. "+err.Error()) + } + } + result.ValidationRoot = fileOutputData.ValidationRoot result.FixedMerkleRoot = fileOutputData.FixedMerkleRoot result.Size = fileOutputData.Size @@ -121,7 +147,7 @@ func (cmd *UpdateFileCommand) ProcessContent(ctx context.Context, req *http.Requ } cmd.fileChanger.AllocationID = allocationObj.ID - cmd.fileChanger.Size += fileOutputData.Size + // cmd.fileChanger.Size += fileOutputData.Size cmd.allocationChange = &allocation.AllocationChange{} cmd.allocationChange.ConnectionID = connectionObj.ID @@ -145,7 +171,7 @@ func (cmd *UpdateFileCommand) ProcessThumbnail(ctx context.Context, req *http.Re if thumbHeader != nil { defer thumbfile.Close() - thumbInputData := &filestore.FileInputData{Name: thumbHeader.Filename, Path: cmd.fileChanger.Path} + thumbInputData := &filestore.FileInputData{Name: thumbHeader.Filename, Path: cmd.fileChanger.Path, IsThumbnail: true, FilePathHash: encryption.Hash(cmd.fileChanger.Path)} thumbOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, thumbInputData, thumbfile) if err != nil { return common.NewError("upload_error", "Failed to upload the thumbnail. "+err.Error()) diff --git a/code/go/0chain.net/blobbercore/handler/file_command_upload.go b/code/go/0chain.net/blobbercore/handler/file_command_upload.go index 101d5ef79..b639bc6c2 100644 --- a/code/go/0chain.net/blobbercore/handler/file_command_upload.go +++ b/code/go/0chain.net/blobbercore/handler/file_command_upload.go @@ -13,6 +13,7 @@ import ( "github.com/0chain/blobber/code/go/0chain.net/blobbercore/blobberhttp" "github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore" "github.com/0chain/blobber/code/go/0chain.net/core/common" + "github.com/0chain/blobber/code/go/0chain.net/core/encryption" "github.com/0chain/blobber/code/go/0chain.net/core/logging" "github.com/0chain/gosdk/constants" "github.com/0chain/gosdk/zboxcore/fileref" @@ -110,6 +111,22 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, req *http.Requ cmd.reloadChange(connectionObj) + var hasher *filestore.CommitHasher + filePathHash := encryption.Hash(cmd.fileChanger.Path) + if cmd.fileChanger.Size == 0 { + return result, common.NewError("invalid_parameters", "Invalid parameters. Size cannot be zero") + } + fmt.Println("cmd.fileChanger.Size", cmd.fileChanger.Size) + if cmd.fileChanger.UploadOffset == 0 { + hasher = filestore.GetNewCommitHasher(cmd.fileChanger.Size) + allocation.UpdateConnectionObjWithHasher(connectionObj.ID, filePathHash, hasher) + } else { + hasher = allocation.GetHasher(connectionObj.ID, filePathHash) + if hasher == nil { + return result, common.NewError("invalid_parameters", "Error getting hasher for upload.") + } + } + fileInputData := &filestore.FileInputData{ Name: cmd.fileChanger.Filename, Path: cmd.fileChanger.Path, @@ -117,10 +134,19 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, req *http.Requ ChunkSize: cmd.fileChanger.ChunkSize, UploadOffset: cmd.fileChanger.UploadOffset, IsFinal: cmd.fileChanger.IsFinal, + FilePathHash: filePathHash, + Hasher: hasher, } fileOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, fileInputData, origfile) if err != nil { - return result, common.NewError("upload_error", "Failed to upload the file. "+err.Error()) + return result, common.NewError("upload_error", "Failed to write file. "+err.Error()) + } + + if cmd.fileChanger.IsFinal { + err = hasher.Finalize() + if err != nil { + return result, common.NewError("upload_error", "Failed to finalize the hasher. "+err.Error()) + } } result.Filename = cmd.fileChanger.Filename @@ -140,7 +166,7 @@ func (cmd *UploadFileCommand) ProcessContent(ctx context.Context, req *http.Requ } cmd.fileChanger.AllocationID = allocationObj.ID - cmd.fileChanger.Size += fileOutputData.Size + // cmd.fileChanger.Size += fileOutputData.Size cmd.allocationChange = &allocation.AllocationChange{} cmd.allocationChange.ConnectionID = connectionObj.ID @@ -159,7 +185,7 @@ func (cmd *UploadFileCommand) ProcessThumbnail(ctx context.Context, req *http.Re if thumbHeader != nil { defer thumbfile.Close() - thumbInputData := &filestore.FileInputData{Name: thumbHeader.Filename, Path: cmd.fileChanger.Path} + thumbInputData := &filestore.FileInputData{Name: thumbHeader.Filename, Path: cmd.fileChanger.Path, IsThumbnail: true, FilePathHash: encryption.Hash(cmd.fileChanger.Path)} thumbOutputData, err := filestore.GetFileStore().WriteFile(allocationObj.ID, connectionObj.ID, thumbInputData, thumbfile) if err != nil { return common.NewError("upload_error", "Failed to upload the thumbnail. "+err.Error()) diff --git a/code/go/0chain.net/blobbercore/handler/handler_test.go b/code/go/0chain.net/blobbercore/handler/handler_test.go index 69f7de720..1c7343e20 100644 --- a/code/go/0chain.net/blobbercore/handler/handler_test.go +++ b/code/go/0chain.net/blobbercore/handler/handler_test.go @@ -756,11 +756,20 @@ func TestHandlers_Requiring_Signature(t *testing.T) { if err != nil { t.Fatal() } - + root, _ := os.Getwd() + file, err := os.Open(root + "/handler_test.go") + if err != nil { + t.Fatal(err) + } + stat, err := file.Stat() + if err != nil { + t.Fatal(err) + } + size := stat.Size() q := url.Query() formFieldByt, err := json.Marshal( &allocation.UploadFileChanger{ - BaseFileChanger: allocation.BaseFileChanger{Path: path}}) + BaseFileChanger: allocation.BaseFileChanger{Path: path, Size: size}}) if err != nil { t.Fatal(err) } @@ -772,20 +781,15 @@ func TestHandlers_Requiring_Signature(t *testing.T) { body := bytes.NewBuffer(nil) formWriter := multipart.NewWriter(body) - root, _ := os.Getwd() - file, err := os.Open(root + "/handler_test.go") - if err != nil { - t.Fatal(err) - } fileField, err := formWriter.CreateFormFile("uploadFile", file.Name()) if err != nil { t.Fatal(err) } - fileB := make([]byte, 0) - if _, err := io.ReadFull(file, fileB); err != nil { + data, err := io.ReadAll(file) + if err != nil { t.Fatal(err) } - if _, err := fileField.Write(fileB); err != nil { + if _, err := fileField.Write(data); err != nil { t.Fatal(err) } if err := formWriter.Close(); err != nil { diff --git a/code/go/0chain.net/blobbercore/writemarker/protocol.go b/code/go/0chain.net/blobbercore/writemarker/protocol.go index cfe950b40..b0745aae0 100644 --- a/code/go/0chain.net/blobbercore/writemarker/protocol.go +++ b/code/go/0chain.net/blobbercore/writemarker/protocol.go @@ -57,6 +57,10 @@ func (wme *WriteMarkerEntity) VerifyMarker(ctx context.Context, dbAllocation *al return common.NewError("write_marker_validation_failed", "Signature exceeds maximum length") } + if wme.WM.AllocationRoot == dbAllocation.AllocationRoot { + return common.NewError("write_marker_validation_failed", "Write Marker allocation root is the same as the allocation root on record") + } + if wme.WM.PreviousAllocationRoot != dbAllocation.AllocationRoot { return common.NewError("invalid_write_marker", "Invalid write marker. Prev Allocation root does not match the allocation root on record") } @@ -86,7 +90,7 @@ func (wme *WriteMarkerEntity) VerifyMarker(ctx context.Context, dbAllocation *al } currTime := common.Now() - // blobber clock is allowed to be 10 seconds behind the current time + // blobber clock is allowed to be 60 seconds behind the current time if wme.WM.Timestamp > currTime+60 { return common.NewError("write_marker_validation_failed", "Write Marker timestamp is in the future") } @@ -185,6 +189,10 @@ func (wme *WriteMarkerEntity) VerifyRollbackMarker(ctx context.Context, dbAlloca return common.NewError("empty write_marker_validation_failed", fmt.Sprintf("Write Marker size is %v but should be 0", wme.WM.Size)) } + if wme.WM.AllocationRoot == dbAllocation.AllocationRoot { + return common.NewError("write_marker_validation_failed", "Write Marker allocation root is the same as the allocation root on record") + } + if wme.WM.AllocationRoot != latestWM.WM.PreviousAllocationRoot { return common.NewError("write_marker_validation_failed", fmt.Sprintf("Write Marker allocation root %v does not match the previous allocation root of latest write marker %v", wme.WM.AllocationRoot, latestWM.WM.PreviousAllocationRoot)) }