Skip to content

Commit

Permalink
Alerts: Fix data at watermark boundary is excluded (#4348)
Browse files Browse the repository at this point in the history
* Alerts boundary fix

* Alert error info log

* Wording

* Fix test
  • Loading branch information
begelundmuller authored and nishantmonu51 committed Mar 15, 2024
1 parent ce084c8 commit 86ee030
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
4 changes: 4 additions & 0 deletions runtime/queries/resource_watermark.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (q *ResourceWatermark) resolveMetricsView(ctx context.Context, rt *runtime.
}

if !t.IsZero() {
// Hacky workaround for the following issue: the watermark is used as the *exclusive* upper bound for time ranges.
// We need to add a small delta to ensure the row with the watermark is included in the result.
t = t.Add(time.Second)

q.Result = &t
}

Expand Down
2 changes: 1 addition & 1 deletion runtime/queries/resource_watermark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestResourceWatermark_MetricsView(t *testing.T) {

rt, instanceID := testruntime.NewInstance(t)
testruntime.PutFiles(t, rt, instanceID, map[string]string{
"/models/foo.sql": fmt.Sprintf(`SELECT '%s'::TIMESTAMP as time`, ts.Format(time.RFC3339)),
"/models/foo.sql": fmt.Sprintf(`SELECT '%s'::TIMESTAMP as time`, ts.Add(-time.Second).Format(time.RFC3339)),
"/dashboards/bare.yaml": `
model: foo
measures:
Expand Down
8 changes: 5 additions & 3 deletions runtime/reconcilers/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *AlertReconciler) Reconcile(ctx context.Context, n *runtimev1.ResourceNa
}

// As a special rule, we override the refresh schedule to run every 10 minutes if:
// ref_update=true, one of the refs is streaming, and no other schedule is set.
// ref_update=true and one of the refs is streaming (and an explicit schedule wasn't provided).
if hasStreamingRef(ctx, r.C, self.Meta.Refs) {
if a.Spec.RefreshSchedule != nil && a.Spec.RefreshSchedule.RefUpdate {
if a.Spec.RefreshSchedule.TickerSeconds == 0 && a.Spec.RefreshSchedule.Cron == "" {
Expand Down Expand Up @@ -472,7 +472,7 @@ func (r *AlertReconciler) executeAllWrapped(ctx context.Context, self *runtimev1
}

if len(ts) == 0 {
r.C.Logger.Debug("Skipped alert check because watermark has not advanced by a full interval", zap.String("name", self.Meta.Name.Name), zap.Time("current_watermark", watermark), zap.Time("previous_watermark", previousWatermark), zap.String("interval", a.Spec.IntervalsIsoDuration))
r.C.Logger.Debug("Skipped alert check because watermark is unchanged or has not advanced by a full interval", zap.String("name", self.Meta.Name.Name), zap.Time("current_watermark", watermark), zap.Time("previous_watermark", previousWatermark), zap.String("interval", a.Spec.IntervalsIsoDuration))
return nil
}

Expand Down Expand Up @@ -514,6 +514,8 @@ func (r *AlertReconciler) executeSingle(ctx context.Context, self *runtimev1.Res
Status: runtimev1.AssertionStatus_ASSERTION_STATUS_ERROR,
ErrorMessage: fmt.Sprintf("Alert check failed: %s", executeErr.Error()),
}

r.C.Logger.Info("Alert errored", zap.String("name", self.Meta.Name.Name), zap.Time("execution_time", executionTime), zap.Error(executeErr))
}

// Finalize and pop current execution.
Expand Down Expand Up @@ -757,7 +759,7 @@ func (r *AlertReconciler) computeInheritedWatermark(ctx context.Context, refs []
func calculateExecutionTimes(a *runtimev1.Alert, watermark, previousWatermark time.Time) ([]time.Time, error) {
// If the watermark is unchanged, skip the check.
// NOTE: It might make sense to make this configurable in the future, but the use cases seem limited.
// The watermark can only be unchanged if watermark="inherit", and since that indicates watermarks can be trusted, why check for the same watermark?
// The watermark can only be unchanged if watermark="inherit" and since that indicates watermarks can be trusted, why check for the same watermark?
if watermark.Equal(previousWatermark) {
return nil, nil
}
Expand Down

2 comments on commit 86ee030

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉 Published on https://ui.rilldata.com as production
🚀 Deployed on https://65f43ab49d8dc55823e20042--rill-ui.netlify.app

Please sign in to comment.