Skip to content

Commit

Permalink
chore: job context improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
moshloop committed Jan 25, 2024
1 parent 986996b commit 0e96188
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 46 deletions.
137 changes: 92 additions & 45 deletions job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/google/uuid"
Expand Down Expand Up @@ -47,21 +46,23 @@ var Retention3Day = Retention{

type Job struct {
context.Context
Name string
Schedule string
Singleton bool
Debug, Trace bool
Timeout time.Duration
Fn func(ctx JobRuntime) error
JobHistory bool
RunNow bool
ID string
entryID *cron.EntryID
lock *sync.Mutex
lastRun *time.Time
Retention Retention
initialized bool
unschedule func()
Name string
Schedule string
Singleton bool
Debug, Trace bool
Timeout time.Duration
Fn func(ctx JobRuntime) error
JobHistory bool
RunNow bool
ID string
ResourceID, ResourceType string
entryID *cron.EntryID
lock *sync.Mutex
lastRun *time.Time
Retention Retention
LastJob *models.JobHistory
initialized bool
unschedule func()
}

type Retention struct {
Expand All @@ -88,12 +89,18 @@ func (j *JobRuntime) ID() string {
}

func (j *JobRuntime) start() {
j.Context.Debugf("%s starting", j.ID())
j.Tracef("starting")
j.History = models.NewJobHistory(j.Job.Name, "", "").Start()

j.Job.LastJob = j.History
if j.Job.ResourceID != "" {
j.History.ResourceID = j.Job.ResourceID
}
if j.Job.ResourceType != "" {
j.History.ResourceType = j.Job.ResourceType
}
if j.Job.JobHistory && j.Job.Retention.Success > 0 {
if err := j.History.Persist(j.DB()); err != nil {
logger.Warnf("%s failed to persist history: %v", j.ID(), err)
j.Warnf("failed to persist history: %v", err)
}
}
}
Expand All @@ -102,14 +109,14 @@ func (j *JobRuntime) end() {
j.History.End()
if j.Job.JobHistory && (j.Job.Retention.Success > 0 || len(j.History.Errors) > 0) {
if err := j.History.Persist(j.DB()); err != nil {
logger.Warnf("%s failed to persist history: %v", j.ID(), err)
j.Warnf("failed to persist history: %v", err)
}
}
}

func (j *JobRuntime) Failf(message string, args ...interface{}) {
err := fmt.Sprintf(message, args...)
j.Context.Debugf(err)
j.Debugf(err)
j.Span.SetStatus(codes.Error, err)
if j.History != nil {
j.History.AddError(err)
Expand Down Expand Up @@ -159,13 +166,12 @@ func (j *Job) SetID(id string) *Job {
}

func (j *Job) cleanupHistory() int {
j.Context.Debugf("[%s] running cleanup: %v", j.Name, j.Retention)
db := j.Context.DB()
j.Context.Tracef("running cleanup: %v", j.Retention)
db := j.Context.WithDBLogLevel("warn").DB()
if err := db.Exec("DELETE FROM job_history WHERE name = ? AND now() - created_at > interval '1 minute' * ?", j.Name, j.Retention.Age.Minutes()).Error; err != nil {
logger.Warnf("Failed to cleanup history for : %s", j.Name)
j.Context.Warnf("failed to cleanup history %v", err)
}
query := `
WITH ordered_history AS (
query := `WITH ordered_history AS (
SELECT
id,
status,
Expand All @@ -190,10 +196,10 @@ func (j *Job) cleanupHistory() int {
tx := db.Exec(query, j.Name, r.statuses, r.count)
count += int(tx.RowsAffected)
if tx.Error != nil {
logger.Warnf("Failed to cleanup history for %s: %v", j.Name, tx.Error)
j.Context.Warnf("failed to cleanup history: %v", tx.Error)
}
}
j.Context.Debugf("[%s] cleaned up %d records", j.Name, count)
j.Context.Tracef("cleaned up %d records", count)
return count
}

Expand All @@ -216,6 +222,7 @@ func (j *Job) Run() {
} else {
r.runId = uuid.NewString()[0:8]
}

r.start()
defer r.end()
if j.Singleton {
Expand Down Expand Up @@ -247,39 +254,54 @@ func (j *Job) Run() {
}
}

func getProperty(j *Job, properties map[string]string, property string) (string, bool) {
if val, ok := properties[j.Name+"."+property]; ok {
return val, ok
}
if val, ok := properties[fmt.Sprintf("%s[%s].%s", j.Name, j.ID, property)]; ok {
return val, ok
}
return "", false
}

func (j *Job) init() {
if j.initialized {
return
}
properties := j.Context.Properties()
if schedule, ok := properties[j.Name+".schedule"]; ok {

if schedule, ok := getProperty(j, properties, "schedule"); ok {
j.Schedule = schedule
}

if timeout, ok := properties[j.Name+".timeout"]; ok {
if timeout, ok := getProperty(j, properties, "timeout"); ok {
duration, err := time.ParseDuration(timeout)
if err != nil {
logger.Warnf("Invalid timeout for %s: %s", j.Name, timeout)
j.Context.Warnf("invalid timeout %s", timeout)
}
j.Timeout = duration
}

if history, ok := properties[j.Name+".history"]; ok {
if history, ok := getProperty(j, properties, "history"); ok {
j.JobHistory = !(history != "false")
}

if debug, ok := properties[j.Name+".debug"]; ok {
j.Debug = debug == "true"
if trace := properties["jobs.trace"]; trace == "true" {
j.Trace = true
} else if trace, ok := getProperty(j, properties, "trace"); ok {
j.Trace = trace == "true"
}

if trace, ok := properties[j.Name+".trace"]; ok {
j.Trace = trace == "true"
if debug := properties["jobs.debug"]; debug == "true" {
j.Debug = true
} else if debug, ok := getProperty(j, properties, "debug"); ok {
j.Debug = debug == "true"
}

if interval, ok := properties[j.Name+".retention.interval"]; ok {
if interval, ok := getProperty(j, properties, "retention.interval"); ok {
duration, err := time.ParseDuration(interval)
if err != nil {
logger.Warnf("Invalid timeout for %s: %s", j.Name, interval)
j.Context.Warnf("invalid timeout %s", interval)
}
j.Retention.Interval = duration
}
Expand All @@ -297,6 +319,11 @@ func (j *Job) init() {
if j.Retention.Interval.Nanoseconds() == 0 {
j.Retention.Interval = time.Hour
}
if j.ID != "" {
j.Context = j.Context.WithoutName().WithName(fmt.Sprintf("%s[%s]", j.Name, j.ID))
} else {
j.Context = j.Context.WithoutName().WithName(j.Name)
}

j.Context = j.Context.WithObject(v1.ObjectMeta{
Name: j.Name,
Expand All @@ -306,15 +333,22 @@ func (j *Job) init() {
},
})

if dbLevel, ok := properties[j.Name+".db.level"]; ok {
if dbLevel, ok := getProperty(j, properties, "db-log-level"); ok {
j.Context = j.Context.WithDBLogLevel(dbLevel)
}

j.Context.Debugf("initalized: %v", j)
j.Context.Tracef("initalized %v", j.String())
j.initialized = true

}

func (j *Job) Label() string {
if j.ID != "" {
return fmt.Sprintf("%s/%s", j.Name, j.ID)
}
return j.Name
}

func (j *Job) String() string {
return fmt.Sprintf("%s{schedule=%v, timeout=%v, history=%v, singleton=%v, retention=(%s)}",
j.Name,
Expand All @@ -328,14 +362,18 @@ func (j *Job) String() string {

func (j *Job) AddToScheduler(cronRunner *cron.Cron) error {
j.init()
if j.Schedule == "@never" {
logger.Infof("[%s] skipping scheduling", j.Name)
schedule := j.Schedule
if override, ok := getProperty(j, j.Context.Properties(), "schedule"); ok {
schedule = override
}
if schedule == "@never" {
j.Context.Infof("skipping scheduling")
return nil
}
logger.Infof("[%s] scheduled %s", j.Name, j.Schedule)
entryID, err := cronRunner.AddJob(j.Schedule, j)
j.Context.Infof("scheduled %s", schedule)
entryID, err := cronRunner.AddJob(schedule, j)
if err != nil {
return fmt.Errorf("failed to schedule job %s: %s", j.Name, err)
return fmt.Errorf("[%s] failed to schedule job: %s", j.Label(), err)
}
j.entryID = &entryID
if j.RunNow {
Expand Down Expand Up @@ -363,6 +401,15 @@ func (j *Job) Unschedule() {
}
}

func (j *Job) Reschedule(schedule string, cronRunner *cron.Cron) error {
if j.unschedule != nil {
j.unschedule()
j.unschedule = nil
}
j.Schedule = schedule
return j.AddToScheduler(cronRunner)
}

func (j *Job) RemoveFromScheduler(cronRunner *cron.Cron) {
if j.entryID == nil {
return
Expand Down
10 changes: 9 additions & 1 deletion models/job_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package models
import (
"fmt"
"os"
"strings"
"time"

"github.com/flanksource/commons/logger"
Expand Down Expand Up @@ -38,6 +39,13 @@ type JobHistory struct {
Errors []string `gorm:"-"`
}

func (j JobHistory) AsError() error {
if len(j.Errors) == 0 {
return nil
}
return fmt.Errorf(strings.Join(j.Errors, ","))
}

func (j JobHistory) TableName() string {
return "job_history"
}
Expand Down Expand Up @@ -82,7 +90,7 @@ func (h *JobHistory) AddError(err string) *JobHistory {
if err != "" {
h.Errors = append(h.Errors, err)
}
logger.Errorf("%s %s", h, err)
logger.StandardLogger().WithSkipReportLevel(1).Errorf("%s %s", h, err)
return h
}

Expand Down

0 comments on commit 0e96188

Please sign in to comment.