From 9af417e49298d092a5e38b497dfbdb0175b9987a Mon Sep 17 00:00:00 2001 From: xrgzs Date: Mon, 9 Dec 2024 19:22:46 +0800 Subject: [PATCH] fix(cloudreve): support upload to remote and OneDrive storage (#6882) - Add support for remote and OneDrive storage types - Implement new upload methods for different storage types - Update driver to handle various storage policies - Add error handling and session cleanup for failed uploads --- drivers/cloudreve/driver.go | 73 +++++++++++++++++---------- drivers/cloudreve/types.go | 8 +-- drivers/cloudreve/util.go | 99 +++++++++++++++++++++++++++++++++++++ 3 files changed, 150 insertions(+), 30 deletions(-) diff --git a/drivers/cloudreve/driver.go b/drivers/cloudreve/driver.go index ec0f6ef2b29..8fc117aca2c 100644 --- a/drivers/cloudreve/driver.go +++ b/drivers/cloudreve/driver.go @@ -10,6 +10,7 @@ import ( "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/driver" + "github.com/alist-org/alist/v3/internal/errs" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/pkg/utils" "github.com/go-resty/resty/v2" @@ -134,6 +135,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File if io.ReadCloser(stream) == http.NoBody { return d.create(ctx, dstDir, stream) } + + // 获取存储策略 var r DirectoryResp err := d.request(http.MethodGet, "/directory"+dstDir.GetPath(), nil, &r) if err != nil { @@ -146,6 +149,8 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File "policy_id": r.Policy.Id, "last_modified": stream.ModTime().Unix(), } + + // 获取上传会话信息 var u UploadInfo err = d.request(http.MethodPut, "/file/upload", func(req *resty.Request) { req.SetBody(uploadBody) @@ -153,36 +158,50 @@ func (d *Cloudreve) Put(ctx context.Context, dstDir model.Obj, stream model.File if err != nil { return err } - var chunkSize = u.ChunkSize - var buf []byte - var chunk int - for { - var n int - buf = make([]byte, chunkSize) - n, err = io.ReadAtLeast(stream, buf, chunkSize) - if err != nil && err != io.ErrUnexpectedEOF { - if err == io.EOF { - return nil - } - return err - } - if n == 0 { - break - } - buf = buf[:n] - err = d.request(http.MethodPost, "/file/upload/"+u.SessionID+"/"+strconv.Itoa(chunk), func(req *resty.Request) { - req.SetHeader("Content-Type", "application/octet-stream") - req.SetHeader("Content-Length", strconv.Itoa(n)) - req.SetBody(buf) - }, nil) - if err != nil { - break + // 根据存储方式选择分片上传的方法 + switch r.Policy.Type { + case "onedrive": + err = d.upOneDrive(ctx, stream, u, up) + case "remote": // 从机存储 + err = d.upRemote(ctx, stream, u, up) + case "local": // 本机存储 + var chunkSize = u.ChunkSize + var buf []byte + var chunk int + for { + var n int + buf = make([]byte, chunkSize) + n, err = io.ReadAtLeast(stream, buf, chunkSize) + if err != nil && err != io.ErrUnexpectedEOF { + if err == io.EOF { + return nil + } + return err + } + if n == 0 { + break + } + buf = buf[:n] + err = d.request(http.MethodPost, "/file/upload/"+u.SessionID+"/"+strconv.Itoa(chunk), func(req *resty.Request) { + req.SetHeader("Content-Type", "application/octet-stream") + req.SetHeader("Content-Length", strconv.Itoa(n)) + req.SetBody(buf) + }, nil) + if err != nil { + break + } + chunk++ } - chunk++ - + default: + err = errs.NotImplement } - return err + if err != nil { + // 删除失败的会话 + err = d.request(http.MethodDelete, "/file/upload/"+u.SessionID, nil, nil) + return err + } + return nil } func (d *Cloudreve) create(ctx context.Context, dir model.Obj, file model.Obj) error { diff --git a/drivers/cloudreve/types.go b/drivers/cloudreve/types.go index 241d993ebb8..a7c3919e8a9 100644 --- a/drivers/cloudreve/types.go +++ b/drivers/cloudreve/types.go @@ -21,9 +21,11 @@ type Policy struct { } type UploadInfo struct { - SessionID string `json:"sessionID"` - ChunkSize int `json:"chunkSize"` - Expires int `json:"expires"` + SessionID string `json:"sessionID"` + ChunkSize int `json:"chunkSize"` + Expires int `json:"expires"` + UploadURLs []string `json:"uploadURLs"` + Credential string `json:"credential,omitempty"` } type DirectoryResp struct { diff --git a/drivers/cloudreve/util.go b/drivers/cloudreve/util.go index 284e3289dee..b5b71153e12 100644 --- a/drivers/cloudreve/util.go +++ b/drivers/cloudreve/util.go @@ -1,16 +1,23 @@ package cloudreve import ( + "bytes" + "context" "encoding/base64" "errors" + "fmt" + "io" "net/http" + "strconv" "strings" "github.com/alist-org/alist/v3/drivers/base" "github.com/alist-org/alist/v3/internal/conf" + "github.com/alist-org/alist/v3/internal/driver" "github.com/alist-org/alist/v3/internal/model" "github.com/alist-org/alist/v3/internal/setting" "github.com/alist-org/alist/v3/pkg/cookie" + "github.com/alist-org/alist/v3/pkg/utils" "github.com/go-resty/resty/v2" json "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go" @@ -172,3 +179,95 @@ func (d *Cloudreve) GetThumb(file Object) (model.Thumbnail, error) { Thumbnail: resp.Header().Get("Location"), }, nil } + +func (d *Cloudreve) upRemote(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error { + uploadUrl := u.UploadURLs[0] + credential := u.Credential + var finish int64 = 0 + var chunk int = 0 + DEFAULT := int64(u.ChunkSize) + for finish < stream.GetSize() { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + utils.Log.Debugf("[Cloudreve-Remote] upload: %d", finish) + var byteSize = DEFAULT + left := stream.GetSize() - finish + if left < DEFAULT { + byteSize = left + } + byteData := make([]byte, byteSize) + n, err := io.ReadFull(stream, byteData) + utils.Log.Debug(err, n) + if err != nil { + return err + } + req, err := http.NewRequest("POST", uploadUrl+"?chunk="+strconv.Itoa(chunk), bytes.NewBuffer(byteData)) + if err != nil { + return err + } + req = req.WithContext(ctx) + req.Header.Set("Content-Length", strconv.Itoa(int(byteSize))) + req.Header.Set("Authorization", fmt.Sprint(credential)) + finish += byteSize + res, err := base.HttpClient.Do(req) + if err != nil { + return err + } + res.Body.Close() + up(float64(finish) * 100 / float64(stream.GetSize())) + chunk++ + } + return nil +} + +func (d *Cloudreve) upOneDrive(ctx context.Context, stream model.FileStreamer, u UploadInfo, up driver.UpdateProgress) error { + uploadUrl := u.UploadURLs[0] + var finish int64 = 0 + DEFAULT := int64(u.ChunkSize) + for finish < stream.GetSize() { + if utils.IsCanceled(ctx) { + return ctx.Err() + } + utils.Log.Debugf("[Cloudreve-OneDrive] upload: %d", finish) + var byteSize = DEFAULT + left := stream.GetSize() - finish + if left < DEFAULT { + byteSize = left + } + byteData := make([]byte, byteSize) + n, err := io.ReadFull(stream, byteData) + utils.Log.Debug(err, n) + if err != nil { + return err + } + req, err := http.NewRequest("PUT", uploadUrl, bytes.NewBuffer(byteData)) + if err != nil { + return err + } + req = req.WithContext(ctx) + req.Header.Set("Content-Length", strconv.Itoa(int(byteSize))) + req.Header.Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", finish, finish+byteSize-1, stream.GetSize())) + finish += byteSize + res, err := base.HttpClient.Do(req) + if err != nil { + return err + } + // https://learn.microsoft.com/zh-cn/onedrive/developer/rest-api/api/driveitem_createuploadsession + if res.StatusCode != 201 && res.StatusCode != 202 && res.StatusCode != 200 { + data, _ := io.ReadAll(res.Body) + res.Body.Close() + return errors.New(string(data)) + } + res.Body.Close() + up(float64(finish) * 100 / float64(stream.GetSize())) + } + // 上传成功发送回调请求 + err := d.request(http.MethodPost, "/callback/onedrive/finish/"+u.SessionID, func(req *resty.Request) { + req.SetBody("{}") + }, nil) + if err != nil { + return err + } + return nil +}