Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enterprise blobber #1566

Draft
wants to merge 60 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
f9eb675
add version marker and block hasher
Hitenjain14 Jul 21, 2024
02e203d
add repair info in commit worker
Hitenjain14 Jul 21, 2024
c981f06
add check for repair marker
Hitenjain14 Jul 23, 2024
b5cbb79
add allocation version and check for directory
Hitenjain14 Jul 24, 2024
526a117
prefix delete
Hitenjain14 Jul 25, 2024
6486442
fix consensus in delete
Hitenjain14 Jul 25, 2024
f9f8237
fix consensus thresh check
Hitenjain14 Jul 26, 2024
e015027
fix path level in delete
Hitenjain14 Jul 26, 2024
a8bf8bb
remove vt from downloadworker
Hitenjain14 Jul 26, 2024
b91b592
rm vt in reader
Hitenjain14 Jul 27, 2024
9beaccc
Merge branch 'feat/enterprise-blobber' of https://github.com/0chain/g…
Hitenjain14 Jul 27, 2024
f5b51c0
remove single blobber for single client mode
Hitenjain14 Jul 27, 2024
4d33f27
add check for nil ref
Hitenjain14 Jul 27, 2024
796de38
remove mask reset
Hitenjain14 Jul 27, 2024
da1aa55
check change count
Hitenjain14 Jul 27, 2024
a9814df
chane ref check
Hitenjain14 Jul 27, 2024
08de511
remove validation root from Ref
Hitenjain14 Jul 29, 2024
e7df719
add list logs
Hitenjain14 Jul 31, 2024
3f00405
Merge branch 'feat/enterprise-blobber' of https://github.com/0chain/g…
Hitenjain14 Jul 31, 2024
c2e8b89
add response
Hitenjain14 Jul 31, 2024
4496ab6
add debug logs
Hitenjain14 Jul 31, 2024
daa353d
add log for selected file ref
Hitenjain14 Jul 31, 2024
974959f
add check for rate limit
Hitenjain14 Jul 31, 2024
ec1a470
remove get ref log
Hitenjain14 Jul 31, 2024
facdb4f
add repair for enterprise blobbers
Hitenjain14 Aug 5, 2024
d15f3ce
add delete for dir remotepath in move
Hitenjain14 Aug 7, 2024
f2bb064
fix no change check
Hitenjain14 Aug 7, 2024
b0e385f
fix consensus and add check for no change err
Hitenjain14 Aug 7, 2024
a2081f3
fix list completed repair
Hitenjain14 Aug 7, 2024
af515ba
use dir as destpath
Hitenjain14 Aug 8, 2024
e973828
base of dest name
Hitenjain14 Aug 8, 2024
f4ba999
fix copy op
Hitenjain14 Aug 8, 2024
94ef0df
check copy destpath
Hitenjain14 Aug 8, 2024
dbbc11b
join base and dest path
Hitenjain14 Aug 8, 2024
8390f11
set path hash
Hitenjain14 Aug 9, 2024
c552f97
add size for root dir
Hitenjain14 Aug 10, 2024
c14befd
Merge branch 'feat/enterprise-blobber' of https://github.com/0chain/g…
Hitenjain14 Aug 10, 2024
645ded9
check vm
Hitenjain14 Aug 13, 2024
007b982
move upload success to debug
Hitenjain14 Aug 14, 2024
b293286
Merge branch 'sprint-1.17' into feat/enterprise-blobber
Hitenjain14 Aug 16, 2024
a9a39a9
Merge branch 'sprint-1.17' of https://github.com/0chain/gosdk into fe…
Hitenjain14 Aug 16, 2024
2119c4a
fix nested move and rename
Hitenjain14 Aug 22, 2024
6648ada
fix nested rename
Hitenjain14 Aug 23, 2024
acf47eb
rmv log for invalid path
Hitenjain14 Aug 23, 2024
970cd10
Merge branch 'sprint-1.17' of https://github.com/0chain/gosdk into fe…
Hitenjain14 Aug 25, 2024
50adab4
Merge remote-tracking branch 'origin' into feat/enterprise-blobber
Hitenjain14 Aug 27, 2024
75bab85
Merge remote-tracking branch 'origin' into feat/enterprise-blobber
Hitenjain14 Sep 3, 2024
f71928d
Merge branch 'staging' into feat/enterprise-blobber
Hitenjain14 Sep 23, 2024
c8f84a9
Merge branch 'staging' into feat/enterprise-blobber
Hitenjain14 Sep 24, 2024
aaf11db
Merge branch 'staging' into feat/enterprise-blobber
Jayashsatolia403 Sep 24, 2024
271ccc6
Merge remote-tracking branch 'origin' into feat/enterprise-blobber
Hitenjain14 Oct 12, 2024
237970b
comment test
Hitenjain14 Oct 12, 2024
43dcb10
Merge branch 'feat/enterprise-blobber' of https://github.com/0chain/g…
Hitenjain14 Oct 12, 2024
d4d99f5
fix release of buffer
Hitenjain14 Oct 23, 2024
655bacd
change cdn and wasm path
Hitenjain14 Nov 8, 2024
f967f8a
update go version in make wasm
Hitenjain14 Nov 8, 2024
2f06303
add method for wasm type
Hitenjain14 Nov 14, 2024
d8f100c
set wasm type in window object
Hitenjain14 Nov 14, 2024
2f3d040
merge file write and callback changes
Hitenjain14 Nov 25, 2024
0fe81ae
fix build
Hitenjain14 Nov 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/build-sdks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Set up Go 1.22
uses: actions/setup-go@v3
with:
go-version: 1.22
go-version: 1.22.5

- name: Clean build
run: make clean-mobilesdk
Expand Down Expand Up @@ -99,7 +99,7 @@ jobs:
- name: Set up Go 1.22
uses: actions/setup-go@v3
with:
go-version: 1.22
go-version: 1.22.5

- name: Install deps
run: |
Expand Down Expand Up @@ -202,7 +202,7 @@ jobs:
- name: Set up Go 1.22
uses: actions/setup-go@v3
with:
go-version: 1.22
go-version: 1.22.5

- name: Clean build
run: make clean-mobilesdk
Expand Down Expand Up @@ -274,7 +274,7 @@ jobs:
- name: Set up Go 1.22
uses: actions/setup-go@v3
with:
go-version: 1.22
go-version: 1.22.5

- name: Install deps
run: |
Expand Down Expand Up @@ -338,7 +338,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v3
with:
go-version: 1.21.5
go-version: 1.22.5

- name: Checkout
uses: actions/checkout@v2
Expand All @@ -349,7 +349,7 @@ jobs:
sudo apt-get -y install build-essential nghttp2 libnghttp2-dev libssl-dev wget

- name: Build
run: docker run --rm -v $PWD:/gosdk -w /gosdk golang:1.21.5 make wasm-build
run: docker run --rm -v $PWD:/gosdk -w /gosdk golang:1.22.5 make wasm-build

- name: 'Upload Artifact'
uses: actions/upload-artifact@v3
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v3
with:
go-version: 1.21.5
go-version: 1.22.5

- uses: actions/checkout@v3

Expand All @@ -51,7 +51,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v3
with:
go-version: 1.21.5
go-version: 1.22.5

- name: Install deps
run: |
Expand Down Expand Up @@ -170,7 +170,7 @@ jobs:
- name: Set up Go 1.x
uses: actions/setup-go@v3
with:
go-version: 1.21.5
go-version: 1.22.5

- uses: actions/setup-node@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/0chain/gosdk

go 1.21
go 1.22.5

require (
github.com/0chain/common v0.0.6-0.20230127095721-8df4d1d72565
Expand Down
64 changes: 49 additions & 15 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,8 +998,9 @@ func upload(allocationID, remotePath string, fileBytes, thumbnailBytes []byte, w
// - remotePath : remote path of the file
// - authTicket : auth ticket of the file, if the file is shared
// - lookupHash : lookup hash of the file, which is used to locate the file if remotepath and allocation id are not provided
// - writeChunkFuncName : callback function name to write the chunk, if empty the function will return arrayBuffer otherwise will return nil

func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, startBlock, endBlock int64) ([]byte, error) {
func downloadBlocks(allocId, remotePath, authTicket, lookupHash, writeChunkFuncName string, startBlock, endBlock int64) ([]byte, error) {

if len(remotePath) == 0 && len(authTicket) == 0 {
return nil, RequiredArg("remotePath/authTicket")
Expand All @@ -1017,37 +1018,58 @@ func downloadBlocks(allocId string, remotePath, authTicket, lookupHash string, s
statusBar = &StatusBar{wg: wg, totalBytesMap: make(map[string]int)}
)

pathHash := encryption.FastHash(remotePath)
fs, err := sys.Files.Open(pathHash)
if err != nil {
return nil, fmt.Errorf("could not open local file: %v", err)
if lookupHash == "" {
lookupHash = getLookupHash(allocId, remotePath)
}

mf, _ := fs.(*sys.MemFile)
if mf == nil {
return nil, fmt.Errorf("invalid memfile")
}
var fh sys.File
if writeChunkFuncName == "" {
pathHash := encryption.FastHash(fmt.Sprintf("%s:%d:%d", lookupHash, startBlock, endBlock))
fs, err := sys.Files.Open(pathHash)
if err != nil {
return nil, fmt.Errorf("could not open local file: %v", err)
}

defer sys.Files.Remove(pathHash) //nolint
mf, _ := fs.(*sys.MemFile)
if mf == nil {
return nil, fmt.Errorf("invalid memfile")
}
fh = mf
defer sys.Files.Remove(pathHash) //nolint
} else {
fh = jsbridge.NewFileCallbackWriter(writeChunkFuncName)
}

wg.Add(1)
if authTicket != "" {
err = alloc.DownloadByBlocksToFileHandlerFromAuthTicket(mf, authTicket, lookupHash, startBlock, endBlock, 100, remotePath, false, statusBar, true)
err = alloc.DownloadByBlocksToFileHandlerFromAuthTicket(fh, authTicket, lookupHash, startBlock, endBlock, 100, remotePath, false, statusBar, true, sdk.WithFileCallback(
func() {
fh.Close() //nolint:errcheck
},
))
} else {
err = alloc.DownloadByBlocksToFileHandler(
mf,
fh,
remotePath,
startBlock,
endBlock,
100,
false,
statusBar, true)
statusBar, true, sdk.WithFileCallback(
func() {
fh.Close() //nolint:errcheck
},
))
}
if err != nil {
return nil, err
}
wg.Wait()
return mf.Buffer, nil
var buf []byte
if mf, ok := fh.(*sys.MemFile); ok {
buf = mf.Buffer
}
return buf, nil
}

// getBlobbers get list of active blobbers, and format them as array json string
Expand Down Expand Up @@ -1086,7 +1108,19 @@ func repairAllocation(allocationID, callbackFuncName string) error {
return err
}
wg.Wait()
return statusBar.err
if statusBar.err != nil {
fmt.Println("Error in repair allocation: ", statusBar.err)
return statusBar.err
}
status, _, err := alloc.CheckAllocStatus()
if err != nil {
return err
}
if status == sdk.Repair || status == sdk.Broken {
fmt.Println("allocation repair failed")
return errors.New("allocation repair failed")
}
return nil
}

// checkAllocStatus check the status of the allocation, either it is ok, needs repair or broken
Expand Down
8 changes: 8 additions & 0 deletions wasmsdk/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,11 @@ func (h *hasher) WriteToValidationMT(_ []byte) error {
func (h *hasher) Finalize() error {
return nil
}

func (h *hasher) GetBlockHash() (string, error) {
return "", nil
}

func (h *hasher) WriteToBlockHasher(buf []byte) error {
return nil
}
114 changes: 100 additions & 14 deletions wasmsdk/jsbridge/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package jsbridge

import (
"errors"
"io"
"io/fs"
"syscall/js"

"github.com/0chain/gosdk/core/common"
"github.com/valyala/bytebufferpool"
)

type FileWriter struct {
Expand All @@ -31,29 +33,42 @@ func (w *FileWriter) Write(p []byte) (int, error) {

//copy bytes to buf
if w.bufWriteOffset+len(p) > len(w.buf) {
w.writeError = true
return 0, io.ErrShortWrite
err := w.flush()
if err != nil {
return 0, err
}
}
n := copy(w.buf[w.bufWriteOffset:], p)
w.bufWriteOffset += n
if w.bufWriteOffset == len(w.buf) {
//write to file
if w.bufLen != len(w.buf) {
w.bufLen = len(w.buf)
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
err := w.flush()
if err != nil {
return 0, err
}
js.CopyBytesToJS(w.uint8Array, w.buf)
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
w.writeError = true
return 0, errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
}
return len(p), nil
}

func (w *FileWriter) flush() error {
if w.bufWriteOffset == 0 {
return nil
}
if w.bufLen != w.bufWriteOffset {
w.bufLen = w.bufWriteOffset
w.uint8Array = js.Global().Get("Uint8Array").New(w.bufLen)
}
js.CopyBytesToJS(w.uint8Array, w.buf[:w.bufWriteOffset])
_, err := Await(w.writableStream.Call("write", w.uint8Array))
if len(err) > 0 && !err[0].IsNull() {
w.writeError = true
return errors.New("file_writer: " + err[0].String())
}
//reset buffer
w.bufWriteOffset = 0
return nil
}

// func (w *FileWriter) WriteAt(p []byte, offset int64) (int, error) {
// uint8Array := js.Global().Get("Uint8Array").New(len(p))
// js.CopyBytesToJS(uint8Array, p)
Expand Down Expand Up @@ -148,3 +163,74 @@ func NewFileWriterFromHandle(dirHandler js.Value, name string) (*FileWriter, err
fileHandle: fileHandler[0],
}, nil
}

type FileCallbackWriter struct {
writeChunk js.Value
buf []byte
offset int64
}

const bufCallbackCap = 4 * 1024 * 1024 //4MB

func NewFileCallbackWriter(writeChunkFuncName string) *FileCallbackWriter {
writeChunk := js.Global().Get(writeChunkFuncName)
return &FileCallbackWriter{
writeChunk: writeChunk,
}
}

func (wc *FileCallbackWriter) Write(p []byte) (int, error) {
if len(wc.buf) == 0 {
buff := common.MemPool.Get()
if cap(buff.B) < bufCallbackCap {
buff.B = make([]byte, 0, bufCallbackCap)
}
wc.buf = buff.B
}
if len(wc.buf)+len(p) > cap(wc.buf) {
uint8Array := js.Global().Get("Uint8Array").New(len(wc.buf))
js.CopyBytesToJS(uint8Array, wc.buf)
_, err := Await(wc.writeChunk.Invoke(uint8Array, wc.offset))
if len(err) > 0 && !err[0].IsNull() {
return 0, errors.New("file_writer: " + err[0].String())
}
wc.offset += int64(len(wc.buf))
wc.buf = wc.buf[:0]
}
wc.buf = append(wc.buf, p...)
return len(p), nil
}

func (wc *FileCallbackWriter) Close() error {
if len(wc.buf) > 0 {
uint8Array := js.Global().Get("Uint8Array").New(len(wc.buf))
js.CopyBytesToJS(uint8Array, wc.buf)
_, err := Await(wc.writeChunk.Invoke(uint8Array, wc.offset))
if len(err) > 0 && !err[0].IsNull() {
return errors.New("file_writer: " + err[0].String())
}
wc.offset += int64(len(wc.buf))
wc.buf = wc.buf[:0]
}
buff := &bytebufferpool.ByteBuffer{
B: wc.buf,
}
common.MemPool.Put(buff)
return nil
}

func (wc *FileCallbackWriter) Read(p []byte) (int, error) {
return 0, errors.New("file_writer: not supported")
}

func (wc *FileCallbackWriter) Seek(offset int64, whence int) (int64, error) {
return 0, nil
}

func (wc *FileCallbackWriter) Sync() error {
return nil
}

func (wc *FileCallbackWriter) Stat() (fs.FileInfo, error) {
return nil, nil
}
2 changes: 1 addition & 1 deletion wasmsdk/jsbridge/template_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func buildJS(args, env []string, wasmPath string, tpl []byte) (string, error) {
if suffix == "" {
suffix = "dev"
}
cdnPath := fmt.Sprintf("https://d2os1u2xwjukgr.cloudfront.net/%s/zcn.wasm", suffix)
cdnPath := fmt.Sprintf("https://webapps-staticfiles.s3.us-east-2.amazonaws.com/%s/enterprise-zcn.wasm", suffix)
data := templateData{
Path: cdnPath,
Args: args,
Expand Down
2 changes: 1 addition & 1 deletion wasmsdk/jsbridge/webworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewWasmWebWorker(blobberID, blobberURL, clientID, clientKey, peerPublicKey,
"IS_SPLIT=" + strconv.FormatBool(isSplit),
"MNEMONIC=" + mnemonic,
"ZAUTH_SERVER=" + gZauthServer},
Path: "zcn.wasm",
Path: "enterprise-zcn.wasm",
subscribers: make(map[string]chan worker.MessageEvent),
}

Expand Down
2 changes: 1 addition & 1 deletion wasmsdk/jsbridge/zcnworker.js.tpl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
importScripts('https://cdn.jsdelivr.net/gh/golang/go@go1.21.5/misc/wasm/wasm_exec.js','https://cdn.jsdelivr.net/gh/herumi/[email protected]/browser/bls.js');
importScripts('https://cdn.jsdelivr.net/gh/golang/go@go1.22.5/misc/wasm/wasm_exec.js','https://cdn.jsdelivr.net/gh/herumi/[email protected]/browser/bls.js');

const go = new Go();
go.argv = {{.ArgsToJS}}
Expand Down
2 changes: 1 addition & 1 deletion wasmsdk/player_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *FilePlayer) download(startBlock int64) {
}
fmt.Println("start:", startBlock, "end:", endBlock, "numBlocks:", p.numBlocks, "total:", p.playlistFile.NumBlocks)

data, err := downloadBlocks(p.allocationObj.ID, p.remotePath, p.authTicket, p.lookupHash, startBlock, endBlock)
data, err := downloadBlocks(p.allocationObj.ID, p.remotePath, p.authTicket, p.lookupHash, "", startBlock, endBlock)
// data, err := downloadBlocks2(int(startBlock), int(endBlock), p.allocationObj, p.remotePath)
if err != nil {
PrintError(err.Error())
Expand Down
Loading