diff --git a/wasmsdk/blobber.go b/wasmsdk/blobber.go index 3f4add853..8047507ee 100644 --- a/wasmsdk/blobber.go +++ b/wasmsdk/blobber.go @@ -998,8 +998,9 @@ func upload(allocationID, remotePath string, fileBytes, thumbnailBytes []byte, w // - remotePath : remote path of the file // - authTicket : auth ticket of the file, if the file is shared // - lookupHash : lookup hash of the file, which is used to locate the file if remotepath and allocation id are not provided +// - writeChunkFuncName : callback function name to write the chunk, if empty the function will return arrayBuffer otherwise will return nil -func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, startBlock, endBlock int64) ([]byte, error) { +func downloadBlocks(allocId, remotePath, authTicket, lookupHash, writeChunkFuncName string, startBlock, endBlock int64) ([]byte, error) { if len(remotePath) == 0 && len(authTicket) == 0 { return nil, RequiredArg("remotePath/authTicket") @@ -1017,37 +1018,58 @@ func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, s statusBar = &StatusBar{wg: wg, totalBytesMap: make(map[string]int)} ) - pathHash := encryption.FastHash(remotePath) - fs, err := sys.Files.Open(pathHash) - if err != nil { - return nil, fmt.Errorf("could not open local file: %v", err) + if lookupHash == "" { + lookupHash = getLookupHash(allocId, remotePath) } - mf, _ := fs.(*sys.MemFile) - if mf == nil { - return nil, fmt.Errorf("invalid memfile") - } + var fh sys.File + if writeChunkFuncName == "" { + pathHash := encryption.FastHash(fmt.Sprintf("%s:%d:%d", lookupHash, startBlock, endBlock)) + fs, err := sys.Files.Open(pathHash) + if err != nil { + return nil, fmt.Errorf("could not open local file: %v", err) + } - defer sys.Files.Remove(pathHash) //nolint + mf, _ := fs.(*sys.MemFile) + if mf == nil { + return nil, fmt.Errorf("invalid memfile") + } + fh = mf + defer sys.Files.Remove(pathHash) //nolint + } else { + fh = jsbridge.NewFileCallbackWriter(writeChunkFuncName) + } wg.Add(1) if authTicket != "" { - err = alloc.DownloadByBlocksToFileHandlerFromAuthTicket(mf, authTicket, lookupHash, startBlock, endBlock, 100, remotePath, false, statusBar, true) + err = alloc.DownloadByBlocksToFileHandlerFromAuthTicket(fh, authTicket, lookupHash, startBlock, endBlock, 100, remotePath, false, statusBar, true, sdk.WithFileCallback( + func() { + fh.Close() //nolint:errcheck + }, + )) } else { err = alloc.DownloadByBlocksToFileHandler( - mf, + fh, remotePath, startBlock, endBlock, 100, false, - statusBar, true) + statusBar, true, sdk.WithFileCallback( + func() { + fh.Close() //nolint:errcheck + }, + )) } if err != nil { return nil, err } wg.Wait() - return mf.Buffer, nil + var buf []byte + if mf, ok := fh.(*sys.MemFile); ok { + buf = mf.Buffer + } + return buf, nil } // getBlobbers get list of active blobbers, and format them as array json string @@ -1086,7 +1108,19 @@ func repairAllocation(allocationID, callbackFuncName string) error { return err } wg.Wait() - return statusBar.err + if statusBar.err != nil { + fmt.Println("Error in repair allocation: ", statusBar.err) + return statusBar.err + } + status, _, err := alloc.CheckAllocStatus() + if err != nil { + return err + } + if status == sdk.Repair || status == sdk.Broken { + fmt.Println("allocation repair failed") + return errors.New("allocation repair failed") + } + return nil } // checkAllocStatus check the status of the allocation, either it is ok, needs repair or broken diff --git a/wasmsdk/jsbridge/file_writer.go b/wasmsdk/jsbridge/file_writer.go index 21b0e17b2..e265117fa 100644 --- a/wasmsdk/jsbridge/file_writer.go +++ b/wasmsdk/jsbridge/file_writer.go @@ -5,9 +5,11 @@ package jsbridge import ( "errors" - "io" "io/fs" "syscall/js" + + "github.com/0chain/gosdk/core/common" + "github.com/valyala/bytebufferpool" ) type FileWriter struct { @@ -31,29 +33,42 @@ func (w *FileWriter) Write(p []byte) (int, error) { //copy bytes to buf if w.bufWriteOffset+len(p) > len(w.buf) { - w.writeError = true - return 0, io.ErrShortWrite + err := w.flush() + if err != nil { + return 0, err + } } n := copy(w.buf[w.bufWriteOffset:], p) w.bufWriteOffset += n if w.bufWriteOffset == len(w.buf) { //write to file - if w.bufLen != len(w.buf) { - w.bufLen = len(w.buf) - w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen) + err := w.flush() + if err != nil { + return 0, err } - js.CopyBytesToJS(w.uint8Array, w.buf) - _, err := Await(w.writableStream.Call("write", w.uint8Array)) - if len(err) > 0 && !err[0].IsNull() { - w.writeError = true - return 0, errors.New("file_writer: " + err[0].String()) - } - //reset buffer - w.bufWriteOffset = 0 } return len(p), nil } +func (w *FileWriter) flush() error { + if w.bufWriteOffset == 0 { + return nil + } + if w.bufLen != w.bufWriteOffset { + w.bufLen = w.bufWriteOffset + w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen) + } + js.CopyBytesToJS(w.uint8Array, w.buf[:w.bufWriteOffset]) + _, err := Await(w.writableStream.Call("write", w.uint8Array)) + if len(err) > 0 && !err[0].IsNull() { + w.writeError = true + return errors.New("file_writer: " + err[0].String()) + } + //reset buffer + w.bufWriteOffset = 0 + return nil +} + // func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) { // uint8Array := js.Global().Get("Uint8Array").New(len(p)) // js.CopyBytesToJS(uint8Array, p) @@ -148,3 +163,74 @@ func NewFileWriterFromHandle(dirHandler js.Value, name string) (*FileWriter, err fileHandle: fileHandler[0], }, nil } + +type FileCallbackWriter struct { + writeChunk js.Value + buf []byte + offset int64 +} + +const bufCallbackCap = 4 * 1024 * 1024 //4MB + +func NewFileCallbackWriter(writeChunkFuncName string) *FileCallbackWriter { + writeChunk := js.Global().Get(writeChunkFuncName) + return &FileCallbackWriter{ + writeChunk: writeChunk, + } +} + +func (wc *FileCallbackWriter) Write(p []byte) (int, error) { + if len(wc.buf) == 0 { + buff := common.MemPool.Get() + if cap(buff.B) < bufCallbackCap { + buff.B = make([]byte, 0, bufCallbackCap) + } + wc.buf = buff.B + } + if len(wc.buf)+len(p) > cap(wc.buf) { + uint8Array := js.Global().Get("Uint8Array").New(len(wc.buf)) + js.CopyBytesToJS(uint8Array, wc.buf) + _, err := Await(wc.writeChunk.Invoke(uint8Array, wc.offset)) + if len(err) > 0 && !err[0].IsNull() { + return 0, errors.New("file_writer: " + err[0].String()) + } + wc.offset += int64(len(wc.buf)) + wc.buf = wc.buf[:0] + } + wc.buf = append(wc.buf, p...) + return len(p), nil +} + +func (wc *FileCallbackWriter) Close() error { + if len(wc.buf) > 0 { + uint8Array := js.Global().Get("Uint8Array").New(len(wc.buf)) + js.CopyBytesToJS(uint8Array, wc.buf) + _, err := Await(wc.writeChunk.Invoke(uint8Array, wc.offset)) + if len(err) > 0 && !err[0].IsNull() { + return errors.New("file_writer: " + err[0].String()) + } + wc.offset += int64(len(wc.buf)) + wc.buf = wc.buf[:0] + } + buff := &bytebufferpool.ByteBuffer{ + B: wc.buf, + } + common.MemPool.Put(buff) + return nil +} + +func (wc *FileCallbackWriter) Read(p []byte) (int, error) { + return 0, errors.New("file_writer: not supported") +} + +func (wc *FileCallbackWriter) Seek(offset int64, whence int) (int64, error) { + return 0, nil +} + +func (wc *FileCallbackWriter) Sync() error { + return nil +} + +func (wc *FileCallbackWriter) Stat() (fs.FileInfo, error) { + return nil, nil +} diff --git a/zboxcore/sdk/chunked_upload.go b/zboxcore/sdk/chunked_upload.go index b7018b98f..b6bea2232 100644 --- a/zboxcore/sdk/chunked_upload.go +++ b/zboxcore/sdk/chunked_upload.go @@ -696,7 +696,7 @@ func (su *ChunkedUpload) uploadToBlobbers(uploadData UploadData) error { if strings.Contains(err.Error(), "duplicate") { su.consensus.Done() errC := atomic.AddInt32(&su.addConsensus, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- err } return diff --git a/zboxcore/sdk/chunked_upload_process_js.go b/zboxcore/sdk/chunked_upload_process_js.go index 8b3af20de..bc8db2ef0 100644 --- a/zboxcore/sdk/chunked_upload_process_js.go +++ b/zboxcore/sdk/chunked_upload_process_js.go @@ -262,7 +262,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er eventChan := allEventChan[pos] if eventChan.C == nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- thrown.New("upload_failed", "Upload failed. Worker event channel not found") } return @@ -272,7 +272,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er if !ok { logger.Logger.Error("chan closed from: ", blobber.blobber.Baseurl) errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { if su.ctx.Err() != nil { wgErrors <- context.Cause(su.ctx) } else { @@ -284,7 +284,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er msgType, data, err := jsbridge.GetMsgType(event) if err != nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- errors.Wrap(err, "could not get msgType") } return @@ -294,7 +294,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er case "auth": if err := su.processWebWorkerAuthRequest(data, eventChan); err != nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- err } return @@ -306,7 +306,7 @@ func (su *ChunkedUpload) listen(allEventChan []eventChanWorker, respChan chan er isFinal, err = su.processWebWorkerUpload(data, blobber, pos) if err != nil { errC := atomic.AddInt32(&errCount, 1) - if errC >= int32(su.consensus.consensusThresh) { + if errC > int32(su.consensus.fullconsensus-su.consensus.consensusThresh) { wgErrors <- err } } else { diff --git a/zboxcore/sdk/copyworker.go b/zboxcore/sdk/copyworker.go index 8113f6c77..9210e5593 100644 --- a/zboxcore/sdk/copyworker.go +++ b/zboxcore/sdk/copyworker.go @@ -247,11 +247,13 @@ func (req *CopyRequest) ProcessWithBlobbers() ([]fileref.RefEntity, error) { }(int(pos)) } wg.Wait() - err := zboxutil.MajorError(blobberErrors) - if err != nil && strings.Contains(err.Error(), objAlreadyExists) && consensusRef.Type == fileref.DIRECTORY { - return nil, errNoChange + var err error + if !req.isConsensusOk() { + err = zboxutil.MajorError(blobberErrors) + if err != nil && strings.Contains(err.Error(), objAlreadyExists) && consensusRef.Type == fileref.DIRECTORY { + return nil, errNoChange + } } - return objectTreeRefs, err } diff --git a/zboxcore/sdk/moveworker.go b/zboxcore/sdk/moveworker.go index 5b3298023..28af96d4d 100644 --- a/zboxcore/sdk/moveworker.go +++ b/zboxcore/sdk/moveworker.go @@ -260,7 +260,11 @@ func (req *MoveRequest) ProcessWithBlobbers() ([]fileref.RefEntity, error) { }(int(pos)) } wg.Wait() - return objectTreeRefs, zboxutil.MajorError(blobberErrors) + var err error + if !req.isConsensusOk() { + err = zboxutil.MajorError(blobberErrors) + } + return objectTreeRefs, err } func (req *MoveRequest) ProcessMove() error { diff --git a/zboxcore/sdk/renameworker.go b/zboxcore/sdk/renameworker.go index c6eb19834..48deac045 100644 --- a/zboxcore/sdk/renameworker.go +++ b/zboxcore/sdk/renameworker.go @@ -263,7 +263,11 @@ func (req *RenameRequest) ProcessWithBlobbers() ([]fileref.RefEntity, error) { } req.wg.Wait() - return objectTreeRefs, zboxutil.MajorError(blobberErrors) + var err error + if !req.consensus.isConsensusOk() { + err = zboxutil.MajorError(blobberErrors) + } + return objectTreeRefs, err } func (req *RenameRequest) ProcessRename() error {