Skip to content

Commit

Permalink
Merge pull request #15 from 0chain/feat/enterprise-blobber
Browse files Browse the repository at this point in the history
File write and callback changes
  • Loading branch information
Hitenjain14 authored Nov 25, 2024
2 parents 3b828e1 + 7d6a1cc commit 10382bb
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 41 deletions.
64 changes: 49 additions & 15 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,9 @@ func upload(allocationID, remotePath string, fileBytes, thumbnailBytes []byte, w
// - remotePath : remote path of the file
// - authTicket : auth ticket of the file, if the file is shared
// - lookupHash : lookup hash of the file, which is used to locate the file if remotepath and allocation id are not provided
// - writeChunkFuncName : callback function name to write the chunk, if empty the function will return arrayBuffer otherwise will return nil

func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, startBlock, endBlock int64) ([]byte, error) {
func downloadBlocks(allocId, remotePath, authTicket, lookupHash, writeChunkFuncName string, startBlock, endBlock int64) ([]byte, error) {

if len(remotePath) == 0 && len(authTicket) == 0 {
return nil, RequiredArg("remotePath/authTicket")
Expand All @@ -1017,37 +1018,58 @@ func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, s
statusBar = &StatusBar{wg: wg, totalBytesMap: make(map[string]int)}
)

pathHash := encryption.FastHash(remotePath)
fs, err := sys.Files.Open(pathHash)
if err != nil {
return nil, fmt.Errorf("could not open local file: %v", err)
if lookupHash == "" {
lookupHash = getLookupHash(allocId, remotePath)
}

mf, _ := fs.(*sys.MemFile)
if mf == nil {
return nil, fmt.Errorf("invalid memfile")
}
var fh sys.File
if writeChunkFuncName == "" {
pathHash := encryption.FastHash(fmt.Sprintf("%s:%d:%d", lookupHash, startBlock, endBlock))
fs, err := sys.Files.Open(pathHash)
if err != nil {
return nil, fmt.Errorf("could not open local file: %v", err)
}

defer sys.Files.Remove(pathHash) //nolint
mf, _ := fs.(*sys.MemFile)
if mf == nil {
return nil, fmt.Errorf("invalid memfile")
}
fh = mf
defer sys.Files.Remove(pathHash) //nolint
} else {
fh = jsbridge.NewFileCallbackWriter(writeChunkFuncName)
}

wg.Add(1)
if authTicket != "" {
err = alloc.DownloadByBlocksToFileHandlerFromAuthTicket(mf, authTicket, lookupHash, startBlock, endBlock, 100, remotePath, false, statusBar, true)
err = alloc.DownloadByBlocksToFileHandlerFromAuthTicket(fh, authTicket, lookupHash, startBlock, endBlock, 100, remotePath, false, statusBar, true, sdk.WithFileCallback(
func() {
fh.Close() //nolint:errcheck
},
))
} else {
err = alloc.DownloadByBlocksToFileHandler(
mf,
fh,
remotePath,
startBlock,
endBlock,
100,
false,
statusBar, true)
statusBar, true, sdk.WithFileCallback(
func() {
fh.Close() //nolint:errcheck
},
))
}
if err != nil {
return nil, err
}
wg.Wait()
return mf.Buffer, nil
var buf []byte
if mf, ok := fh.(*sys.MemFile); ok {
buf = mf.Buffer
}
return buf, nil
}

// getBlobbers get list of active blobbers, and format them as array json string
Expand Down Expand Up @@ -1086,7 +1108,19 @@ func repairAllocation(allocationID, callbackFuncName string) error {
return err
}
wg.Wait()
return statusBar.err
if statusBar.err != nil {
fmt.Println("Error in repair allocation: ", statusBar.err)
return statusBar.err
}
status, _, err := alloc.CheckAllocStatus()
if err != nil {
return err
}
if status == sdk.Repair || status == sdk.Broken {
fmt.Println("allocation repair failed")
return errors.New("allocation repair failed")
}
return nil
}

// checkAllocStatus check the status of the allocation, either it is ok, needs repair or broken
Expand Down
114 changes: 100 additions & 14 deletions wasmsdk/jsbridge/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package jsbridge

import (
"errors"
"io"
"io/fs"
"syscall/js"

"github.com/0chain/gosdk/core/common"
"github.com/valyala/bytebufferpool"
)

type FileWriter struct {
Expand All @@ -31,29 +33,42 @@ func (w *FileWriter) Write(p []byte) (int, error) {

//copy bytes to buf
if w.bufWriteOffset+len(p) > len(w.buf) {
w.writeError = true
return 0, io.ErrShortWrite
err := w.flush()
if err != nil {
return 0, err
}
}
n := copy(w.buf[w.bufWriteOffset:], p)
w.bufWriteOffset += n
if w.bufWriteOffset == len(w.buf) {
//write to file
if w.bufLen != len(w.buf) {
w.bufLen = len(w.buf)
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
err := w.flush()
if err != nil {
return 0, err
}
js.CopyBytesToJS(w.uint8Array, w.buf)
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
w.writeError = true
return 0, errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
}
return len(p), nil
}

func (w *FileWriter) flush() error {
if w.bufWriteOffset == 0 {
return nil
}
if w.bufLen != w.bufWriteOffset {
w.bufLen = w.bufWriteOffset
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
}
js.CopyBytesToJS(w.uint8Array, w.buf[:w.bufWriteOffset])
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
w.writeError = true
return errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
return nil
}

// func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) {
// uint8Array := js.Global().Get("Uint8Array").New(len(p))
// js.CopyBytesToJS(uint8Array, p)
Expand Down Expand Up @@ -148,3 +163,74 @@ func NewFileWriterFromHandle(dirHandler js.Value, name string) (*FileWriter, err
fileHandle: fileHandler[0],
}, nil
}

type FileCallbackWriter struct {
writeChunk js.Value
buf []byte
offset int64
}

const bufCallbackCap = 4 * 1024 * 1024 //4MB

func NewFileCallbackWriter(writeChunkFuncName string) *FileCallbackWriter {
writeChunk := js.Global().Get(writeChunkFuncName)
return &FileCallbackWriter{
writeChunk: writeChunk,
}
}

func (wc *FileCallbackWriter) Write(p []byte) (int, error) {
if len(wc.buf) == 0 {
buff := common.MemPool.Get()
if cap(buff.B) < bufCallbackCap {
buff.B = make([]byte, 0, bufCallbackCap)
}
wc.buf = buff.B
}
if len(wc.buf)+len(p) > cap(wc.buf) {
uint8Array := js.Global().Get("Uint8Array").New(len(wc.buf))
js.CopyBytesToJS(uint8Array, wc.buf)
_, err := Await(wc.writeChunk.Invoke(uint8Array, wc.offset))
if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_writer: " + err[0].String())
}
wc.offset += int64(len(wc.buf))
wc.buf = wc.buf[:0]
}
wc.buf = append(wc.buf, p...)
return len(p), nil
}

func (wc *FileCallbackWriter) Close() error {
if len(wc.buf) > 0 {
uint8Array := js.Global().Get("Uint8Array").New(len(wc.buf))
js.CopyBytesToJS(uint8Array, wc.buf)
_, err := Await(wc.writeChunk.Invoke(uint8Array, wc.offset))
if len(err) > 0 && !err[0].IsNull() {
return errors.New("file_writer: " + err[0].String())
}
wc.offset += int64(len(wc.buf))
wc.buf = wc.buf[:0]
}
buff := &bytebufferpool.ByteBuffer{
B: wc.buf,
}
common.MemPool.Put(buff)
return nil
}

func (wc *FileCallbackWriter) Read(p []byte) (int, error) {
return 0, errors.New("file_writer: not supported")
}

func (wc *FileCallbackWriter) Seek(offset int64, whence int) (int64, error) {
return 0, nil
}

func (wc *FileCallbackWriter) Sync() error {
return nil
}

func (wc *FileCallbackWriter) Stat() (fs.FileInfo, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion zboxcore/sdk/chunked_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,7 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error {
if strings.Contains(err.Error(), "duplicate") {
su.consensus.Done()
errC := atomic.AddInt32(&su.addConsensus, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
return
Expand Down
10 changes: 5 additions & 5 deletions zboxcore/sdk/chunked_upload_process_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
eventChan := allEventChan[pos]
if eventChan.C == nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- thrown.New("upload_failed", "Upload failed. Worker event channel not found")
}
return
Expand All @@ -272,7 +272,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
if !ok {
logger.Logger.Error("chan closed from: ", blobber.blobber.Baseurl)
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
if su.ctx.Err() != nil {
wgErrors <- context.Cause(su.ctx)
} else {
Expand All @@ -284,7 +284,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
msgType, data, err := jsbridge.GetMsgType(event)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- errors.Wrap(err, "could not get msgType")
}
return
Expand All @@ -294,7 +294,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
case "auth":
if err := su.processWebWorkerAuthRequest(data, eventChan); err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
return
Expand All @@ -306,7 +306,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er
isFinal, err = su.processWebWorkerUpload(data, blobber, pos)
if err != nil {
errC := atomic.AddInt32(&errCount, 1)
if errC >= int32(su.consensus.consensusThresh) {
if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) {
wgErrors <- err
}
} else {
Expand Down
10 changes: 6 additions & 4 deletions zboxcore/sdk/copyworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,13 @@ func (req *CopyRequest) ProcessWithBlobbers() ([]fileref.RefEntity, error) {
}(int(pos))
}
wg.Wait()
err := zboxutil.MajorError(blobberErrors)
if err != nil && strings.Contains(err.Error(), objAlreadyExists) && consensusRef.Type == fileref.DIRECTORY {
return nil, errNoChange
var err error
if !req.isConsensusOk() {
err = zboxutil.MajorError(blobberErrors)
if err != nil && strings.Contains(err.Error(), objAlreadyExists) && consensusRef.Type == fileref.DIRECTORY {
return nil, errNoChange
}
}

return objectTreeRefs, err
}

Expand Down
6 changes: 5 additions & 1 deletion zboxcore/sdk/moveworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ func (req *MoveRequest) ProcessWithBlobbers() ([]fileref.RefEntity, error) {
}(int(pos))
}
wg.Wait()
return objectTreeRefs, zboxutil.MajorError(blobberErrors)
var err error
if !req.isConsensusOk() {
err = zboxutil.MajorError(blobberErrors)
}
return objectTreeRefs, err
}

func (req *MoveRequest) ProcessMove() error {
Expand Down
6 changes: 5 additions & 1 deletion zboxcore/sdk/renameworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,11 @@ func (req *RenameRequest) ProcessWithBlobbers() ([]fileref.RefEntity, error) {
}
req.wg.Wait()

return objectTreeRefs, zboxutil.MajorError(blobberErrors)
var err error
if !req.consensus.isConsensusOk() {
err = zboxutil.MajorError(blobberErrors)
}
return objectTreeRefs, err
}

func (req *RenameRequest) ProcessRename() error {
Expand Down

0 comments on commit 10382bb

Please sign in to comment.