Skip to content

Commit

Permalink
Merge pull request #526 from chancez/scheduled_report_parity
Browse files Browse the repository at this point in the history
Update ScheduledReports to support Report features
  • Loading branch information
Chance Zibolski authored Dec 5, 2018
2 parents 7f59fd8 + 470cde5 commit a2e843b
Show file tree
Hide file tree
Showing 13 changed files with 516 additions and 413 deletions.
6 changes: 5 additions & 1 deletion pkg/apis/metering/v1alpha1/scheduled_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ScheduledReportSpec struct {
GenerationQueryName string `json:"generationQuery"`

// Schedule configures when the report runs.
Schedule ScheduledReportSchedule `json:"schedule"`
Schedule *ScheduledReportSchedule `json:"schedule,omitempty"`

// ReportingStart specifies the time this ScheduledReport should start from
// instead of the current time.
Expand All @@ -53,6 +53,10 @@ type ScheduledReportSpec struct {
// than a log of all runs before it.
OverwriteExistingData bool `json:"overwriteExistingData,omitempty"`

// RunImmediately will run the report immediately, ignoring ReportingStart,
// ReportingEnd and GracePeriod.
RunImmediately bool `json:"runImmediately,omitempty"`

// Inputs are the inputs to the ReportGenerationQuery
Inputs ReportGenerationQueryInputValues `json:"inputs,omitempty"`

Expand Down
10 changes: 9 additions & 1 deletion pkg/apis/metering/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

185 changes: 115 additions & 70 deletions pkg/operator/scheduled_reports.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ type reportSchedule interface {
Next(time.Time) time.Time
}

func getSchedule(reportSched cbTypes.ScheduledReportSchedule) (reportSchedule, error) {
func getSchedule(reportSched *cbTypes.ScheduledReportSchedule) (reportSchedule, error) {
var cronSpec string
switch reportSched.Period {
case cbTypes.ScheduledReportPeriodCron:
Expand Down Expand Up @@ -210,10 +210,29 @@ type reportPeriod struct {
// hasn't elapsed, runScheduledReport will requeue the resource for a time when
// the period has elapsed.
func (op *Reporting) runScheduledReport(logger log.FieldLogger, report *cbTypes.ScheduledReport) error {
// check if this report was previously finished
runningCond := cbutil.GetScheduledReportCondition(report.Status, cbTypes.ScheduledReportRunning)
previouslyFinished := runningCond != nil && runningCond.Reason == cbutil.ReportPeriodFinishedReason && runningCond.Status == v1.ConditionFalse

// if the report's reportingEnd is unset or after the lastReportTime
// then the report was updated since it last finished and we should
// consider it something to be reprocessed
if previouslyFinished {
if report.Spec.ReportingEnd == nil {
logger.Infof("previously finished report's spec.reportingEnd is unset: beginning processing of report")
} else if report.Status.LastReportTime != nil && report.Spec.ReportingEnd.Time.After(report.Status.LastReportTime.Time) {
logger.Infof("previously finished report's spec.reportingEnd (%s) is now after lastReportTime (%s): beginning processing of report", report.Spec.ReportingEnd.Time, report.Status.LastReportTime.Time)
} else {
// return without processing because the report is complete
logger.Infof("ScheduledReport %s is already finished: %s", report.Name, runningCond.Message)
return nil
}
}

// validate the scheduledReport before anything else to surface issues
// before we actually run
runningCondition := cbutil.NewScheduledReportCondition(cbTypes.ScheduledReportRunning, v1.ConditionTrue, cbutil.ValidatingScheduledReportReason, "validating report and its dependencies")
cbutil.SetScheduledReportCondition(&report.Status, *runningCondition)
runningCond = cbutil.NewScheduledReportCondition(cbTypes.ScheduledReportRunning, v1.ConditionTrue, cbutil.ValidatingScheduledReportReason, "validating report and its dependencies")
cbutil.SetScheduledReportCondition(&report.Status, *runningCond)
var err error
report, err = op.meteringClient.MeteringV1alpha1().ScheduledReports(report.Namespace).Update(report)
if err != nil {
Expand All @@ -228,6 +247,9 @@ func (op *Reporting) runScheduledReport(logger log.FieldLogger, report *cbTypes.
if report.Spec.ReportingStart != nil && report.Spec.ReportingEnd != nil && (report.Spec.ReportingStart.Time.After(report.Spec.ReportingEnd.Time) || report.Spec.ReportingStart.Time.Equal(report.Spec.ReportingEnd.Time)) {
return op.setScheduledReportStatusValidationFailure(report, fmt.Sprintf("spec.reportingEnd (%s) must be after spec.reportingStart (%s)", report.Spec.ReportingEnd.Time, report.Spec.ReportingStart.Time))
}
if report.Spec.ReportingEnd == nil && report.Spec.RunImmediately {
return op.setScheduledReportStatusValidationFailure(report, "spec.reportingEnd must be set if report.spec.runImmediately is true")
}

genQuery, err := op.reportGenerationQueryLister.ReportGenerationQueries(report.Namespace).Get(report.Spec.GenerationQueryName)
if err != nil {
Expand Down Expand Up @@ -266,34 +288,28 @@ func (op *Reporting) runScheduledReport(logger log.FieldLogger, report *cbTypes.
} else {
logger.Infof("last report time was %s", report.Status.LastReportTime.Time)
}

lastReportTime := report.Status.LastReportTime.Time

// check if this report was previously finished
runningCond := cbutil.GetScheduledReportCondition(report.Status, cbTypes.ScheduledReportRunning)
previouslyFinished := runningCond != nil && runningCond.Reason == cbutil.ReportPeriodFinishedReason && runningCond.Status == v1.ConditionTrue
var reportPeriod *reportPeriod
if report.Spec.Schedule != nil {
reportSchedule, err := getSchedule(report.Spec.Schedule)
if err != nil {
return err
}

// if the report's reportingEnd is unset or after the lastReportTime
// then the report was updated since it last finished and we should
// consider it something to be reprocessed
if previouslyFinished {
if report.Spec.ReportingEnd == nil {
logger.Infof("previously finished report's spec.reportingEnd is unset: beginning processing of report")
} else if report.Spec.ReportingEnd.Time.After(lastReportTime) {
logger.Infof("previously finished report's spec.reportingEnd (%s) is now after lastReportTime (%s): beginning processing of report", report.Spec.ReportingEnd, lastReportTime)
} else {
// return without processing because the report is complete
logger.Infof("ScheduledReport %s is already finished", report.Name, runningCond.Message)
return nil
reportPeriod = getNextReportPeriod(reportSchedule, report.Spec.Schedule.Period, lastReportTime)
} else {
reportPeriod, err = getRunOnceReportPeriod(report)
if err != nil {
return err
}
}

reportSchedule, err := getSchedule(report.Spec.Schedule)
if err != nil {
return err
if reportPeriod.periodStart.After(reportPeriod.periodEnd) {
panic("periodStart should never come after periodEnd")
}

reportPeriod := getNextReportPeriod(reportSchedule, report.Spec.Schedule.Period, lastReportTime)

if report.Spec.ReportingEnd != nil && reportPeriod.periodEnd.After(report.Spec.ReportingEnd.Time) {
logger.Debugf("calculated ScheduledReport periodEnd %s goes beyond spec.reportingEnd %s, setting periodEnd to reportingEnd", reportPeriod.periodEnd, report.Spec.ReportingEnd.Time)
// we need to truncate the reportPeriod to align with the reportingEnd
Expand All @@ -304,53 +320,57 @@ func (op *Reporting) runScheduledReport(logger log.FieldLogger, report *cbTypes.
"lastReportTime": lastReportTime,
"periodStart": reportPeriod.periodStart,
"periodEnd": reportPeriod.periodEnd,
"period": report.Spec.Schedule.Period,
"overwriteExisting": report.Spec.OverwriteExistingData,
})

var gracePeriod time.Duration
if report.Spec.GracePeriod != nil {
gracePeriod = report.Spec.GracePeriod.Duration
var runningMsg string
if report.Spec.RunImmediately {
runningMsg = "ScheduledReport configured to run immediately"
logger.Infof(runningMsg)
} else {
gracePeriod = op.getDefaultReportGracePeriod()
logger.Debugf("ScheduledReport has no gracePeriod configured, falling back to defaultGracePeriod: %s", gracePeriod)
}

nextRunTime := reportPeriod.periodEnd.Add(gracePeriod)
reportGracePeriodUnmet := nextRunTime.After(now)
waitTime := nextRunTime.Sub(now)
var gracePeriod time.Duration
if report.Spec.GracePeriod != nil {
gracePeriod = report.Spec.GracePeriod.Duration
} else {
gracePeriod = op.getDefaultReportGracePeriod()
logger.Debugf("ScheduledReport has no gracePeriod configured, falling back to defaultGracePeriod: %s", gracePeriod)
}

if isRunningCond := cbutil.GetScheduledReportCondition(report.Status, cbTypes.ScheduledReportRunning); isRunningCond != nil && isRunningCond.Reason == cbutil.ReportPeriodWaitingReason && isRunningCond.Status == v1.ConditionTrue && reportGracePeriodUnmet {
// early check to see if an early reconcile occurred and if we're still
// just waiting for the next reporting period, in which case, we can
// just wait until the report period
logger.Debugf("ScheduledReport has a '%s' status with reason: '%s'. next scheduled report period is [%s to %s] with gracePeriod: %s. next run time is %s, waiting %s", cbTypes.ScheduledReportRunning, isRunningCond.Reason, reportPeriod.periodStart, reportPeriod.periodEnd, gracePeriod, nextRunTime, waitTime)
op.enqueueScheduledReportAfter(report, waitTime)
return nil
}
nextRunTime := reportPeriod.periodEnd.Add(gracePeriod)
reportGracePeriodUnmet := nextRunTime.After(now)
waitTime := nextRunTime.Sub(now)

if reportGracePeriodUnmet {
// early check to see if an early reconcile occurred and if we're still
// just waiting for the next reporting period, in which case, we can
// just wait until the report period
if runningCond != nil && runningCond.Reason == cbutil.ReportPeriodWaitingReason {
logger.Debugf("ScheduledReport has a '%s' status with reason: '%s'. next scheduled report period is [%s to %s] with gracePeriod: %s. next run time is %s, waiting %s", cbTypes.ScheduledReportRunning, runningCond.Reason, reportPeriod.periodStart, reportPeriod.periodEnd, gracePeriod, nextRunTime, waitTime)
op.enqueueScheduledReportAfter(report, waitTime)
return nil
}

if reportGracePeriodUnmet {
waitMsg := fmt.Sprintf("next scheduled report period is [%s to %s] with gracePeriod: %s. next run time is %s", reportPeriod.periodStart, reportPeriod.periodEnd, gracePeriod, nextRunTime)
logger.Infof(waitMsg+". waiting %s", waitTime)
waitMsg := fmt.Sprintf("next scheduled report period is [%s to %s] with gracePeriod: %s. next run time is %s", reportPeriod.periodStart, reportPeriod.periodEnd, gracePeriod, nextRunTime)
logger.Infof(waitMsg+". waiting %s", waitTime)

report, err = op.updateScheduledReportStatusRunning(report, cbutil.ReportPeriodWaitingReason, waitMsg)
if err != nil {
return err
}

// we requeue this for later when the period we need to report on next
// has elapsed
op.enqueueScheduledReportAfter(report, waitTime)
return nil
} else {
runningMsg := fmt.Sprintf("reached end of last reporting period [%s to %s]", reportPeriod.periodStart, reportPeriod.periodEnd)
logger.Infof(runningMsg + ", running now")
report, err = op.updateScheduledReportStatusRunning(report, cbutil.ReportPeriodWaitingReason, waitMsg)
if err != nil {
return err
}

report, err = op.updateScheduledReportStatusRunning(report, cbutil.ScheduledReason, runningMsg)
if err != nil {
return err
// we requeue this for later when the period we need to report on next
// has elapsed
op.enqueueScheduledReportAfter(report, waitTime)
return nil
} else {
runningMsg = fmt.Sprintf("reached end of last reporting period [%s to %s]", reportPeriod.periodStart, reportPeriod.periodEnd)
logger.Infof(runningMsg + ", running now")
}
}
report, err = op.updateScheduledReportStatusRunning(report, cbutil.ScheduledReason, runningMsg)
if err != nil {
return err
}

tableName := reportingutil.ScheduledReportTableName(report.Name)
// if tableName isn't set, this report is still new and we should make sure
Expand Down Expand Up @@ -426,8 +446,8 @@ func (op *Reporting) runScheduledReport(logger log.FieldLogger, report *cbTypes.
if finalRun {
// update the status to indicate the report doesn't need to run again
msg := fmt.Sprintf("ScheduledReport has finished reporting. Report has reached the configured spec.reportingEnd: %s", report.Spec.ReportingEnd.Time)
runningCondition := cbutil.NewScheduledReportCondition(cbTypes.ScheduledReportRunning, v1.ConditionFalse, cbutil.ReportPeriodFinishedReason, msg)
cbutil.SetScheduledReportCondition(&report.Status, *runningCondition)
runningCond := cbutil.NewScheduledReportCondition(cbTypes.ScheduledReportRunning, v1.ConditionFalse, cbutil.ReportPeriodFinishedReason, msg)
cbutil.SetScheduledReportCondition(&report.Status, *runningCond)
logger.Infof(msg)
}

Expand All @@ -447,20 +467,45 @@ func (op *Reporting) runScheduledReport(logger log.FieldLogger, report *cbTypes.
}

// determine how long we have to wait until we should re run this handler,
// and then queue the report for that time
now = op.clock.Now().UTC()
reportPeriod = getNextReportPeriod(reportSchedule, report.Spec.Schedule.Period, report.Status.LastReportTime.Time)
nextRunTime = reportPeriod.periodEnd.Add(gracePeriod)
waitTime = nextRunTime.Sub(now)
op.enqueueScheduledReportAfter(report, waitTime)
// if it's not a run-once report and then queue the report for that time
if report.Spec.Schedule != nil {
reportSchedule, err := getSchedule(report.Spec.Schedule)
if err != nil {
return err
}

reportPeriod = getNextReportPeriod(reportSchedule, report.Spec.Schedule.Period, report.Status.LastReportTime.Time)

var gracePeriod time.Duration
if report.Spec.GracePeriod != nil {
gracePeriod = report.Spec.GracePeriod.Duration
} else {
gracePeriod = op.getDefaultReportGracePeriod()
}

now = op.clock.Now().UTC()
nextRunTime := reportPeriod.periodEnd.Add(gracePeriod)
waitTime := nextRunTime.Sub(now)
op.enqueueScheduledReportAfter(report, waitTime)
}
return nil
}

func getNextReportPeriod(schedule reportSchedule, period cbTypes.ScheduledReportPeriod, lastScheduled time.Time) reportPeriod {
func getRunOnceReportPeriod(report *cbTypes.ScheduledReport) (*reportPeriod, error) {
if report.Spec.ReportingEnd == nil || report.Spec.ReportingStart == nil {
return nil, fmt.Errorf("run-once reports must have both ReportingEnd and ReportingStart")
}
reportPeriod := &reportPeriod{
periodStart: report.Spec.ReportingStart.UTC(),
periodEnd: report.Spec.ReportingEnd.UTC(),
}
return reportPeriod, nil
}

func getNextReportPeriod(schedule reportSchedule, period cbTypes.ScheduledReportPeriod, lastScheduled time.Time) *reportPeriod {
periodStart := lastScheduled
periodEnd := schedule.Next(periodStart)
return reportPeriod{
return &reportPeriod{
periodEnd: periodEnd.Truncate(time.Millisecond).UTC(),
periodStart: periodStart.Truncate(time.Millisecond).UTC(),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/operator/scheduled_reports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestGetNextReportPeriod(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
apiSched := v1alpha1.ScheduledReportSchedule{
apiSched := &v1alpha1.ScheduledReportSchedule{
Period: test.period,
// Normally only one is set, but we simply use a zero value
// for each to make it easier in tests.
Expand All @@ -85,7 +85,7 @@ func TestGetNextReportPeriod(t *testing.T) {

for _, expectedReportPeriod := range test.expectReportPeriods {
reportPeriod := getNextReportPeriod(schedule, test.period, lastScheduled)
assert.Equal(t, expectedReportPeriod, reportPeriod)
assert.Equal(t, &expectedReportPeriod, reportPeriod)
lastScheduled = expectedReportPeriod.periodEnd
}

Expand Down
Loading

0 comments on commit a2e843b

Please sign in to comment.