Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live streaming #1062

Open
wants to merge 10 commits into
base: staging
Choose a base branch
from
141 changes: 141 additions & 0 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,16 @@ type BulkUploadOption struct {
CallbackFuncName string `json:"callbackFuncName,omitempty"`
}

type LiveUploadOption struct {
AllocationID string `json:"allocationId,omitempty"`
RemotePath string `json:"remotePath,omitempty"`

Encrypt bool `json:"encrypt,omitempty"`

FileBytes jsbridge.Bytes `json:"fileBytes,omitempty"`
CallbackFuncName string `json:"callbackFuncName,omitempty"`
}

type BulkUploadResult struct {
RemotePath string `json:"remotePath,omitempty"`
Success bool `json:"success,omitempty"`
Expand All @@ -426,6 +436,11 @@ type MultiOperationOption struct {
DestName string `json:"destName,omitempty"` // Required only for rename operation
DestPath string `json:"destPath,omitempty"` // Required for copy and move operation`
}
type LiveUploadResult struct {
RemotePath string `json:"remotePath,omitempty"`
Success bool `json:"success,omitempty"`
Error string `json:"error,omitempty"`
}

// MultiOperation - do copy, move, delete and createdir operation together
// ## Inputs
Expand Down Expand Up @@ -686,6 +701,132 @@ func uploadWithJsFuncs(allocationID, remotePath string, readChunkFuncName string
return true, nil
}

// Live Upload Function.
func liveUpload(jsonLiveUploadOptions string) ([]LiveUploadResult, error) {
var options []LiveUploadOption
err := json.Unmarshal([]byte(jsonLiveUploadOptions), &options)
if err != nil {
return nil, err
}

n := len(options)
wait := make(chan LiveUploadResult, 1)

for _, option := range options {
go func(o LiveUploadOption) {
result := LiveUploadResult{
RemotePath: o.RemotePath,
}
defer func() { wait <- result }()
PrintError(o.FileBytes.Buffer)
ok, err := liveUploadWithJsFuncs(o.AllocationID,
o.RemotePath,
o.Encrypt,
o.FileBytes.Buffer,
o.CallbackFuncName)
result.Success = ok
if err != nil {
result.Error = err.Error()
result.Success = false
}

}(option)

}

results := make([]LiveUploadResult, 0, n)
for i := 0; i < n; i++ {
result := <-wait
results = append(results, result)
}

return results, nil
}

// Upload using createChunkedUpload.
func liveUploadWithJsFuncs(allocationID, remotePath string, encrypt bool, fileBytes []byte, callbackFuncName string) (bool, error) {

if len(allocationID) == 0 {
return false, RequiredArg("allocationID")
}

if len(remotePath) == 0 {
return false, RequiredArg("remotePath")
}

allocationObj, err := getAllocation(allocationID)
if err != nil {
PrintError("Error fetching the allocation", err)
return false, err
}

wg := &sync.WaitGroup{}
statusBar := &StatusBar{wg: wg}
if callbackFuncName != "" {
callback := js.Global().Get(callbackFuncName)
statusBar.callback = func(totalBytes, completedBytes int, err string) {
callback.Invoke(totalBytes, completedBytes, err)
}
}
wg.Add(1)
if strings.HasPrefix(remotePath, "/Encrypted") {
encrypt = true
}

fileReader := bytes.NewReader(fileBytes)
fileSize := int64(len(fileBytes))

mimeType, err := zboxutil.GetFileContentType(fileReader)
if err != nil {
return false, err
}

localPath := remotePath

remotePath = zboxutil.RemoteClean(remotePath)
isabs := zboxutil.IsRemoteAbs(remotePath)
if !isabs {
err = errors.New("invalid_path: Path should be valid and absolute")
return false, err
}
remotePath = zboxutil.GetFullRemotePath(localPath, remotePath)

_, fileName := pathutil.Split(remotePath)

fileMeta := sdk.FileMeta{
Path: localPath,
ActualSize: fileSize,
MimeType: mimeType,
RemoteName: fileName,
RemotePath: remotePath,
}

isUpdate := false
isRepair := false
webStreaming := false
ChunkedUpload, err := sdk.CreateChunkedUpload("/", allocationObj, fileMeta, fileReader, isUpdate, isRepair, webStreaming, zboxutil.NewConnectionId(),
sdk.WithEncrypt(encrypt),
sdk.WithStatusCallback(statusBar),
sdk.WithProgressStorer(&chunkedUploadProgressStorer{list: make(map[string]*sdk.UploadProgress)}))
if err != nil {
return false, err
}

err = ChunkedUpload.Start()

if err != nil {
PrintError("Upload failed.", err)
return false, err
}

wg.Wait()
if !statusBar.success {
return false, errors.New("upload failed: unknown")
}

return true, nil
}

// upload upload file
func upload(allocationID, remotePath string, fileBytes, thumbnailBytes []byte, webStreaming, encrypt, isUpdate, isRepair bool, numBlocks int) (*FileCommandResponse, error) {
if len(allocationID) == 0 {
Expand Down
2 changes: 2 additions & 0 deletions wasmsdk/demo/ffmpeg.min.js

Large diffs are not rendered by default.

139 changes: 129 additions & 10 deletions wasmsdk/demo/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
<script src="https://cdn.jsdelivr.net/gh/golang/[email protected]/misc/wasm/wasm_exec.js"></script>
<script src="zcn.js"></script>


<!-- for ffmpeg.min.js -->
<script src="ffmpeg.min.js"></script>

<!-- for demo -->
<script src="dom.js"></script>

Expand Down Expand Up @@ -100,7 +102,11 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
<fieldset>
<legend>Files</legend>
<button id="btnListFiles">List</button>
<span><input id="inputSelectedFile" type="file" multiple /> <button id="btnUploadFile">Upload</button> </span>
<span><input id="inputSelectedFile" type="file" multiple />
<button id="btnUploadFile">Upload</button>
</span>
<button id="btnLiveUpload"> Live Upload</button>
<button id="btnLiveUploadEnd"> Live Upload End</button>
<button id="btnUploadEncryptFile">EncryptedUpload</button> </span>
[ <button id="btnDownloadFile">Download</button> | <button id="btnDownloadShared">Download with AuthTicket</button>
]
Expand Down Expand Up @@ -141,12 +147,11 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
</div>
</fieldset>


<fieldset>
<legend>Image Viewer</legend>
<img id="viewer" src="image.png" width="600" />
</fieldset>

<script>

window.downloadCallback = function (totalBytes, completedBytes, error) {
Expand Down Expand Up @@ -259,13 +264,13 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
const config = {
datashards: 2,
parityshards: 2,
size: 2 * 1073741824,
expiry: Math.floor(expiry.getTime() / 1000),
size: 1073741824,
expiry: Math.round(new Date().getTime() / 1000) + 2628000,
minReadPrice: 0,
maxReadPrice: 184467440737095516,
maxReadPrice: 10000000000,
minWritePrice: 0,
maxWritePrice: 184467440737095516,
lock: 5000000000
maxWritePrice: 10000000000,
lock: 10450000000
}
try {
const allocation = await goWasm.sdk.createAllocation(config.datashards, config.parityshards, config.size, config.expiry,
Expand Down Expand Up @@ -371,6 +376,7 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
bindFiles()
})

let playLiveVideo = true
onClick('btnUploadFile', async () => {
const { files } = get('inputSelectedFile')
if (files && files.length > 0) {
Expand All @@ -382,7 +388,7 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
allocationId: allocationId,
remotePath: `/${file.name}`,
file: file,
thumbnailBytes: await readBytes(file),//only for demo, don't upload original file as thumbnail in production
thumbnailBytes: await readBytes(file), //only for demo, don't upload original file as thumbnail in production
encrypt: false,
webstreaming: false,
isUpdate: false,
Expand All @@ -400,6 +406,119 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
}
})

var streamData = []
let isstreamUploading = false;
var countVideoUploadChunk = 1
const { createFFmpeg, fetchFile } = FFmpeg
const ffmpeg = createFFmpeg({ log: false })

async function uploadStream(data, count) {
isstreamUploading = true;
console.log(`Uploading data "${count}"...`);

setTimeout(async () => {

const inputFileName = `input${count}.webm`;
const outputFileName = `tvshow${count}.ts`;
await ffmpeg.FS(
"writeFile",
inputFileName,
new Uint8Array(await data.data.arrayBuffer(), 0, data.data.size)
);

await ffmpeg.run("-i", inputFileName, outputFileName);
const output = await ffmpeg.FS("readFile", outputFileName);
console.log(`Transcoding "${outputFileName}" completed.`);

// uploading to blobber
const allocationId = getSelectedAllocation()
const objects = []

if (!allocationId) {
alert("please provide allocationId")
}
console.log(output.buffer);
objects.push({
allocationId: allocationId,
remotePath: "/stream/"+outputFileName,
encrypt: false,
fileBytes: output.buffer,
callback: function (totalBytes, completedBytes, error) {
console.log(outputFileName + " " + completedBytes + "/" + totalBytes + " err:" + error)
}
})

goWasm.liveUpload(objects).then((result) => {
console.log(JSON.stringify(result));
});

if (streamData.length > 0) {
const [nextData, nextcount] = streamData.shift();
uploadStream(nextData, nextcount);
} else {
isstreamUploading = false;
}
}, 0);

}
async function initiateUploadStream() {
if (!isstreamUploading && streamData.length > 0) {
const [data, count] = streamData.shift();
uploadStream(data, count);
}
}

function permittedGetUserMedia() {
return !!(navigator.mediaDevices &&
navigator.mediaDevices.getUserMedia);
}

async function processStream(stream) {
const mediaRecorder = new MediaRecorder(stream, {mimeType: 'video/webm;codecs="vp8,opus"'})

mediaRecorder.ondataavailable = async (data) => {
console.log(data.data)
streamData.push([data, countVideoUploadChunk]);
countVideoUploadChunk++;

if (!isstreamUploading) {
initiateUploadStream();
}

if(playLiveVideo) {
processStream(stream)
}
}

mediaRecorder.start()
setTimeout(function() {
mediaRecorder.stop(); // stop recorder after every time-slice
if (!playLiveVideo) {
stream.getTracks().forEach( track => track.stop() );
return
}
}, 10000);

}

onClick('btnLiveUpload', async () => {
if (permittedGetUserMedia()) {
playLiveVideo = true
if (!ffmpeg.isLoaded())
{
await ffmpeg.load()
}
await navigator.mediaDevices.getUserMedia({
video: true,
audio: true
}).then((stream) => processStream(stream));
}
})

onClick('btnLiveUploadEnd', async () => {
playLiveVideo = false
})

onClick('btnShare', async () => {
// change these values according to your wallet (obtained from zbox `zbox getwallet`)
let clientID = "7838eedbf2add6dc590a5ee95643e9a872ec7a9ae6c5efc452f2fa9c6971eb3a"
Expand Down
Loading