From 9d507dfad04f1f1ac9ccae8f673e936e820a6027 Mon Sep 17 00:00:00 2001 From: xrgzs Date: Fri, 20 Dec 2024 14:26:18 +0800 Subject: [PATCH] fix(139): handle upload file conflicts --- drivers/139/driver.go | 191 ++++++++++++++++++++++++++++-------------- drivers/139/types.go | 1 + 2 files changed, 131 insertions(+), 61 deletions(-) diff --git a/drivers/139/driver.go b/drivers/139/driver.go index 8862983ce5e..dd154efe42a 100644 --- a/drivers/139/driver.go +++ b/drivers/139/driver.go @@ -552,7 +552,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr firstPartInfos = firstPartInfos[:100] } - // 获取上传信息和前100个分片的上传地址 + // 创建任务,获取上传信息和前100个分片的上传地址 data := base.Json{ "contentHash": fullHash, "contentHashAlgorithm": "SHA256", @@ -572,87 +572,156 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr return err } - if resp.Data.Exist || resp.Data.RapidUpload { + // 判断文件是否已存在 + // resp.Data.Exist: true 已存在同名文件且校验相同,云端不会重复增加文件,无需手动处理冲突 + if resp.Data.Exist { return nil } - uploadPartInfos := resp.Data.PartInfos + // 判断文件是否支持快传 + // resp.Data.RapidUpload: true 支持快传,但此处直接检测是否返回分片的上传地址 + // 快传的情况下同样需要手动处理冲突 + if resp.Data.PartInfos != nil { + // 读取前100个分片的上传地址 + uploadPartInfos := resp.Data.PartInfos + + // 获取后续分片的上传地址 + for i := 101; i < len(partInfos); i += 100 { + end := i + 100 + if end > len(partInfos) { + end = len(partInfos) + } + batchPartInfos := partInfos[i:end] + + moredata := base.Json{ + "fileId": resp.Data.FileId, + "uploadId": resp.Data.UploadId, + "partInfos": batchPartInfos, + "commonAccountInfo": base.Json{ + "account": d.Account, + "accountType": 1, + }, + } + pathname := "/hcy/file/getUploadUrl" + var moreresp PersonalUploadUrlResp + _, err = d.personalPost(pathname, moredata, &moreresp) + if err != nil { + return err + } + uploadPartInfos = append(uploadPartInfos, moreresp.Data.PartInfos...) + } - // 获取后续分片的上传地址 - for i := 101; i < len(partInfos); i += 100 { - end := i + 100 - if end > len(partInfos) { - end = len(partInfos) + // Progress + p := driver.NewProgress(stream.GetSize(), up) + + // 上传所有分片 + for _, uploadPartInfo := range uploadPartInfos { + index := uploadPartInfo.PartNumber - 1 + partSize := partInfos[index].PartSize + log.Debugf("[139] uploading part %+v/%+v", index, len(uploadPartInfos)) + limitReader := io.LimitReader(stream, partSize) + + // Update Progress + r := io.TeeReader(limitReader, p) + + req, err := http.NewRequest("PUT", uploadPartInfo.UploadUrl, r) + if err != nil { + return err + } + req = req.WithContext(ctx) + req.Header.Set("Content-Type", "application/octet-stream") + req.Header.Set("Content-Length", fmt.Sprint(partSize)) + req.Header.Set("Origin", "https://yun.139.com") + req.Header.Set("Referer", "https://yun.139.com/") + req.ContentLength = partSize + + res, err := base.HttpClient.Do(req) + if err != nil { + return err + } + _ = res.Body.Close() + log.Debugf("[139] uploaded: %+v", res) + if res.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d", res.StatusCode) + } } - batchPartInfos := partInfos[i:end] - moredata := base.Json{ - "fileId": resp.Data.FileId, - "uploadId": resp.Data.UploadId, - "partInfos": batchPartInfos, - "commonAccountInfo": base.Json{ - "account": d.Account, - "accountType": 1, - }, + data = base.Json{ + "contentHash": fullHash, + "contentHashAlgorithm": "SHA256", + "fileId": resp.Data.FileId, + "uploadId": resp.Data.UploadId, } - pathname := "/hcy/file/getUploadUrl" - var moreresp PersonalUploadUrlResp - _, err = d.personalPost(pathname, moredata, &moreresp) + _, err = d.personalPost("/hcy/file/complete", data, nil) if err != nil { return err } - uploadPartInfos = append(uploadPartInfos, moreresp.Data.PartInfos...) } - // Progress - p := driver.NewProgress(stream.GetSize(), up) - - // 上传所有分片 - for _, uploadPartInfo := range uploadPartInfos { - index := uploadPartInfo.PartNumber - 1 - partSize := partInfos[index].PartSize - log.Debugf("[139] uploading part %+v/%+v", index, len(uploadPartInfos)) - limitReader := io.LimitReader(stream, partSize) - - // Update Progress - r := io.TeeReader(limitReader, p) - - req, err := http.NewRequest("PUT", uploadPartInfo.UploadUrl, r) + // 处理冲突 + if resp.Data.FileName != stream.GetName() { + log.Debugf("[139] conflict detected: %s != %s", resp.Data.FileName, stream.GetName()) + // 给服务器一定时间处理数据,避免无法刷新文件列表 + time.Sleep(time.Millisecond * 500) + // 刷新并获取文件列表 + files, err := d.List(ctx, dstDir, model.ListArgs{Refresh: true}) if err != nil { return err } - req = req.WithContext(ctx) - req.Header.Set("Content-Type", "application/octet-stream") - req.Header.Set("Content-Length", fmt.Sprint(partSize)) - req.Header.Set("Origin", "https://yun.139.com") - req.Header.Set("Referer", "https://yun.139.com/") - req.ContentLength = partSize - - res, err := base.HttpClient.Do(req) - if err != nil { - return err + // 删除旧文件 + for _, file := range files { + if file.GetName() == stream.GetName() { + log.Debugf("[139] conflict: removing old: %s", file.GetName()) + // 删除前重命名旧文件,避免仍旧冲突 + err = d.Rename(ctx, file, stream.GetName()+random.String(4)) + if err != nil { + return err + } + err = d.Remove(ctx, file) + if err != nil { + return err + } + break + } } - _ = res.Body.Close() - log.Debugf("[139] uploaded: %+v", res) - if res.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status code: %d", res.StatusCode) + // 重命名新文件 + for _, file := range files { + if file.GetName() == resp.Data.FileName { + log.Debugf("[139] conflict: renaming new: %s => %s", file.GetName(), stream.GetName()) + err = d.Rename(ctx, file, stream.GetName()) + if err != nil { + return err + } + break + } } } - - data = base.Json{ - "contentHash": fullHash, - "contentHashAlgorithm": "SHA256", - "fileId": resp.Data.FileId, - "uploadId": resp.Data.UploadId, - } - _, err = d.personalPost("/hcy/file/complete", data, nil) - if err != nil { - return err - } return nil case MetaPersonal: fallthrough case MetaFamily: + // 处理冲突 + // 获取文件列表 + files, err := d.List(ctx, dstDir, model.ListArgs{}) + if err != nil { + return err + } + // 删除旧文件 + for _, file := range files { + if file.GetName() == stream.GetName() { + log.Debugf("[139] conflict: removing old: %s", file.GetName()) + // 删除前重命名旧文件,避免仍旧冲突 + err = d.Rename(ctx, file, stream.GetName()+random.String(4)) + if err != nil { + return err + } + err = d.Remove(ctx, file) + if err != nil { + return err + } + break + } + } data := base.Json{ "manualRename": 2, "operation": 0, @@ -688,7 +757,7 @@ func (d *Yun139) Put(ctx context.Context, dstDir model.Obj, stream model.FileStr pathname = "/orchestration/familyCloud-rebuild/content/v1.0/getFileUploadURL" } var resp UploadResp - _, err := d.post(pathname, data, &resp) + _, err = d.post(pathname, data, &resp) if err != nil { return err } diff --git a/drivers/139/types.go b/drivers/139/types.go index c34cba0388b..ac7079d8d18 100644 --- a/drivers/139/types.go +++ b/drivers/139/types.go @@ -261,6 +261,7 @@ type PersonalUploadResp struct { BaseResp Data struct { FileId string `json:"fileId"` + FileName string `json:"fileName"` PartInfos []PersonalPartInfo `json:"partInfos"` Exist bool `json:"exist"` RapidUpload bool `json:"rapidUpload"`