Skip to content

Commit

Permalink
Merge branch 'master' of github.com:douyu/juno-agent
Browse files Browse the repository at this point in the history
  • Loading branch information
kl7sn committed Sep 18, 2020
2 parents 9a6c6a3 + 0165ece commit a75cf1b
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 208 deletions.
9 changes: 0 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ github.com/apache/rocketmq-client-go v0.0.0-20191211114916-85ee94b43cef h1:O9vKN
github.com/apache/rocketmq-client-go v0.0.0-20191211114916-85ee94b43cef/go.mod h1:u76Qs5/j7mpSrin0NBm7xFotMYm1sH1uP4mWyfBVqnc=
github.com/apache/rocketmq-client-go/v2 v2.0.0-rc2 h1:H4GSortZKs6bm2ylvVRHRHCwjZ+NE90P2OhYikjJwvI=
github.com/apache/rocketmq-client-go/v2 v2.0.0-rc2/go.mod h1:oEZKFDvS7sz/RWU0839+dQBupazyBV7WX5cP6nrio0Q=
github.com/apache/rocketmq-client-go/v2 v2.0.0 h1:D6jFj3DcNjWyjWn5N/R7Eq8v5kLqlgkFnT/DNQFnWlM=
github.com/apache/rocketmq-client-go/v2 v2.0.0/go.mod h1:oEZKFDvS7sz/RWU0839+dQBupazyBV7WX5cP6nrio0Q=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down Expand Up @@ -103,8 +101,6 @@ github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5Xh
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/douyu/jupiter v0.2.4 h1:ILH3/46vu+wSi5ebl6Bef89ftVkzIkK6JYPXFjQxnkQ=
github.com/douyu/jupiter v0.2.4/go.mod h1:IGPvr5ozl9QUrOYy4oE9kkYQ39dBOqeib5yzCJSme1g=
github.com/douyu/jupiter v0.2.5-0.20200831145748-71aed348d32e h1:9Z892brOatPMF3fu5oRxRa390J3LMwoG4ygAlJMmD+M=
github.com/douyu/jupiter v0.2.5-0.20200831145748-71aed348d32e/go.mod h1:jbGsskNQyYcxDDhHl0VXeU00LiEjAfJfY6DT/kaSQ0U=
github.com/dubbogo/getty v1.3.2/go.mod h1:ANbVQ9tbpZ2b0xdR8nRrgS/oXIsZAeRxzvPSOn/7mbk=
github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
Expand Down Expand Up @@ -142,7 +138,6 @@ github.com/gin-gonic/gin v1.3.0/go.mod h1:7cKuhb5qV2ggCFctp2fJQ+ErvciLZrIeoOSOm6
github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM=
github.com/gin-gonic/gin v1.5.0/go.mod h1:Nd6IXA8m5kNZdNEHMBd93KT+mdY3+bewLgRvmCsR2Do=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q=
github.com/go-check/check v0.0.0-20180628173108-788fd7840127/go.mod h1:9ES+weclKsC9YodN5RgxqK/VD9HM9JsCSh7rNhMZE98=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down Expand Up @@ -255,7 +250,6 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5 h1:UImYN5qQ8tuGpGE16ZmjvcTtTw24zw1QAp/SlnNrZhI=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.6/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/hashicorp/consul v1.5.3/go.mod h1:61E2GJCPEP3oq8La7sfDdWGQ66+Zbxzw5ecOdFD7xIE=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
Expand Down Expand Up @@ -488,7 +482,6 @@ github.com/smallnest/weighted v0.0.0-20200122032019-adf21c9b8bd1/go.mod h1:xc9Co
github.com/smartystreets/assertions v0.0.0-20180820201707-7c9eb446e3cf/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 h1:N8Bg45zpk/UcpNGnfJt2y/3lRWASHNTUET8owPYCgYI=
Expand Down Expand Up @@ -685,8 +678,6 @@ golang.org/x/sys v0.0.0-20200805065543-0cf7623e9dbd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
Expand Down
13 changes: 13 additions & 0 deletions pkg/core/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Engine struct {
supervisorScanner *supervisor.Scanner
systemdScanner *systemd.Scanner
nginxScanner *nginx.ConfScanner
worker *job.Worker
}

// NewEngine new the engine
Expand Down Expand Up @@ -90,6 +91,12 @@ func NewEngine() *Engine {
); err != nil {
xlog.Panic("new engine", xlog.Any("err", err))
}

err := eng.RegisterHooks(jupiter.StageAfterStop, eng.cleanJobs)
if err != nil {
xlog.Panicf("register hook failed. err=%s", err.Error())
}

return eng
}

Expand Down Expand Up @@ -267,6 +274,7 @@ func (eng *Engine) startHealCheck() error {

func (eng *Engine) startWorker() error {
worker := job.StdConfig("worker").Build()
eng.worker = worker
return worker.Run()
}

Expand Down Expand Up @@ -372,3 +380,8 @@ func (eng *Engine) checkServiceNodes() {
}
}
}

func (eng *Engine) cleanJobs() error {
eng.worker.CleanJobs()
return nil
}
2 changes: 1 addition & 1 deletion pkg/job/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func StdConfig(key string) *Config {
}

// Build new a instance
func (c *Config) Build() *worker {
func (c *Config) Build() *Worker {
c.HostName = report.ReturnHostName()
c.AppIP = report.ReturnAppIp()

Expand Down
14 changes: 7 additions & 7 deletions pkg/job/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ func (wl *wrappedLogger) Error(err error, msg string, keysAndValues ...interface

// Cron ...
type Cron struct {
*worker
*Worker
*cron.Cron
entries map[string]EntryID
}

func newCron(config *worker) *Cron {
func newCron(config *Worker) *Cron {
c := &Cron{
worker: config,
Worker: config,
Cron: cron.New(
cron.WithLogger(&wrappedLogger{config.logger}),
cron.WithChain(config.wrappers...),
Expand All @@ -91,15 +91,15 @@ func (c *Cron) Schedule(schedule Schedule, job NamedJob) EntryID {
}
innnerJob := &wrappedJob{
NamedJob: job,
logger: c.worker.logger,
logger: c.Worker.logger,
}

return c.Cron.Schedule(schedule, innnerJob)
}

// AddJob ...
func (c *Cron) AddJob(spec string, cmd NamedJob) (EntryID, error) {
schedule, err := c.worker.parser.Parse(spec)
schedule, err := c.Worker.parser.Parse(spec)
if err != nil {
return 0, err
}
Expand All @@ -118,7 +118,7 @@ func (c *Cron) Remove(id EntryID) {

// Run ...
func (c *Cron) Run() {
c.worker.logger.Info("run worker", xlog.Int("number of scheduled jobs", len(c.Cron.Entries())))
c.Worker.logger.Info("run Worker", xlog.Int("number of scheduled jobs", len(c.Cron.Entries())))
c.Cron.Start()
}

Expand Down Expand Up @@ -170,7 +170,7 @@ func (wj wrappedJob) run() (err error) {
}
if err != nil {
fields = append(fields, xlog.String("err", err.Error()), xlog.Duration("cost", time.Since(beg)))
wj.logger.Error("worker", fields...)
wj.logger.Error("Worker", fields...)
}
}()

Expand Down
33 changes: 6 additions & 27 deletions pkg/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import (
"strings"
"time"

"github.com/coreos/etcd/clientv3/concurrency"
"github.com/douyu/jupiter/pkg/client/etcdv3"
"github.com/douyu/jupiter/pkg/xlog"
"go.uber.org/zap"
)

func init() {
Expand Down Expand Up @@ -60,14 +58,14 @@ type Job struct {
hostname string

// 用于访问etcd
*worker `json:"-"`
*Worker `json:"-"`

mutex *etcdv3.Mutex
locked bool
}

// NewEtcdTimeoutContext return a new etcdTimeoutContext
func NewEtcdTimeoutContext(w *worker) (context.Context, context.CancelFunc) {
func NewEtcdTimeoutContext(w *Worker) (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), time.Duration(w.ReqTimeout)*time.Second)
}

Expand Down Expand Up @@ -209,27 +207,19 @@ func (j *Job) ValidRules() error {

func (j *Job) Lock() error {
var err error
j.mutex, err = j.Client.NewMutex(LockKeyPrefix+j.ID, concurrency.WithTTL(10))
err = j.mutex.Lock(time.Second)
if err != nil {
return err
}

err = j.mutex.Lock(3 * time.Second)
if err != nil {
return err
}

j.locked = true

return nil
}

func (j *Job) Unlock() {
if j.mutex != nil {
err := j.mutex.Unlock()
if err != nil {
xlog.Error("unlock failed", xlog.FieldErr(err))
}
err := j.mutex.Unlock()
if err != nil {
xlog.Error("unlock failed", xlog.FieldErr(err))
}
}

Expand Down Expand Up @@ -260,17 +250,6 @@ func (rule *Timer) Valid() error {
return nil
}

func NewLogger() (*zap.Logger, error) {
cfg := zap.NewProductionConfig()
logFile := GetCurrentDirectory() + "/log"
os.MkdirAll(logFile, os.ModePerm)

cfg.OutputPaths = []string{
logFile + "/job.log",
}
return cfg.Build()
}

func GetCurrentDirectory() string {
dir, err := filepath.Abs(filepath.Dir(os.Args[0])) //返回绝对路径 filepath.Dir(os.Args[0])去除最后一个元素的路径
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/job/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewTask(job *Job, ops ...TaskOption) *Task {
op(task)
}
if task.TaskID == 0 {
id, _ := job.worker.taskIdGen.NextID()
id, _ := job.Worker.taskIdGen.NextID()
task.TaskID = id
}

Expand Down
Loading

0 comments on commit a75cf1b

Please sign in to comment.