Skip to content

Commit

Permalink
feat: Persistant Task
Browse files Browse the repository at this point in the history
  • Loading branch information
itsHenry35 committed Aug 6, 2024
1 parent af9c6af commit 9770a5c
Show file tree
Hide file tree
Showing 15 changed files with 201 additions and 56 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
github.com/u2takey/ffmpeg-go v0.5.0
github.com/upyun/go-sdk/v3 v3.0.4
github.com/winfsp/cgofuse v1.5.1-0.20230130140708-f87f5db493b5
github.com/xhofe/tache v0.1.1
github.com/xhofe/tache v0.1.2
github.com/xhofe/wopan-sdk-go v0.1.3
github.com/zzzhr1990/go-common-entity v0.0.0-20221216044934-fd1c571e3a22
golang.org/x/crypto v0.25.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,8 @@ github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25 h1:eDfebW/yfq9DtG9RO3K
github.com/xhofe/gsync v0.0.0-20230917091818-2111ceb38a25/go.mod h1:fH4oNm5F9NfI5dLi0oIMtsLNKQOirUDbEMCIBb/7SU0=
github.com/xhofe/tache v0.1.1 h1:O5QY4cVjIGELx3UGh6LbVAc18MWGXgRNQjMt72x6w/8=
github.com/xhofe/tache v0.1.1/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/tache v0.1.2 h1:pHrXlrWcbTb4G7hVUDW7Rc+YTUnLJvnLBrdktVE1Fqg=
github.com/xhofe/tache v0.1.2/go.mod h1:iKumPFvywf30FRpAHHCt64G0JHLMzT0K+wyGedHsmTQ=
github.com/xhofe/wopan-sdk-go v0.1.3 h1:J58X6v+n25ewBZjb05pKOr7AWGohb+Rdll4CThGh6+A=
github.com/xhofe/wopan-sdk-go v0.1.3/go.mod h1:dcY9yA28fnaoZPnXZiVTFSkcd7GnIPTpTIIlfSI5z5Q=
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no=
Expand Down
12 changes: 7 additions & 5 deletions internal/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ func InitConfig() {
}
conf.Conf.TempDir = absPath
}
err := os.RemoveAll(filepath.Join(conf.Conf.TempDir))
if err != nil {
log.Errorln("failed delete temp file:", err)
}
err = os.MkdirAll(conf.Conf.TempDir, 0o777)
err := os.MkdirAll(conf.Conf.TempDir, 0o777)
if err != nil {
log.Fatalf("create temp dir error: %+v", err)
}
Expand Down Expand Up @@ -104,3 +100,9 @@ func initURL() {
}
conf.URL = u
}

func CleanTempDir() {
if err := os.RemoveAll(conf.Conf.TempDir); err != nil {
log.Errorln("failed delete temp file: ", err)
}
}
1 change: 1 addition & 0 deletions internal/bootstrap/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import "github.com/alist-org/alist/v3/cmd/flags"
func InitData() {
initUser()
initSettings()
initTasks()
if flags.Dev {
initDevData()
initDevDo()
Expand Down
29 changes: 29 additions & 0 deletions internal/bootstrap/data/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package data

import (
"github.com/alist-org/alist/v3/internal/db"
"github.com/alist-org/alist/v3/internal/model"
)

var initialTaskItems []model.TaskItem

func initTasks() {
InitialTasks()

for i := range initialTaskItems {
item := &initialTaskItems[i]
taskitem, _ := db.GetTaskDataByType(item.Key)
if taskitem == nil {
db.CreateTaskData(item)
}
}
}

func InitialTasks() []model.TaskItem {
initialTaskItems = []model.TaskItem{
{Key: "copy", PersistData: "[]"},
{Key: "download", PersistData: "[]"},
{Key: "transfer", PersistData: "[]"},
}
return initialTaskItems
}
12 changes: 8 additions & 4 deletions internal/bootstrap/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package bootstrap

import (
"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/db"
"github.com/alist-org/alist/v3/internal/fs"
"github.com/alist-org/alist/v3/internal/offline_download/tool"
"github.com/xhofe/tache"
)

func InitTaskManager() {
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry))
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
fs.UploadTaskManager = tache.NewManager[*fs.UploadTask](tache.WithWorks(conf.Conf.Tasks.Upload.Workers), tache.WithMaxRetry(conf.Conf.Tasks.Upload.MaxRetry)) //upload will not support persist
fs.CopyTaskManager = tache.NewManager[*fs.CopyTask](tache.WithWorks(conf.Conf.Tasks.Copy.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant), db.UpdateTaskDataFunc("copy", conf.Conf.Tasks.Copy.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Copy.MaxRetry))
tool.DownloadTaskManager = tache.NewManager[*tool.DownloadTask](tache.WithWorks(conf.Conf.Tasks.Download.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant), db.UpdateTaskDataFunc("download", conf.Conf.Tasks.Download.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Download.MaxRetry))
tool.TransferTaskManager = tache.NewManager[*tool.TransferTask](tache.WithWorks(conf.Conf.Tasks.Transfer.Workers), tache.WithPersistFunction(db.GetTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant), db.UpdateTaskDataFunc("transfer", conf.Conf.Tasks.Transfer.TaskPersistant)), tache.WithMaxRetry(conf.Conf.Tasks.Transfer.MaxRetry))
if len(tool.TransferTaskManager.GetAll()) == 0 { //prevent offline downloaded files from being deleted
CleanTempDir()
}
}
20 changes: 12 additions & 8 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ type LogConfig struct {
}

type TaskConfig struct {
Workers int `json:"workers" env:"WORKERS"`
MaxRetry int `json:"max_retry" env:"MAX_RETRY"`
Workers int `json:"workers" env:"WORKERS"`
MaxRetry int `json:"max_retry" env:"MAX_RETRY"`
TaskPersistant bool `json:"task_persistant" env:"TASK_PERSISTANT"`
}

type TasksConfig struct {
Expand Down Expand Up @@ -130,19 +131,22 @@ func DefaultConfig() *Config {
TlsInsecureSkipVerify: true,
Tasks: TasksConfig{
Download: TaskConfig{
Workers: 5,
MaxRetry: 1,
Workers: 5,
MaxRetry: 1,
TaskPersistant: true,
},
Transfer: TaskConfig{
Workers: 5,
MaxRetry: 2,
Workers: 5,
MaxRetry: 2,
TaskPersistant: true,
},
Upload: TaskConfig{
Workers: 5,
},
Copy: TaskConfig{
Workers: 5,
MaxRetry: 2,
Workers: 5,
MaxRetry: 2,
TaskPersistant: true,
},
},
Cors: Cors{
Expand Down
2 changes: 1 addition & 1 deletion internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var db *gorm.DB

func Init(d *gorm.DB) {
db = d
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode))
err := AutoMigrate(new(model.Storage), new(model.User), new(model.Meta), new(model.SettingItem), new(model.SearchNode), new(model.TaskItem))
if err != nil {
log.Fatalf("failed migrate database: %s", err.Error())
}
Expand Down
48 changes: 48 additions & 0 deletions internal/db/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package db

import (
"github.com/alist-org/alist/v3/internal/model"
"github.com/pkg/errors"
)

func GetTaskDataByType(type_s string) (*model.TaskItem, error) {
task := model.TaskItem{Key: type_s}
if err := db.Where(task).First(&task).Error; err != nil {
return nil, errors.Wrapf(err, "failed find task")
}
return &task, nil
}

func UpdateTaskData(t *model.TaskItem) error {
return errors.WithStack(db.Model(&model.TaskItem{}).Where("key = ?", t.Key).Update("persist_data", t.PersistData).Error)
}

func CreateTaskData(t *model.TaskItem) error {
return errors.WithStack(db.Create(t).Error)
}

func GetTaskDataFunc(type_s string, enabled bool) func() ([]byte, error) {
if !enabled {
return nil
}
task, err := GetTaskDataByType(type_s)
if err != nil {
return nil
}
return func() ([]byte, error) {
return []byte(task.PersistData), nil
}
}

func UpdateTaskDataFunc(type_s string, enabled bool) func([]byte) error {
if !enabled {
return nil
}
return func(data []byte) error {
s := string(data)
if s == "null" || s == "" {
s = "[]"
}
return UpdateTaskData(&model.TaskItem{Key: type_s, PersistData: s})
}
}
50 changes: 34 additions & 16 deletions internal/fs/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package fs
import (
"context"
"fmt"
"net/http"
stdpath "path"

"github.com/alist-org/alist/v3/internal/conf"
"github.com/alist-org/alist/v3/internal/driver"
"github.com/alist-org/alist/v3/internal/model"
Expand All @@ -11,28 +14,39 @@ import (
"github.com/alist-org/alist/v3/pkg/utils"
"github.com/pkg/errors"
"github.com/xhofe/tache"
"net/http"
stdpath "path"
)

type CopyTask struct {
tache.Base
Status string `json:"status"`
srcStorage, dstStorage driver.Driver
srcObjPath, dstDirPath string
Status string `json:"-"` //don't save status to save space
SrcObjPath string `json:"src_path"`
DstDirPath string `json:"dst_path"`
srcStorage driver.Driver `json:"-"`
dstStorage driver.Driver `json:"-"`
SrcStorageMp string `json:"src_storage_mp"`
DstStorageMp string `json:"dst_storage_mp"`
}

func (t *CopyTask) GetName() string {
return fmt.Sprintf("copy [%s](%s) to [%s](%s)",
t.srcStorage.GetStorage().MountPath, t.srcObjPath, t.dstStorage.GetStorage().MountPath, t.dstDirPath)
return fmt.Sprintf("copy [%s](%s) to [%s](%s)", t.SrcStorageMp, t.SrcObjPath, t.DstStorageMp, t.DstDirPath)
}

func (t *CopyTask) GetStatus() string {
return t.Status
}

func (t *CopyTask) Run() error {
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.srcObjPath, t.dstDirPath)
var err error
if t.srcStorage == nil {
t.srcStorage, err = op.GetStorageByMountPath(t.SrcStorageMp)
}
if t.dstStorage == nil {
t.dstStorage, err = op.GetStorageByMountPath(t.DstStorageMp)
}
if err != nil {
return errors.WithMessage(err, "failed get storage")
}
return copyBetween2Storages(t, t.srcStorage, t.dstStorage, t.SrcObjPath, t.DstDirPath)
}

var CopyTaskManager *tache.Manager[*CopyTask]
Expand Down Expand Up @@ -79,10 +93,12 @@ func _copy(ctx context.Context, srcObjPath, dstDirPath string, lazyCache ...bool
}
// not in the same storage
t := &CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
srcObjPath: srcObjActualPath,
dstDirPath: dstDirActualPath,
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjActualPath,
DstDirPath: dstDirActualPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
}
CopyTaskManager.Add(t)
return t, nil
Expand All @@ -107,10 +123,12 @@ func copyBetween2Storages(t *CopyTask, srcStorage, dstStorage driver.Driver, src
srcObjPath := stdpath.Join(srcObjPath, obj.GetName())
dstObjPath := stdpath.Join(dstDirPath, srcObj.GetName())
CopyTaskManager.Add(&CopyTask{
srcStorage: srcStorage,
dstStorage: dstStorage,
srcObjPath: srcObjPath,
dstDirPath: dstObjPath,
srcStorage: srcStorage,
dstStorage: dstStorage,
SrcObjPath: srcObjPath,
DstDirPath: dstObjPath,
SrcStorageMp: srcStorage.GetStorage().MountPath,
DstStorageMp: dstStorage.GetStorage().MountPath,
})
}
t.Status = "src object is dir, added all copy tasks of objs"
Expand Down
6 changes: 6 additions & 0 deletions internal/model/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package model

type TaskItem struct {
Key string `json:"key"`
PersistData string `gorm:"type:text" json:"persist_data"`
}
1 change: 1 addition & 0 deletions internal/offline_download/tool/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func AddURL(ctx context.Context, args *AddURLArgs) (tache.TaskWithInfo, error) {
DstDirPath: args.DstDirPath,
TempDir: tempDir,
DeletePolicy: deletePolicy,
Toolname: args.Tool,
tool: tool,
}
DownloadTaskManager.Add(t)
Expand Down
30 changes: 19 additions & 11 deletions internal/offline_download/tool/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,26 @@ import (

type DownloadTask struct {
tache.Base
Url string `json:"url"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
DeletePolicy DeletePolicy `json:"delete_policy"`

Status string `json:"status"`
Signal chan int `json:"-"`
GID string `json:"-"`
Url string `json:"url"`
DstDirPath string `json:"dst_dir_path"`
TempDir string `json:"temp_dir"`
DeletePolicy DeletePolicy `json:"delete_policy"`
Toolname string `json:"toolname"`
Status string `json:"-"`
Signal chan int `json:"-"`
GID string `json:"-"`
tool Tool
callStatusRetried int
}

func (t *DownloadTask) Run() error {
if t.tool == nil {
tool, err := Tools.Get(t.Toolname)
if err != nil {
return errors.WithMessage(err, "failed get tool")
}
t.tool = tool
}
if err := t.tool.Run(t); !errs.IsNotSupportError(err) {
if err == nil {
return t.Complete()
Expand Down Expand Up @@ -142,9 +149,10 @@ func (t *DownloadTask) Complete() error {
file := files[i]
TransferTaskManager.Add(&TransferTask{
file: file,
dstDirPath: t.DstDirPath,
tempDir: t.TempDir,
deletePolicy: t.DeletePolicy,
DstDirPath: t.DstDirPath,
TempDir: t.TempDir,
DeletePolicy: t.DeletePolicy,
FileDir: file.Path,
})
}
return nil
Expand Down
Loading

0 comments on commit 9770a5c

Please sign in to comment.