Skip to content

Commit

Permalink
feat: add event worker for the vfs
Browse files Browse the repository at this point in the history
Signed-off-by: saltbo <[email protected]>
  • Loading branch information
saltbo committed Jul 30, 2023
1 parent 0525f6a commit 6e4eadc
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 18 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,12 @@ require (
github.com/pelletier/go-toml v1.2.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.8.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/saltbo/gopkg/sliceutil v0.0.0-20221024031008-7af9787873bd // indirect
github.com/satori/go.uuid v1.2.0 // indirect
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.1.2 // indirect
github.com/spf13/cast v1.3.0 // indirect
github.com/spf13/jwalterweatherman v1.0.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,14 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/robfig/cron v1.1.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8=
github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/saltbo/gopkg v0.0.0-20230725153125-0d57fc71396d h1:mt7ohLvhxC1dlsVsO4k29S/zmFxpa7ZOf2WnA8N2MF4=
Expand Down Expand Up @@ -406,6 +409,8 @@ github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
Expand Down
16 changes: 13 additions & 3 deletions internal/app/repo/matter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type MatterListOption struct {
Dir string `form:"dir"`
Type string `form:"type"`
Keyword string `form:"kw"`
Draft bool
}

type MatterFindWithOption struct {
Expand Down Expand Up @@ -111,7 +112,7 @@ func (db *MatterDBQuery) FindAll(ctx context.Context, opts *MatterListOption) ([

if opts.Keyword != "" {
conds = append(conds, db.q.Matter.Name.Like(fmt.Sprintf("%%%s%%", opts.Keyword)))
} else {
} else if !opts.Draft {
conds = append(conds, db.q.Matter.Parent.Eq(opts.Dir))
}

Expand All @@ -121,8 +122,17 @@ func (db *MatterDBQuery) FindAll(ctx context.Context, opts *MatterListOption) ([
conds = append(conds, db.q.Matter.Type.Like(fmt.Sprintf("%%%s%%", opts.Type)))
}

conds = append(conds, db.q.Matter.UploadedAt.IsNotNull())
return db.q.Matter.Where(conds...).Order(db.q.Matter.DirType.Desc(), db.q.Matter.Id.Desc()).FindByPage(opts.Offset, opts.Limit)
if !opts.Draft {
conds = append(conds, db.q.Matter.UploadedAt.IsNotNull())
}

q := db.q.Matter.WithContext(ctx).Where(conds...).Order(db.q.Matter.DirType.Desc(), db.q.Matter.Id.Desc())
if opts.Limit != 0 {
return q.FindByPage(opts.Offset, opts.Limit)
}

matters, err := q.Find()
return matters, int64(len(matters)), err
}

func (db *MatterDBQuery) Create(ctx context.Context, m *entity.Matter) error {
Expand Down
11 changes: 10 additions & 1 deletion internal/app/usecase/vfs/vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@ type Vfs struct {
matterRepo repo.Matter
recycleBinRepo repo.RecycleBin
uploader uploader.Uploader
eventWorker *EventWorker
}

func NewVfs(matterRepo repo.Matter, recycleBinRepo repo.RecycleBin, uploader uploader.Uploader) *Vfs {
return &Vfs{matterRepo: matterRepo, recycleBinRepo: recycleBinRepo, uploader: uploader}
vfs := &Vfs{matterRepo: matterRepo, recycleBinRepo: recycleBinRepo, uploader: uploader, eventWorker: NewWorker()}
vfs.eventWorker.registerEventHandler(EventActionCreated, vfs.matterCreatedEventHandler)
vfs.eventWorker.registerEventHandler(EventActionDeleted, vfs.matterDeletedEventHandler)
vfs.cleanExpiredMatters(context.Background())
go vfs.eventWorker.Run()
return vfs
}

func (v *Vfs) Create(ctx context.Context, m *entity.Matter) error {
if !m.IsDir() {
if err := v.uploader.CreateUploadURL(ctx, m); err != nil {
return err
}

defer v.eventWorker.sendEvent(EventActionCreated, m)
}

return v.matterRepo.Create(ctx, m)
Expand Down Expand Up @@ -96,6 +104,7 @@ func (v *Vfs) Delete(ctx context.Context, alias string) error {
return err
}

defer v.eventWorker.sendEvent(EventActionDeleted, m)
rb := m.BuildRecycleBinItem()
return v.recycleBinRepo.Create(ctx, rb)
}
49 changes: 49 additions & 0 deletions internal/app/usecase/vfs/vfs_jobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package vfs

import (
"context"
"log"
"time"

"github.com/robfig/cron"

Check failure on line 8 in internal/app/usecase/vfs/vfs_jobs.go

View workflow job for this annotation

GitHub Actions / Test

missing go.sum entry for module providing package github.com/robfig/cron (imported by github.com/saltbo/zpan/internal/app/usecase/vfs); to add:
"github.com/saltbo/zpan/internal/app/entity"
"github.com/saltbo/zpan/internal/app/repo"
)

func (v *Vfs) matterCreatedEventHandler(matter *entity.Matter) error {
c := cron.New()
c.Start()
return c.AddFunc("@every 10s", func() {
ctx := context.Background()
if err := v.uploader.UploadDone(ctx, matter); err != nil {
return
}

c.Stop()
})
}

func (v *Vfs) matterDeletedEventHandler(matter *entity.Matter) error {
return nil
}

func (v *Vfs) cleanExpiredMatters(ctx context.Context) {
matters, _, err := v.matterRepo.FindAll(ctx, &repo.MatterListOption{Draft: true})
if err != nil {
log.Printf("error getting the files of not uploaded: %s", err)
return
}

for _, matter := range matters {
if time.Since(matter.CreatedAt) > time.Hour*24 {
continue
}

if err := v.matterRepo.Delete(ctx, matter.Id); err != nil {
log.Printf("error deleting the file %s: %s", matter.FullPath(), err)
return
}

log.Printf("deleted the file: %s", matter.FullPath())
}
}
54 changes: 41 additions & 13 deletions internal/app/usecase/vfs/worker.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,54 @@
package vfs

import (
"github.com/saltbo/zpan/internal/app/repo"
"github.com/saltbo/zpan/internal/app/entity"
"github.com/sourcegraph/conc/pool"
)

type Worker struct {
matterRepo repo.Matter
const (
EventActionCreated EventAction = "Created"
EventActionDeleted EventAction = "Deleted"
)

type (
EventAction string
EventHandler func(matter *entity.Matter) error
)

type Event struct {
Action EventAction
Matter *entity.Matter
}

func NewWorker() *Worker {
return &Worker{}
type EventWorker struct {
eventChan chan Event
eventReg map[EventAction]EventHandler
}

func (w *Worker) Start() {
func NewWorker() *EventWorker {
return &EventWorker{
eventChan: make(chan Event),
eventReg: make(map[EventAction]EventHandler),
}
}

func (w *EventWorker) Run() {
p := pool.New().WithMaxGoroutines(10)
for elem := range w.eventChan {
eventHandle := w.eventReg[elem.Action]
p.Go(func() {
if err := eventHandle(elem.Matter); err != nil {
return
}
})
}
p.Wait()
}

func (w *EventWorker) registerEventHandler(action EventAction, h EventHandler) {
w.eventReg[action] = h
}

func (w *Worker) cleanExpireMatters() {
// matters, total, err := w.matterRepo.FindAll(context.Background(), &repo.MatterListOption{})
// if err != nil {
// return
// }
//
// w.matterRepo.Delete()
func (w *EventWorker) sendEvent(action EventAction, m *entity.Matter) {
w.eventChan <- Event{Action: action, Matter: m}
}

0 comments on commit 6e4eadc

Please sign in to comment.