Skip to content

Commit

Permalink
Merge branch 'sprint-1.10' into replace-with-multiops
Browse files Browse the repository at this point in the history
  • Loading branch information
din-mukhammed authored Aug 23, 2023
2 parents f23ebd2 + 090e715 commit 5e2a551
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 56 deletions.
42 changes: 39 additions & 3 deletions code/go/0chain.net/blobbercore/allocation/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
)

var (
Expand All @@ -15,18 +17,23 @@ var (

var (
connectionObjSizeMap = make(map[string]*ConnectionObjSize)
connectionObjMutex sync.Mutex
connectionObjMutex sync.RWMutex
)

type ConnectionObjSize struct {
Size int64
UpdatedAt time.Time
Changes map[string]*ConnectionChanges
}

type ConnectionChanges struct {
Hasher *filestore.CommitHasher
}

// GetConnectionObjSize gets the connection size from the memory
func GetConnectionObjSize(connectionID string) int64 {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObjSize := connectionObjSizeMap[connectionID]
if connectionObjSize == nil {
return 0
Expand All @@ -43,6 +50,7 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
Size: addSize,
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
return
}
Expand All @@ -51,6 +59,34 @@ func UpdateConnectionObjSize(connectionID string, addSize int64) {
connectionObjSize.UpdatedAt = time.Now()
}

func GetHasher(connectionID, pathHash string) *filestore.CommitHasher {
connectionObjMutex.RLock()
defer connectionObjMutex.RUnlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
return nil
}
if connectionObj.Changes[pathHash] == nil {
return nil
}
return connectionObj.Changes[pathHash].Hasher
}

func UpdateConnectionObjWithHasher(connectionID, pathHash string, hasher *filestore.CommitHasher) {
connectionObjMutex.Lock()
defer connectionObjMutex.Unlock()
connectionObj := connectionObjSizeMap[connectionID]
if connectionObj == nil {
connectionObjSizeMap[connectionID] = &ConnectionObjSize{
UpdatedAt: time.Now(),
Changes: make(map[string]*ConnectionChanges),
}
}
connectionObjSizeMap[connectionID].Changes[pathHash] = &ConnectionChanges{
Hasher: hasher,
}
}

// DeleteConnectionObjEntry remove the connectionID entry from map
// If the given connectionID is not present, then it is no-op.
func DeleteConnectionObjEntry(connectionID string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
)

// BaseFileChanger base file change processor
Expand Down Expand Up @@ -92,6 +93,10 @@ func (fc *BaseFileChanger) CommitToFileStore(ctx context.Context) error {
fileInputData.ValidationRoot = fc.ValidationRoot
fileInputData.FixedMerkleRoot = fc.FixedMerkleRoot
fileInputData.ChunkSize = fc.ChunkSize
fileInputData.Hasher = GetHasher(fc.ConnectionID, encryption.Hash(fc.Path))
if fileInputData.Hasher == nil {
return common.NewError("invalid_parameters", "Invalid parameters. Error getting hasher for commit.")
}
_, err := filestore.GetFileStore().CommitWrite(fc.AllocationID, fc.ConnectionID, fileInputData)
if err != nil {
return common.NewError("file_store_error", "Error committing to file store. "+err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/0chain/blobber/code/go/0chain.net/blobbercore/datastore"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/constants"
"github.com/0chain/gosdk/core/zcncrypto"
Expand Down Expand Up @@ -98,6 +99,9 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ctx := datastore.GetStore().CreateTransaction(context.TODO())

fPath := "/new"
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(fPath)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)
change := &UploadFileChanger{
BaseFileChanger: BaseFileChanger{
Filename: filepath.Base(fPath),
Expand All @@ -107,6 +111,7 @@ func TestBlobberCore_FileChangerUpload(t *testing.T) {
ValidationRoot: tc.validationRoot,
Size: 2310,
ChunkSize: 65536,
ConnectionID: "connection_id",
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/filestore"
"github.com/0chain/blobber/code/go/0chain.net/blobbercore/reference"
"github.com/0chain/blobber/code/go/0chain.net/core/common"
"github.com/0chain/blobber/code/go/0chain.net/core/encryption"
"github.com/0chain/blobber/code/go/0chain.net/core/logging"
"github.com/0chain/gosdk/core/zcncrypto"
"github.com/0chain/gosdk/zboxcore/client"
Expand Down Expand Up @@ -337,6 +338,9 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
tc.setupDbMock()

ctx := datastore.GetStore().CreateTransaction(context.TODO())
hasher := filestore.GetNewCommitHasher(2310)
pathHash := encryption.Hash(tc.path)
UpdateConnectionObjWithHasher("connection_id", pathHash, hasher)

change := &UpdateFileChanger{
BaseFileChanger: BaseFileChanger{
Expand All @@ -352,6 +356,7 @@ func TestBlobberCore_UpdateFile(t *testing.T) {
ThumbnailSize: 92,
ChunkSize: 65536,
IsFinal: true,
ConnectionID: "connection_id",
},
}

Expand Down
35 changes: 16 additions & 19 deletions code/go/0chain.net/blobbercore/filestore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const (
)

func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, infile multipart.File) (*FileOutputData, error) {
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, encryption.Hash(fileData.Path), conID)
tempFilePath := fs.getTempPathForFile(allocID, fileData.Name, fileData.FilePathHash, conID)
var initialSize int64
finfo, err := os.Stat(tempFilePath)
if err != nil && !errors.Is(err, os.ErrNotExist) {
Expand All @@ -73,7 +73,7 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
return nil, common.NewError("dir_creation_error", err.Error())
}

f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_WRONLY, 0644)
f, err := os.OpenFile(tempFilePath, os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, common.NewError("file_open_error", err.Error())
}
Expand All @@ -88,6 +88,17 @@ func (fs *FileStore) WriteFile(allocID, conID string, fileData *FileInputData, i
if err != nil {
return nil, common.NewError("file_write_error", err.Error())
}
if !fileData.IsThumbnail {
_, err = f.Seek(fileData.UploadOffset, io.SeekStart)
if err != nil {
return nil, common.NewError("file_seek_error", err.Error())
}

_, err = io.CopyBuffer(fileData.Hasher, f, buf)
if err != nil {
return nil, common.NewError("file_read_error", err.Error())
}
}

finfo, err = f.Stat()
if err != nil {
Expand Down Expand Up @@ -243,27 +254,18 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
}

fileSize := rStat.Size()
hasher := GetNewCommitHasher(fileSize)
bufSize := BufferSize
if fileSize < BufferSize {
bufSize = int(fileSize)
}
buffer := make([]byte, bufSize)
_, err = io.CopyBuffer(hasher, r, buffer)
if err != nil {
return false, common.NewError("read_write_error", err.Error())
}

err = hasher.Finalize()
if err != nil {
return false, common.NewError("finalize_error", err.Error())
}
fmtRootBytes, err := hasher.fmt.CalculateRootAndStoreNodes(f)
fmtRootBytes, err := fileData.Hasher.fmt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("fmt_hash_calculation_error", err.Error())
}

validationRootBytes, err := hasher.vt.CalculateRootAndStoreNodes(f)
validationRootBytes, err := fileData.Hasher.vt.CalculateRootAndStoreNodes(f)
if err != nil {
return false, common.NewError("validation_hash_calculation_error", err.Error())
}
Expand All @@ -279,12 +281,7 @@ func (fs *FileStore) CommitWrite(allocID, conID string, fileData *FileInputData)
"calculated validation root does not match with client's validation root")
}

_, err = r.Seek(0, io.SeekStart)
if err != nil {
return false, common.NewError("seek_error", err.Error())
}

_, err = io.Copy(f, r)
_, err = io.CopyBuffer(f, r, buffer)
if err != nil {
return false, common.NewError("write_error", err.Error())
}
Expand Down
6 changes: 4 additions & 2 deletions code/go/0chain.net/blobbercore/filestore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ type FileInputData struct {
//Upload-Offset indicates a byte offset within a resource. The value MUST be a non-negative integer.
UploadOffset int64
//IsFinal the request is final chunk
IsFinal bool
IsThumbnail bool
IsFinal bool
IsThumbnail bool
FilePathHash string
Hasher *CommitHasher
}

type FileOutputData struct {
Expand Down
31 changes: 22 additions & 9 deletions code/go/0chain.net/blobbercore/filestore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,23 +248,26 @@ func TestStoreStorageWriteAndCommit(t *testing.T) {
size := 640 * KB
validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size))
require.Nil(t, err)

pathHash := encryption.Hash(test.remotePath)
hasher := GetNewCommitHasher(int64(size))
fid := &FileInputData{
Name: test.fileName,
Path: test.remotePath,
ValidationRoot: validationRoot,
FixedMerkleRoot: fixedMerkleRoot,
ChunkSize: 64 * KB,
FilePathHash: pathHash,
Hasher: hasher,
}

f, err := os.Open(fPath)
require.Nil(t, err)
defer f.Close()

_, err = fs.WriteFile(test.allocID, test.connID, fid, f)
require.Nil(t, err)
err = hasher.Finalize()
require.Nil(t, err)

pathHash := encryption.Hash(test.remotePath)
tempFilePath := fs.getTempPathForFile(test.allocID, test.fileName, pathHash, test.connID)
tF, err := os.Stat(tempFilePath)
require.Nil(t, err)
Expand Down Expand Up @@ -329,13 +332,16 @@ func TestDeletePreCommitDir(t *testing.T) {
size := 640 * KB
validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size))
require.Nil(t, err)

pathHash := encryption.Hash(remotePath)
hasher := GetNewCommitHasher(int64(size))
fid := &FileInputData{
Name: fileName,
Path: remotePath,
ValidationRoot: validationRoot,
FixedMerkleRoot: fixedMerkleRoot,
ChunkSize: 64 * KB,
FilePathHash: pathHash,
Hasher: hasher,
}
// checkc if file to be uploaded exists
f, err := os.Open(fPath)
Expand All @@ -344,9 +350,9 @@ func TestDeletePreCommitDir(t *testing.T) {
_, err = fs.WriteFile(allocID, connID, fid, f)
require.Nil(t, err)
f.Close()

err = hasher.Finalize()
require.Nil(t, err)
// check if file is written to temp location
pathHash := encryption.Hash(remotePath)
tempFilePath := fs.getTempPathForFile(allocID, fileName, pathHash, connID)
tF, err := os.Stat(tempFilePath)
require.Nil(t, err)
Expand All @@ -368,6 +374,8 @@ func TestDeletePreCommitDir(t *testing.T) {

fid.ValidationRoot = validationRoot
fid.FixedMerkleRoot = fixedMerkleRoot
hasher = GetNewCommitHasher(int64(size))
fid.Hasher = hasher

// Write file to temp location
f, err = os.Open(fPath)
Expand All @@ -379,6 +387,8 @@ func TestDeletePreCommitDir(t *testing.T) {
tempFilePath = fs.getTempPathForFile(allocID, fileName, pathHash, connID)
_, err = os.Stat(tempFilePath)
require.Nil(t, err)
err = hasher.Finalize()
require.Nil(t, err)

success, err = fs.CommitWrite(allocID, connID, fid)
require.Nil(t, err)
Expand Down Expand Up @@ -422,13 +432,16 @@ func TestStorageUploadUpdate(t *testing.T) {
size := 640 * KB
validationRoot, fixedMerkleRoot, err := generateRandomData(fPath, int64(size))
require.Nil(t, err)

pathHash := encryption.Hash(remotePath)
hasher := GetNewCommitHasher(int64(size))
fid := &FileInputData{
Name: fileName,
Path: remotePath,
ValidationRoot: validationRoot,
FixedMerkleRoot: fixedMerkleRoot,
ChunkSize: 64 * KB,
FilePathHash: pathHash,
Hasher: hasher,
}
// checkc if file to be uploaded exists
f, err := os.Open(fPath)
Expand All @@ -437,9 +450,9 @@ func TestStorageUploadUpdate(t *testing.T) {
_, err = fs.WriteFile(allocID, connID, fid, f)
require.Nil(t, err)
f.Close()

err = hasher.Finalize()
require.Nil(t, err)
// check if file is written to temp location
pathHash := encryption.Hash(remotePath)
tempFilePath := fs.getTempPathForFile(allocID, fileName, pathHash, connID)
tF, err := os.Stat(tempFilePath)
require.Nil(t, err)
Expand Down
Loading

0 comments on commit 5e2a551

Please sign in to comment.