Skip to content

Commit

Permalink
chore: job/history and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop authored Jan 26, 2024
1 parent 4a88f23 commit c60cda0
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 95 deletions.
4 changes: 4 additions & 0 deletions context/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ func (k Context) Properties() Properties {
return props
}

func SetLocalProperty(ctx Context, property, value string) {
Local[property] = value
}

func UpdateProperty(ctx Context, key, value string) error {
query := "INSERT INTO properties (name, value) VALUES (?,?) ON CONFLICT (name) DO UPDATE SET value = excluded.value"
logger.Debugf("Updated property %s = %s", key, value)
Expand Down
5 changes: 5 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package duty
import (
"context"
"database/sql"
"flag"
"fmt"
"net/url"
"time"
Expand Down Expand Up @@ -45,6 +46,10 @@ func BindFlags(flags *pflag.FlagSet) {
flags.StringVar(&LogLevel, "db-log-level", "error", "Set gorm logging level. trace, debug & info")
}

func BindGoFlags() {
flag.StringVar(&LogLevel, "db-log-level", "error", "Set gorm logging level. trace, debug & info")
}

func DefaultGormConfig() *gorm.Config {
return &gorm.Config{
FullSaveAssociations: true,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/asecurityteam/rolling v2.0.4+incompatible
github.com/exaring/otelpgx v0.5.2
github.com/fergusstrange/embedded-postgres v1.25.0
github.com/flanksource/commons v1.21.1
github.com/flanksource/commons v1.21.2
github.com/flanksource/gomplate/v3 v3.20.29
github.com/flanksource/kommons v0.31.4
github.com/flanksource/postq v0.1.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,8 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fergusstrange/embedded-postgres v1.25.0 h1:sa+k2Ycrtz40eCRPOzI7Ry7TtkWXXJ+YRsxpKMDhxK0=
github.com/fergusstrange/embedded-postgres v1.25.0/go.mod h1:t/MLs0h9ukYM6FSt99R7InCHs1nW0ordoVCcnzmpTYw=
github.com/flanksource/commons v1.21.1 h1:vZw21pM95hYd/YLXDdvtH/gb1PUZPMWLpRyk/N7eykw=
github.com/flanksource/commons v1.21.1/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY=
github.com/flanksource/commons v1.21.2 h1:Z6ZGeBkTenucalfPLGEnq5qMB6uwSfH3ycBOGutRcfg=
github.com/flanksource/commons v1.21.2/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.20.29 h1:hDrNw1JaQk+gmhSCvqng+nebYAt3a5afhf/Vdmr4CTs=
github.com/flanksource/gomplate/v3 v3.20.29/go.mod h1:GKmptFMdr2LbOuqwQZrmo9a/UygyZ0pbXffks8MuYhE=
Expand Down
71 changes: 10 additions & 61 deletions gorm/logger.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
package duty

import (
"context"
"errors"
"log"
"os"
"time"

"github.com/flanksource/commons/logger"
gLogger "gorm.io/gorm/logger"
)

const Debug = "debug"
const Trace = "trace"

type gormLogger struct {
logger logger.Logger
SlowThreshold time.Duration
IgnoreRecordNotFoundError bool
}

func NewGormLogger(level string) gLogger.Interface {
if level == Trace {
return gLogger.New(
Expand Down Expand Up @@ -47,56 +38,14 @@ func NewGormLogger(level string) gLogger.Interface {
)
}

currentGormLogger := logger.StandardLogger().Named("db")

switch level {
case "trace":
currentGormLogger.SetLogLevel(2)
case "debug":
currentGormLogger.SetLogLevel(1)
default:
currentGormLogger.SetLogLevel(0)
}

return &gormLogger{
SlowThreshold: time.Second,
logger: currentGormLogger,
}
}

// Pass the log level directly to NewGormLogger
func (t *gormLogger) LogMode(level gLogger.LogLevel) gLogger.Interface {
// not applicable since the mapping of gorm's loglevel to common's logger's log level
// doesn't work out well.
return t
}

func (l *gormLogger) Info(ctx context.Context, msg string, data ...interface{}) {
l.logger.Infof(msg, data)
}

func (l *gormLogger) Warn(ctx context.Context, msg string, data ...interface{}) {
l.logger.Warnf(msg, data)
}

func (l *gormLogger) Error(ctx context.Context, msg string, data ...interface{}) {
l.logger.Errorf(msg, data)
}

func (l *gormLogger) Trace(ctx context.Context, begin time.Time, fc func() (sql string, rowsAffected int64), err error) {
if !l.logger.IsTraceEnabled() {
return
}

elapsed := time.Since(begin)
sql, rows := fc()

switch {
case err != nil && (!errors.Is(err, gLogger.ErrRecordNotFound) || !l.IgnoreRecordNotFoundError):
l.logger.WithValues("elapsed", elapsed).WithValues("rows", rows).Errorf(sql)
case elapsed > l.SlowThreshold && l.SlowThreshold != 0:
l.logger.WithValues("elapsed", elapsed).WithValues("slow SQL", l.SlowThreshold).WithValues("rows", rows).Warnf(sql)
default:
l.logger.WithValues("elapsed", elapsed).WithValues("rows", rows).Infof(sql)
}
return gLogger.New(
log.New(os.Stdout, "\r\n", log.Ldate|log.Ltime|log.Lshortfile), // io writer
gLogger.Config{
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: gLogger.Warn, // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
ParameterizedQueries: false, // Don't include params in the SQL log
Colorful: true, // Disable color
},
)
}
2 changes: 1 addition & 1 deletion hack/migrate/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/flanksource/duty/hack/migrate
go 1.20

require (
github.com/flanksource/commons v1.21.1
github.com/flanksource/commons v1.21.2
github.com/flanksource/duty v1.0.180
github.com/spf13/cobra v1.7.0
)
Expand Down
4 changes: 2 additions & 2 deletions hack/migrate/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,8 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flanksource/commons v1.21.1 h1:vZw21pM95hYd/YLXDdvtH/gb1PUZPMWLpRyk/N7eykw=
github.com/flanksource/commons v1.21.1/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY=
github.com/flanksource/commons v1.21.2 h1:Z6ZGeBkTenucalfPLGEnq5qMB6uwSfH3ycBOGutRcfg=
github.com/flanksource/commons v1.21.2/go.mod h1:GD5+yGvmYFPIW3WMNN+y1JkeDMJY74e05pQAsRbrvwY=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.20.29 h1:hDrNw1JaQk+gmhSCvqng+nebYAt3a5afhf/Vdmr4CTs=
github.com/flanksource/gomplate/v3 v3.20.29/go.mod h1:GKmptFMdr2LbOuqwQZrmo9a/UygyZ0pbXffks8MuYhE=
Expand Down
20 changes: 10 additions & 10 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (j *JobRuntime) ID() string {

func (j *JobRuntime) start() {
j.Tracef("starting")
j.History = models.NewJobHistory(j.Job.Name, "", "").Start()
j.History = models.NewJobHistory(j.Logger, j.Job.Name, "", "").Start()
j.Job.LastJob = j.History
if j.Job.ResourceID != "" {
j.History.ResourceID = j.Job.ResourceID
Expand All @@ -116,10 +116,10 @@ func (j *JobRuntime) end() {

func (j *JobRuntime) Failf(message string, args ...interface{}) {
err := fmt.Sprintf(message, args...)
j.Debugf(err)
j.Logger.WithSkipReportLevel(1).Debugf(err)
j.Span.SetStatus(codes.Error, err)
if j.History != nil {
j.History.AddError(err)
j.History.AddErrorWithSkipReportLevel(err, 1)
}
}

Expand Down Expand Up @@ -166,7 +166,7 @@ func (j *Job) SetID(id string) *Job {
}

func (j *Job) cleanupHistory() int {
j.Context.Tracef("running cleanup: %v", j.Retention)
j.Context.Logger.V(4).Infof("running cleanup: %v", j.Retention)
db := j.Context.WithDBLogLevel("warn").DB()
if err := db.Exec("DELETE FROM job_history WHERE name = ? AND now() - created_at > interval '1 minute' * ?", j.Name, j.Retention.Age.Minutes()).Error; err != nil {
j.Context.Warnf("failed to cleanup history %v", err)
Expand Down Expand Up @@ -199,7 +199,7 @@ func (j *Job) cleanupHistory() int {
j.Context.Warnf("failed to cleanup history: %v", tx.Error)
}
}
j.Context.Tracef("cleaned up %d records", count)
j.Context.Logger.V(3).Infof("cleaned up %d records", count)
return count
}

Expand All @@ -226,14 +226,14 @@ func (j *Job) Run() {
r.start()
defer r.end()
if j.Singleton {
ctx.Tracef("%s acquiring lock", r.ID())
ctx.Logger.V(4).Infof("acquiring lock")

if j.lock == nil {
j.lock = &sync.Mutex{}
}
if !j.lock.TryLock() {
r.History.Status = models.StatusAborted
ctx.Tracef("%s failed to acquire lock", r.ID())
ctx.Tracef("failed to acquire lock")
r.Failf("%s concurrent job aborted", r.ID())
return
}
Expand All @@ -247,10 +247,9 @@ func (j *Job) Run() {
}

err := j.Fn(r)
ctx.Tracef("%s finished duration=%s, error=%s", r.ID(), time.Since(r.History.TimeStart), err)
ctx.Tracef("finished duration=%s, error=%s", time.Since(r.History.TimeStart), err)
if err != nil {
ctx.Error(err)
r.History.AddError(err.Error())
r.History.AddErrorWithSkipReportLevel(err.Error(), 1)
}
}

Expand Down Expand Up @@ -362,6 +361,7 @@ func (j *Job) String() string {

func (j *Job) AddToScheduler(cronRunner *cron.Cron) error {
j.init()
cronRunner.Start()
schedule := j.Schedule
if override, ok := getProperty(j, j.Context.Properties(), "schedule"); ok {
schedule = override
Expand Down
25 changes: 21 additions & 4 deletions models/job_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type JobHistory struct {
Status string
TimeStart time.Time
TimeEnd *time.Time
Errors []string `gorm:"-"`
Errors []string `gorm:"-"`
Logger logger.Logger `gorm:"-"`
}

func (j JobHistory) AsError() error {
Expand All @@ -50,11 +51,12 @@ func (j JobHistory) TableName() string {
return "job_history"
}

func NewJobHistory(name, resourceType, resourceID string) *JobHistory {
func NewJobHistory(log logger.Logger, name, resourceType, resourceID string) *JobHistory {
return &JobHistory{
Name: name,
ResourceType: resourceType,
ResourceID: resourceID,
Logger: log,
}
}

Expand Down Expand Up @@ -82,15 +84,30 @@ func (h *JobHistory) Persist(db *gorm.DB) error {
}

func (h *JobHistory) AddErrorf(msg string, args ...interface{}) *JobHistory {
return h.AddError(fmt.Sprintf(msg, args...))
err := fmt.Sprintf(msg, args...)
h.ErrorCount += 1
if err != "" {
h.Errors = append(h.Errors, err)
}
h.Logger.WithSkipReportLevel(1).Errorf("%s %s", h, err)
return h
}

func (h *JobHistory) AddError(err string) *JobHistory {
h.ErrorCount += 1
if err != "" {
h.Errors = append(h.Errors, err)
}
logger.StandardLogger().WithSkipReportLevel(1).Errorf("%s %s", h, err)
h.Logger.WithSkipReportLevel(1).Errorf("%s %s", h, err)
return h
}

func (h *JobHistory) AddErrorWithSkipReportLevel(err string, level int) *JobHistory {
h.ErrorCount += 1
if err != "" {
h.Errors = append(h.Errors, err)
}
h.Logger.WithSkipReportLevel(level).Errorf("%s %s", h, err)
return h
}

Expand Down
17 changes: 3 additions & 14 deletions tests/setup/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package setup

import (
"database/sql"
"flag"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -38,8 +37,8 @@ var trace bool
var dbTrace bool

func init() {
flag.BoolVar(&trace, "trace", false, "Use trace level logging")
flag.BoolVar(&dbTrace, "db-trace", false, "DB trace")
logger.BindGoFlags()
duty.BindGoFlags()
}

func execPostgres(connection, query string) error {
Expand All @@ -64,6 +63,7 @@ func MustDB() *sql.DB {
var WithoutDummyData = "without_dummy_data"

func BeforeSuiteFn(args ...interface{}) context.Context {
logger.UseZap()
var err error
importDummyData := true

Expand Down Expand Up @@ -145,21 +145,10 @@ func BeforeSuiteFn(args ...interface{}) context.Context {
if trace {
DefaultContext = DefaultContext.WithTrace()
}
logger.StandardLogger().SetLogLevel(2)
return DefaultContext
}

func AfterSuiteFn() {
// logger.Infof("Deleting dummy data")
logger.StandardLogger().SetLogLevel(0)
// testDB, err := duty.NewGorm(PgUrl, duty.DefaultGormConfig())
// if err != nil {
// ginkgo.Fail(err.Error())
// }
// if err := dummyData.Delete(testDB); err != nil {
// ginkgo.Fail(err.Error())
// }

if os.Getenv("DUTY_DB_URL") == "" {
logger.Infof("Stopping postgres")
if err := postgresServer.Stop(); err != nil {
Expand Down

0 comments on commit c60cda0

Please sign in to comment.