From c60cda09b43f8d1b48a33c3c8a78da523784151d Mon Sep 17 00:00:00 2001 From: Moshe Immerman Date: Fri, 26 Jan 2024 16:21:43 +0200 Subject: [PATCH] chore: job/history and fixes --- context/properties.go | 4 +++ db.go | 5 +++ go.mod | 2 +- go.sum | 4 +-- gorm/logger.go | 71 ++++++------------------------------------- hack/migrate/go.mod | 2 +- hack/migrate/go.sum | 4 +-- job/job.go | 20 ++++++------ models/job_history.go | 25 ++++++++++++--- tests/setup/common.go | 17 ++--------- 10 files changed, 59 insertions(+), 95 deletions(-) diff --git a/context/properties.go b/context/properties.go index fbea1751..71d19ef3 100644 --- a/context/properties.go +++ b/context/properties.go @@ -58,6 +58,10 @@ func (k Context) Properties() Properties { return props } +func SetLocalProperty(ctx Context, property, value string) { + Local[property] = value +} + func UpdateProperty(ctx Context, key, value string) error { query := "INSERT INTO properties (name, value) VALUES (?,?) ON CONFLICT (name) DO UPDATE SET value = excluded.value" logger.Debugf("Updated property %s = %s", key, value) diff --git a/db.go b/db.go index af4e211b..54a795da 100644 --- a/db.go +++ b/db.go @@ -3,6 +3,7 @@ package duty import ( "context" "database/sql" + "flag" "fmt" "net/url" "time" @@ -45,6 +46,10 @@ func BindFlags(flags *pflag.FlagSet) { flags.StringVar(&LogLevel, "db-log-level", "error", "Set gorm logging level. trace, debug & info") } +func BindGoFlags() { + flag.StringVar(&LogLevel, "db-log-level", "error", "Set gorm logging level. trace, debug & info") +} + func DefaultGormConfig() *gorm.Config { return &gorm.Config{ FullSaveAssociations: true, diff --git a/go.mod b/go.mod index 6afdd623..5dcf8413 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/asecurityteam/rolling v2.0.4+incompatible github.com/exaring/otelpgx v0.5.2 github.com/fergusstrange/embedded-postgres v1.25.0 - github.com/flanksource/commons v1.21.1 + github.com/flanksource/commons v1.21.2 github.com/flanksource/gomplate/v3 v3.20.29 github.com/flanksource/kommons v0.31.4 github.com/flanksource/postq v0.1.3 diff --git a/go.sum b/go.sum index 257545e9..1adbe41d 100644 --- a/go.sum +++ b/go.sum @@ -762,8 +762,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fergusstrange/embedded-postgres v1.25.0 h1:sa+k2Ycrtz40eCRPOzI7Ry7TtkWXXJ+YRsxpKMDhxK0= github.com/fergusstrange/embedded-postgres v1.25.0/go.mod h1:t/MLs0h9ukYM6FSt99R7InCHs1nW0ordoVCcnzmpTYw= -github.com/flanksource/commons v1.21.1 h1:vZw21pM95hYd/YLXDdvtH/gb1PUZPMWLpRyk/N7eykw= -github.com/flanksource/commons v1.21.1/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY= +github.com/flanksource/commons v1.21.2 h1:Z6ZGeBkTenucalfPLGEnq5qMB6uwSfH3ycBOGutRcfg= +github.com/flanksource/commons v1.21.2/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.20.29 h1:hDrNw1JaQk+gmhSCvqng+nebYAt3a5afhf/Vdmr4CTs= github.com/flanksource/gomplate/v3 v3.20.29/go.mod h1:GKmptFMdr2LbOuqwQZrmo9a/UygyZ0pbXffks8MuYhE= diff --git a/gorm/logger.go b/gorm/logger.go index cb73fca4..67189e08 100644 --- a/gorm/logger.go +++ b/gorm/logger.go @@ -1,25 +1,16 @@ package duty import ( - "context" - "errors" "log" "os" "time" - "github.com/flanksource/commons/logger" gLogger "gorm.io/gorm/logger" ) const Debug = "debug" const Trace = "trace" -type gormLogger struct { - logger logger.Logger - SlowThreshold time.Duration - IgnoreRecordNotFoundError bool -} - func NewGormLogger(level string) gLogger.Interface { if level == Trace { return gLogger.New( @@ -47,56 +38,14 @@ func NewGormLogger(level string) gLogger.Interface { ) } - currentGormLogger := logger.StandardLogger().Named("db") - - switch level { - case "trace": - currentGormLogger.SetLogLevel(2) - case "debug": - currentGormLogger.SetLogLevel(1) - default: - currentGormLogger.SetLogLevel(0) - } - - return &gormLogger{ - SlowThreshold: time.Second, - logger: currentGormLogger, - } -} - -// Pass the log level directly to NewGormLogger -func (t *gormLogger) LogMode(level gLogger.LogLevel) gLogger.Interface { - // not applicable since the mapping of gorm's loglevel to common's logger's log level - // doesn't work out well. - return t -} - -func (l *gormLogger) Info(ctx context.Context, msg string, data ...interface{}) { - l.logger.Infof(msg, data) -} - -func (l *gormLogger) Warn(ctx context.Context, msg string, data ...interface{}) { - l.logger.Warnf(msg, data) -} - -func (l *gormLogger) Error(ctx context.Context, msg string, data ...interface{}) { - l.logger.Errorf(msg, data) -} - -func (l *gormLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) { - if !l.logger.IsTraceEnabled() { - return - } - - elapsed := time.Since(begin) - sql, rows := fc() - - switch { - case err != nil && (!errors.Is(err, gLogger.ErrRecordNotFound) || !l.IgnoreRecordNotFoundError): - l.logger.WithValues("elapsed", elapsed).WithValues("rows", rows).Errorf(sql) - case elapsed > l.SlowThreshold && l.SlowThreshold != 0: - l.logger.WithValues("elapsed", elapsed).WithValues("slow SQL", l.SlowThreshold).WithValues("rows", rows).Warnf(sql) - default: - l.logger.WithValues("elapsed", elapsed).WithValues("rows", rows).Infof(sql) - } + return gLogger.New( + log.New(os.Stdout, "\r\n", log.Ldate|log.Ltime|log.Lshortfile), // io writer + gLogger.Config{ + SlowThreshold: time.Second, // Slow SQL threshold + LogLevel: gLogger.Warn, // Log level + IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger + ParameterizedQueries: false, // Don't include params in the SQL log + Colorful: true, // Disable color + }, + ) } diff --git a/hack/migrate/go.mod b/hack/migrate/go.mod index bae12c1e..be0db139 100644 --- a/hack/migrate/go.mod +++ b/hack/migrate/go.mod @@ -3,7 +3,7 @@ module github.com/flanksource/duty/hack/migrate go 1.20 require ( - github.com/flanksource/commons v1.21.1 + github.com/flanksource/commons v1.21.2 github.com/flanksource/duty v1.0.180 github.com/spf13/cobra v1.7.0 ) diff --git a/hack/migrate/go.sum b/hack/migrate/go.sum index fb74e703..f27b0cab 100644 --- a/hack/migrate/go.sum +++ b/hack/migrate/go.sum @@ -761,8 +761,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flanksource/commons v1.21.1 h1:vZw21pM95hYd/YLXDdvtH/gb1PUZPMWLpRyk/N7eykw= -github.com/flanksource/commons v1.21.1/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY= +github.com/flanksource/commons v1.21.2 h1:Z6ZGeBkTenucalfPLGEnq5qMB6uwSfH3ycBOGutRcfg= +github.com/flanksource/commons v1.21.2/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY= github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc= github.com/flanksource/gomplate/v3 v3.20.29 h1:hDrNw1JaQk+gmhSCvqng+nebYAt3a5afhf/Vdmr4CTs= github.com/flanksource/gomplate/v3 v3.20.29/go.mod h1:GKmptFMdr2LbOuqwQZrmo9a/UygyZ0pbXffks8MuYhE= diff --git a/job/job.go b/job/job.go index 122a3f39..4817bba3 100644 --- a/job/job.go +++ b/job/job.go @@ -90,7 +90,7 @@ func (j *JobRuntime) ID() string { func (j *JobRuntime) start() { j.Tracef("starting") - j.History = models.NewJobHistory(j.Job.Name, "", "").Start() + j.History = models.NewJobHistory(j.Logger, j.Job.Name, "", "").Start() j.Job.LastJob = j.History if j.Job.ResourceID != "" { j.History.ResourceID = j.Job.ResourceID @@ -116,10 +116,10 @@ func (j *JobRuntime) end() { func (j *JobRuntime) Failf(message string, args ...interface{}) { err := fmt.Sprintf(message, args...) - j.Debugf(err) + j.Logger.WithSkipReportLevel(1).Debugf(err) j.Span.SetStatus(codes.Error, err) if j.History != nil { - j.History.AddError(err) + j.History.AddErrorWithSkipReportLevel(err, 1) } } @@ -166,7 +166,7 @@ func (j *Job) SetID(id string) *Job { } func (j *Job) cleanupHistory() int { - j.Context.Tracef("running cleanup: %v", j.Retention) + j.Context.Logger.V(4).Infof("running cleanup: %v", j.Retention) db := j.Context.WithDBLogLevel("warn").DB() if err := db.Exec("DELETE FROM job_history WHERE name = ? AND now() - created_at > interval '1 minute' * ?", j.Name, j.Retention.Age.Minutes()).Error; err != nil { j.Context.Warnf("failed to cleanup history %v", err) @@ -199,7 +199,7 @@ func (j *Job) cleanupHistory() int { j.Context.Warnf("failed to cleanup history: %v", tx.Error) } } - j.Context.Tracef("cleaned up %d records", count) + j.Context.Logger.V(3).Infof("cleaned up %d records", count) return count } @@ -226,14 +226,14 @@ func (j *Job) Run() { r.start() defer r.end() if j.Singleton { - ctx.Tracef("%s acquiring lock", r.ID()) + ctx.Logger.V(4).Infof("acquiring lock") if j.lock == nil { j.lock = &sync.Mutex{} } if !j.lock.TryLock() { r.History.Status = models.StatusAborted - ctx.Tracef("%s failed to acquire lock", r.ID()) + ctx.Tracef("failed to acquire lock") r.Failf("%s concurrent job aborted", r.ID()) return } @@ -247,10 +247,9 @@ func (j *Job) Run() { } err := j.Fn(r) - ctx.Tracef("%s finished duration=%s, error=%s", r.ID(), time.Since(r.History.TimeStart), err) + ctx.Tracef("finished duration=%s, error=%s", time.Since(r.History.TimeStart), err) if err != nil { - ctx.Error(err) - r.History.AddError(err.Error()) + r.History.AddErrorWithSkipReportLevel(err.Error(), 1) } } @@ -362,6 +361,7 @@ func (j *Job) String() string { func (j *Job) AddToScheduler(cronRunner *cron.Cron) error { j.init() + cronRunner.Start() schedule := j.Schedule if override, ok := getProperty(j, j.Context.Properties(), "schedule"); ok { schedule = override diff --git a/models/job_history.go b/models/job_history.go index edfb59fb..8f529e09 100644 --- a/models/job_history.go +++ b/models/job_history.go @@ -36,7 +36,8 @@ type JobHistory struct { Status string TimeStart time.Time TimeEnd *time.Time - Errors []string `gorm:"-"` + Errors []string `gorm:"-"` + Logger logger.Logger `gorm:"-"` } func (j JobHistory) AsError() error { @@ -50,11 +51,12 @@ func (j JobHistory) TableName() string { return "job_history" } -func NewJobHistory(name, resourceType, resourceID string) *JobHistory { +func NewJobHistory(log logger.Logger, name, resourceType, resourceID string) *JobHistory { return &JobHistory{ Name: name, ResourceType: resourceType, ResourceID: resourceID, + Logger: log, } } @@ -82,7 +84,13 @@ func (h *JobHistory) Persist(db *gorm.DB) error { } func (h *JobHistory) AddErrorf(msg string, args ...interface{}) *JobHistory { - return h.AddError(fmt.Sprintf(msg, args...)) + err := fmt.Sprintf(msg, args...) + h.ErrorCount += 1 + if err != "" { + h.Errors = append(h.Errors, err) + } + h.Logger.WithSkipReportLevel(1).Errorf("%s %s", h, err) + return h } func (h *JobHistory) AddError(err string) *JobHistory { @@ -90,7 +98,16 @@ func (h *JobHistory) AddError(err string) *JobHistory { if err != "" { h.Errors = append(h.Errors, err) } - logger.StandardLogger().WithSkipReportLevel(1).Errorf("%s %s", h, err) + h.Logger.WithSkipReportLevel(1).Errorf("%s %s", h, err) + return h +} + +func (h *JobHistory) AddErrorWithSkipReportLevel(err string, level int) *JobHistory { + h.ErrorCount += 1 + if err != "" { + h.Errors = append(h.Errors, err) + } + h.Logger.WithSkipReportLevel(level).Errorf("%s %s", h, err) return h } diff --git a/tests/setup/common.go b/tests/setup/common.go index fa51f4a0..c54467ed 100644 --- a/tests/setup/common.go +++ b/tests/setup/common.go @@ -2,7 +2,6 @@ package setup import ( "database/sql" - "flag" "fmt" "net" "net/http" @@ -38,8 +37,8 @@ var trace bool var dbTrace bool func init() { - flag.BoolVar(&trace, "trace", false, "Use trace level logging") - flag.BoolVar(&dbTrace, "db-trace", false, "DB trace") + logger.BindGoFlags() + duty.BindGoFlags() } func execPostgres(connection, query string) error { @@ -64,6 +63,7 @@ func MustDB() *sql.DB { var WithoutDummyData = "without_dummy_data" func BeforeSuiteFn(args ...interface{}) context.Context { + logger.UseZap() var err error importDummyData := true @@ -145,21 +145,10 @@ func BeforeSuiteFn(args ...interface{}) context.Context { if trace { DefaultContext = DefaultContext.WithTrace() } - logger.StandardLogger().SetLogLevel(2) return DefaultContext } func AfterSuiteFn() { - // logger.Infof("Deleting dummy data") - logger.StandardLogger().SetLogLevel(0) - // testDB, err := duty.NewGorm(PgUrl, duty.DefaultGormConfig()) - // if err != nil { - // ginkgo.Fail(err.Error()) - // } - // if err := dummyData.Delete(testDB); err != nil { - // ginkgo.Fail(err.Error()) - // } - if os.Getenv("DUTY_DB_URL") == "" { logger.Infof("Stopping postgres") if err := postgresServer.Stop(); err != nil {