Skip to content

Commit

Permalink
Merge pull request #1667 from rancher/messageFix
Browse files Browse the repository at this point in the history
Alerting message fix
  • Loading branch information
dbason authored Sep 13, 2023
2 parents 24681cc + 21b2872 commit 5fdd711
Show file tree
Hide file tree
Showing 32 changed files with 602 additions and 404 deletions.
2 changes: 1 addition & 1 deletion internal/alerting/syncer/syncer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (a *AlertManagerSyncerV1) recvMsgs(
RECV:
syncReq, err := remoteSyncerClient.Recv()
if err == io.EOF {
a.lg.Warnf("remote syncer disconnected, reconnecting, ...")
a.lg.Info("remote syncer disconnected, reconnecting, ...")
break
}
if st, ok := status.FromError(err); ok && err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/alerting/drivers/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func (o *OpniRouterV1) HasReceivers(routingId string) []string {
}

// SetDefaultReceiver replaces the router's DefaultReceiver with cfg.
// The assignment is performed while holding o.mu, which is released when
// the method returns.
func (o *OpniRouterV1) SetDefaultReceiver(cfg config.WebhookConfig) {
o.mu.Lock()
defer o.mu.Unlock()
o.DefaultReceiver = cfg
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/alerting/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ const (
NotificationPartitionByDetails = "details"
)

const (
// TestNamespace is the reserved namespace for routing messages to loaded
// receivers.
TestNamespace = "test"
)

// var _ Message = (*config.Alert)(nil)

// Identifies important information in message contents
Expand Down
85 changes: 34 additions & 51 deletions pkg/alerting/storage/clientset.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package storage

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"io"
"strings"
"fmt"
"time"

"slices"
Expand All @@ -15,10 +13,12 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"gopkg.in/yaml.v3"

"github.com/rancher/opni/pkg/alerting/drivers/backend"
"github.com/rancher/opni/pkg/alerting/drivers/routing"
"github.com/rancher/opni/pkg/alerting/message"
"github.com/rancher/opni/pkg/alerting/shared"
storage_opts "github.com/rancher/opni/pkg/alerting/storage/opts"
"github.com/rancher/opni/pkg/alerting/storage/spec"
Expand Down Expand Up @@ -86,54 +86,6 @@ func (c *CompositeAlertingClientSet) GetHash(ctx context.Context, key string) (s
return hex.EncodeToString(hash.Sum(nil)), nil
}

// CalculateHash recomputes the configuration hash for key and caches it in
// c.hashes[key] as a hex-encoded SHA-256 digest.
//
// The hashed input is built, in order, from:
//   - the YAML encoding of syncOptions.DefaultReceiver, when it is non-nil;
//   - for each condition returned by c.listAllConditions (ordered by group id,
//     then id): id + group id + last-updated timestamp, joined with "-";
//   - for each endpoint returned by c.Endpoints().List (ordered by id):
//     id + last-updated timestamp, joined with "_".
//
// Only shared.SingleConfigId is supported; any other key panics with
// "not implemented". Errors from encoding, listing or hashing are returned
// unchanged and leave c.hashes untouched.
func (c *CompositeAlertingClientSet) CalculateHash(ctx context.Context, key string, syncOptions *storage_opts.SyncOptions) error {
	var payload string
	if receiver := syncOptions.DefaultReceiver; receiver != nil {
		var encoded bytes.Buffer
		if err := yaml.NewEncoder(&encoded).Encode(receiver); err != nil {
			return err
		}
		payload = encoded.String()
	}

	// Per-key configurations are not supported; this check runs after the
	// default receiver has been encoded, as in the if/else form.
	if key != shared.SingleConfigId {
		panic("not implemented")
	}

	conditions, err := c.listAllConditions(ctx)
	if err != nil {
		return err
	}
	// Sort so the digest does not depend on the order the store returns items.
	slices.SortFunc(conditions, func(x, y *alertingv1.AlertCondition) int {
		if x.GroupId == y.GroupId {
			return strings.Compare(x.Id, y.Id)
		}
		return strings.Compare(x.GroupId, y.GroupId)
	})
	conditionParts := lo.Map(conditions, func(cond *alertingv1.AlertCondition, _ int) string {
		return cond.Id + cond.GroupId + cond.LastUpdated.String()
	})
	payload += strings.Join(conditionParts, "-")

	endpoints, err := c.Endpoints().List(ctx)
	if err != nil {
		return err
	}
	slices.SortFunc(endpoints, func(x, y *alertingv1.AlertEndpoint) int {
		return strings.Compare(x.Id, y.Id)
	})
	endpointParts := lo.Map(endpoints, func(endp *alertingv1.AlertEndpoint, _ int) string {
		return endp.Id + endp.LastUpdated.String()
	})
	payload += strings.Join(endpointParts, "_")

	hasher := sha256.New()
	if _, err := io.Copy(hasher, strings.NewReader(payload)); err != nil {
		return err
	}
	c.hashes[key] = hex.EncodeToString(hasher.Sum(nil))
	return nil
}

func (c *CompositeAlertingClientSet) listAllConditions(ctx context.Context) ([]*alertingv1.AlertCondition, error) {
groups, err := c.Conditions().ListGroups(ctx)
if err != nil {
Expand Down Expand Up @@ -169,6 +121,36 @@ func (c *CompositeAlertingClientSet) calculateRouters(ctx context.Context, syncO
if syncOpts.Router == nil {
syncOpts.Router = routing.NewDefaultOpniRouting()
}

endpKeys := lo.Keys(endpMap)
slices.Sort(endpKeys)

for _, endpId := range endpKeys {
endp := endpMap[endpId]
err = syncOpts.Router.SetNamespaceSpec(message.TestNamespace, endp.Id, &alertingv1.FullAttachedEndpoints{
InitialDelay: durationpb.New(time.Second),
RepeatInterval: durationpb.New(time.Hour),
ThrottlingDuration: durationpb.New(time.Minute),
Details: &alertingv1.EndpointImplementation{
Title: "Test admin notification",
Body: fmt.Sprintf("Test admin notification : %s", endp.Name),
},
Items: []*alertingv1.FullAttachedEndpoint{
{
EndpointId: endp.Id,
AlertEndpoint: endp,
Details: &alertingv1.EndpointImplementation{
Title: "Test admin notification",
Body: fmt.Sprintf("Test admin notification : %s", endp.Name),
},
},
},
})
if err != nil {
return nil, err
}
}

// create router specs for conditions
for _, cond := range conds {
if cond.Id == "" {
Expand Down Expand Up @@ -212,6 +194,7 @@ func (c *CompositeAlertingClientSet) calculateRouters(ctx context.Context, syncO
panic(err)
}
}

// set expected defaults based on endpoint configuration
defaults := lo.Filter(endps, func(a *alertingv1.AlertEndpoint, _ int) bool {
if len(a.GetProperties()) == 0 {
Expand Down
93 changes: 68 additions & 25 deletions pkg/alerting/storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,12 +509,14 @@ func BuildStorageClientSetSuite(
})

Specify("the hash ring should change its hash when configurations change enough to warrant an update", func() {
id1 := uuid.New().String()
id2 := uuid.New().String()
err := s.Endpoints().Put(ctx, id1, &alertingv1.AlertEndpoint{
id1Condition := uuid.New().String()
id2Condition := uuid.New().String()
id1Endpoint := uuid.New().String()
id2Endpoint := uuid.New().String()
err := s.Endpoints().Put(ctx, id1Endpoint, &alertingv1.AlertEndpoint{
Name: "sample endpoint",
Description: "sample description",
Id: id1,
Id: id1Endpoint,
LastUpdated: timestamppb.Now(),
Endpoint: &alertingv1.AlertEndpoint_Slack{
Slack: &alertingv1.SlackEndpoint{
Expand All @@ -524,10 +526,10 @@ func BuildStorageClientSetSuite(
},
})
Expect(err).To(Succeed())
err = s.Endpoints().Put(ctx, id2, &alertingv1.AlertEndpoint{
err = s.Endpoints().Put(ctx, id2Endpoint, &alertingv1.AlertEndpoint{
Name: "sample endpoint",
Description: "sample description",
Id: id2,
Id: id2Endpoint,
LastUpdated: timestamppb.Now(),
Endpoint: &alertingv1.AlertEndpoint_Slack{
Slack: &alertingv1.SlackEndpoint{
Expand All @@ -540,16 +542,16 @@ func BuildStorageClientSetSuite(

mutateState := []func(){
func() { // new
err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{
err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{
Name: "sample condition",
Description: "sample description",
Id: id1,
Id: id1Condition,
LastUpdated: timestamppb.Now(),
Severity: alertingv1.OpniSeverity_Info,
AttachedEndpoints: &alertingv1.AttachedEndpoints{
Items: []*alertingv1.AttachedEndpoint{
{
EndpointId: id1,
EndpointId: id1Endpoint,
},
},
Details: &alertingv1.EndpointImplementation{
Expand All @@ -561,17 +563,17 @@ func BuildStorageClientSetSuite(
Expect(err).To(Succeed())
},
func() { // new
err := s.Conditions().Group("test-group").Put(ctx, id2, &alertingv1.AlertCondition{
err := s.Conditions().Group("test-group").Put(ctx, id2Condition, &alertingv1.AlertCondition{
Name: "sample condition",
Description: "sample description",
Id: id2,
Id: id2Condition,
GroupId: "test-group",
LastUpdated: timestamppb.Now(),
Severity: alertingv1.OpniSeverity_Info,
AttachedEndpoints: &alertingv1.AttachedEndpoints{
Items: []*alertingv1.AttachedEndpoint{
{
EndpointId: id2,
EndpointId: id2Endpoint,
},
},
Details: &alertingv1.EndpointImplementation{
Expand All @@ -583,16 +585,16 @@ func BuildStorageClientSetSuite(
Expect(err).To(Succeed())
},
func() { // update timestamp
err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{
err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{
Name: "sample condition",
Description: "sample description",
Id: id1,
Id: id1Condition,
LastUpdated: timestamppb.Now(),
Severity: alertingv1.OpniSeverity_Info,
AttachedEndpoints: &alertingv1.AttachedEndpoints{
Items: []*alertingv1.AttachedEndpoint{
{
EndpointId: id2,
EndpointId: id2Endpoint,
},
},
Details: &alertingv1.EndpointImplementation{
Expand All @@ -604,16 +606,16 @@ func BuildStorageClientSetSuite(
Expect(err).To(Succeed())
},
func() {
err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{
err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{
Name: "sample condition",
Description: "sample description",
Id: id2,
Id: id2Condition,
LastUpdated: timestamppb.Now(),
Severity: alertingv1.OpniSeverity_Info,
AttachedEndpoints: &alertingv1.AttachedEndpoints{
Items: []*alertingv1.AttachedEndpoint{
{
EndpointId: id1,
EndpointId: id1Endpoint,
},
},
Details: &alertingv1.EndpointImplementation{
Expand All @@ -625,19 +627,19 @@ func BuildStorageClientSetSuite(
Expect(err).To(Succeed())
},
func() {
err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{
err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{
Name: "sample condition",
Description: "sample description",
Id: id1,
Id: id1Condition,
LastUpdated: timestamppb.Now(),
Severity: alertingv1.OpniSeverity_Info,
AttachedEndpoints: &alertingv1.AttachedEndpoints{
Items: []*alertingv1.AttachedEndpoint{
{
EndpointId: id1,
EndpointId: id1Endpoint,
},
{
EndpointId: id2,
EndpointId: id2Endpoint,
},
},
Details: &alertingv1.EndpointImplementation{
Expand All @@ -649,10 +651,10 @@ func BuildStorageClientSetSuite(
Expect(err).To(Succeed())
},
func() {
err = s.Endpoints().Put(ctx, id2, &alertingv1.AlertEndpoint{
err = s.Endpoints().Put(ctx, id2Endpoint, &alertingv1.AlertEndpoint{
Name: "sample endpoint",
Description: "sample description",
Id: id2,
Id: id2Endpoint,
LastUpdated: timestamppb.Now(),
Endpoint: &alertingv1.AlertEndpoint_Slack{
Slack: &alertingv1.SlackEndpoint{
Expand Down Expand Up @@ -697,7 +699,48 @@ func BuildStorageClientSetSuite(
}
})

Specify("the hash should not change when no meaningul configuration change occurs", func() {
Specify("the hash should change when we add endpoints", func() {
oldHash, err := s.GetHash(ctx, shared.SingleConfigId)
Expect(err).To(Succeed())
err = s.Endpoints().Put(ctx, "endpoint1", &alertingv1.AlertEndpoint{
Name: "sample endpoint",
Description: "sample description",
Id: "endpoint1",
LastUpdated: timestamppb.Now(),
Endpoint: &alertingv1.AlertEndpoint_Slack{
Slack: &alertingv1.SlackEndpoint{
WebhookUrl: "https://slack222.com",
Channel: "#test222",
},
},
})
Expect(err).To(Succeed())

_, err = s.Sync(ctx)
Expect(err).To(Succeed())

newHash, err := s.GetHash(ctx, shared.SingleConfigId)
Expect(err).To(Succeed())
Expect(newHash).NotTo(Equal(oldHash))
})

Specify("the hash should change when we remove endpoints", func() {
oldHash, err := s.GetHash(ctx, shared.SingleConfigId)
Expect(err).To(Succeed())
err = s.Endpoints().Delete(ctx, "endpoint1")
Expect(err).To(Succeed())

_, err = s.Sync(ctx)
Expect(err).To(Succeed())

newHash, err := s.GetHash(ctx, shared.SingleConfigId)
Expect(err).To(Succeed())
Expect(newHash).NotTo(Equal(oldHash))
})

Specify("the hash should not change when no meaningful configuration change occurs", func() {
_, err := s.Sync(ctx)
Expect(err).To(Succeed())
for i := 0; i < 10; i++ {
oldHash, err := s.GetHash(ctx, shared.SingleConfigId)
Expect(err).To(Succeed())
Expand Down
Loading

0 comments on commit 5fdd711

Please sign in to comment.