diff --git a/internal/alerting/syncer/syncer_server.go b/internal/alerting/syncer/syncer_server.go index 2718382763..ff2f8f2064 100644 --- a/internal/alerting/syncer/syncer_server.go +++ b/internal/alerting/syncer/syncer_server.go @@ -153,7 +153,7 @@ func (a *AlertManagerSyncerV1) recvMsgs( RECV: syncReq, err := remoteSyncerClient.Recv() if err == io.EOF { - a.lg.Warnf("remote syncer disconnected, reconnecting, ...") + a.lg.Info("remote syncer disconnected, reconnecting, ...") break } if st, ok := status.FromError(err); ok && err != nil { diff --git a/pkg/alerting/drivers/routing/routing.go b/pkg/alerting/drivers/routing/routing.go index a72d66f93a..9fbd836db6 100644 --- a/pkg/alerting/drivers/routing/routing.go +++ b/pkg/alerting/drivers/routing/routing.go @@ -229,6 +229,8 @@ func (o *OpniRouterV1) HasReceivers(routingId string) []string { } func (o *OpniRouterV1) SetDefaultReceiver(cfg config.WebhookConfig) { + o.mu.Lock() + defer o.mu.Unlock() o.DefaultReceiver = cfg } diff --git a/pkg/alerting/message/message.go b/pkg/alerting/message/message.go index f484207db4..3bcfd9ff3b 100644 --- a/pkg/alerting/message/message.go +++ b/pkg/alerting/message/message.go @@ -41,6 +41,11 @@ const ( NotificationPartitionByDetails = "details" ) +const ( + // Reserved namespace for routing messages to loaded receivers + TestNamespace = "test" +) + // var _ Message = (*config.Alert)(nil) // Identifies important information in message contents diff --git a/pkg/alerting/storage/clientset.go b/pkg/alerting/storage/clientset.go index 4e5d6102d9..802f905f20 100644 --- a/pkg/alerting/storage/clientset.go +++ b/pkg/alerting/storage/clientset.go @@ -1,12 +1,10 @@ package storage import ( - "bytes" "context" "crypto/sha256" "encoding/hex" - "io" - "strings" + "fmt" "time" "slices" @@ -15,10 +13,12 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" "gopkg.in/yaml.v3" "github.com/rancher/opni/pkg/alerting/drivers/backend" "github.com/rancher/opni/pkg/alerting/drivers/routing" + "github.com/rancher/opni/pkg/alerting/message" "github.com/rancher/opni/pkg/alerting/shared" storage_opts "github.com/rancher/opni/pkg/alerting/storage/opts" "github.com/rancher/opni/pkg/alerting/storage/spec" @@ -86,54 +86,6 @@ func (c *CompositeAlertingClientSet) GetHash(ctx context.Context, key string) (s return hex.EncodeToString(hash.Sum(nil)), nil } -func (c *CompositeAlertingClientSet) CalculateHash(ctx context.Context, key string, syncOptions *storage_opts.SyncOptions) error { - aggregate := "" - if syncOptions.DefaultReceiver != nil { - var st bytes.Buffer - err := yaml.NewEncoder(&st).Encode(syncOptions.DefaultReceiver) - if err != nil { - return err - } - aggregate += st.String() - } - if key == shared.SingleConfigId { - conds, err := c.listAllConditions(ctx) - if err != nil { - return err - } - slices.SortFunc(conds, func(a, b *alertingv1.AlertCondition) int { - if a.GroupId != b.GroupId { - return strings.Compare(a.GroupId, b.GroupId) - } - return strings.Compare(a.Id, b.Id) - }) - aggregate += strings.Join( - lo.Map(conds, func(a *alertingv1.AlertCondition, _ int) string { - return a.Id + a.GroupId + a.LastUpdated.String() - }), "-") - endps, err := c.Endpoints().List(ctx) - if err != nil { - return err - } - slices.SortFunc(endps, func(a, b *alertingv1.AlertEndpoint) int { - return strings.Compare(a.Id, b.Id) - }) - aggregate += strings.Join( - lo.Map(endps, func(a *alertingv1.AlertEndpoint, _ int) string { - return a.Id + a.LastUpdated.String() - }), "_") - } else { - panic("not implemented") - } - encode := strings.NewReader(aggregate) - hash := sha256.New() - if _, err := io.Copy(hash, encode); err != nil { - return err - } - c.hashes[key] = hex.EncodeToString(hash.Sum(nil)) - return nil -} - func (c *CompositeAlertingClientSet) listAllConditions(ctx context.Context) ([]*alertingv1.AlertCondition, error) { groups, err := c.Conditions().ListGroups(ctx) if err != nil { @@ -169,6 +121,36 @@ func (c *CompositeAlertingClientSet) calculateRouters(ctx context.Context, syncO if syncOpts.Router == nil { syncOpts.Router = routing.NewDefaultOpniRouting() } + + endpKeys := lo.Keys(endpMap) + slices.Sort(endpKeys) + + for _, endpId := range endpKeys { + endp := endpMap[endpId] + err = syncOpts.Router.SetNamespaceSpec(message.TestNamespace, endp.Id, &alertingv1.FullAttachedEndpoints{ + InitialDelay: durationpb.New(time.Second), + RepeatInterval: durationpb.New(time.Hour), + ThrottlingDuration: durationpb.New(time.Minute), + Details: &alertingv1.EndpointImplementation{ + Title: "Test admin notification", + Body: fmt.Sprintf("Test admin notification : %s", endp.Name), + }, + Items: []*alertingv1.FullAttachedEndpoint{ + { + EndpointId: endp.Id, + AlertEndpoint: endp, + Details: &alertingv1.EndpointImplementation{ + Title: "Test admin notification", + Body: fmt.Sprintf("Test admin notification : %s", endp.Name), + }, + }, + }, + }) + if err != nil { + return nil, err + } + } + // create router specs for conditions for _, cond := range conds { if cond.Id == "" { @@ -212,6 +194,7 @@ func (c *CompositeAlertingClientSet) calculateRouters(ctx context.Context, syncO panic(err) } } + // set expected defaults based on endpoint configuration defaults := lo.Filter(endps, func(a *alertingv1.AlertEndpoint, _ int) bool { if len(a.GetProperties()) == 0 { diff --git a/pkg/alerting/storage/storage_test.go b/pkg/alerting/storage/storage_test.go index e14aafc8bb..ce41810397 100644 --- a/pkg/alerting/storage/storage_test.go +++ b/pkg/alerting/storage/storage_test.go @@ -509,12 +509,14 @@ func BuildStorageClientSetSuite( }) Specify("the hash ring should change its hash when configurations change enough to warrant an update", func() { - id1 := uuid.New().String() - id2 := uuid.New().String() - err := s.Endpoints().Put(ctx, id1, &alertingv1.AlertEndpoint{ + id1Condition := uuid.New().String() + id2Condition := uuid.New().String() + id1Endpoint := uuid.New().String() + id2Endpoint := uuid.New().String() + err := s.Endpoints().Put(ctx, id1Endpoint, &alertingv1.AlertEndpoint{ Name: "sample endpoint", Description: "sample description", - Id: id1, + Id: id1Endpoint, LastUpdated: timestamppb.Now(), Endpoint: &alertingv1.AlertEndpoint_Slack{ Slack: &alertingv1.SlackEndpoint{ @@ -524,10 +526,10 @@ func BuildStorageClientSetSuite( }, }) Expect(err).To(Succeed()) - err = s.Endpoints().Put(ctx, id2, &alertingv1.AlertEndpoint{ + err = s.Endpoints().Put(ctx, id2Endpoint, &alertingv1.AlertEndpoint{ Name: "sample endpoint", Description: "sample description", - Id: id2, + Id: id2Endpoint, LastUpdated: timestamppb.Now(), Endpoint: &alertingv1.AlertEndpoint_Slack{ Slack: &alertingv1.SlackEndpoint{ @@ -540,16 +542,16 @@ func BuildStorageClientSetSuite( mutateState := []func(){ func() { // new - err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{ + err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{ Name: "sample condition", Description: "sample description", - Id: id1, + Id: id1Condition, LastUpdated: timestamppb.Now(), Severity: alertingv1.OpniSeverity_Info, AttachedEndpoints: &alertingv1.AttachedEndpoints{ Items: []*alertingv1.AttachedEndpoint{ { - EndpointId: id1, + EndpointId: id1Endpoint, }, }, Details: &alertingv1.EndpointImplementation{ @@ -561,17 +563,17 @@ func BuildStorageClientSetSuite( Expect(err).To(Succeed()) }, func() { // new - err := s.Conditions().Group("test-group").Put(ctx, id2, &alertingv1.AlertCondition{ + err := s.Conditions().Group("test-group").Put(ctx, id2Condition, &alertingv1.AlertCondition{ Name: "sample condition", Description: "sample description", - Id: id2, + Id: id2Condition, GroupId: "test-group", LastUpdated: timestamppb.Now(), Severity: alertingv1.OpniSeverity_Info, AttachedEndpoints: &alertingv1.AttachedEndpoints{ Items: []*alertingv1.AttachedEndpoint{ { - EndpointId: id2, + EndpointId: id2Endpoint, }, }, Details: &alertingv1.EndpointImplementation{ @@ -583,16 +585,16 @@ func BuildStorageClientSetSuite( Expect(err).To(Succeed()) }, func() { // update timestamp - err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{ + err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{ Name: "sample condition", Description: "sample description", - Id: id1, + Id: id1Condition, LastUpdated: timestamppb.Now(), Severity: alertingv1.OpniSeverity_Info, AttachedEndpoints: &alertingv1.AttachedEndpoints{ Items: []*alertingv1.AttachedEndpoint{ { - EndpointId: id2, + EndpointId: id2Endpoint, }, }, Details: &alertingv1.EndpointImplementation{ @@ -604,16 +606,16 @@ func BuildStorageClientSetSuite( Expect(err).To(Succeed()) }, func() { - err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{ + err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{ Name: "sample condition", Description: "sample description", - Id: id2, + Id: id2Condition, LastUpdated: timestamppb.Now(), Severity: alertingv1.OpniSeverity_Info, AttachedEndpoints: &alertingv1.AttachedEndpoints{ Items: []*alertingv1.AttachedEndpoint{ { - EndpointId: id1, + EndpointId: id1Endpoint, }, }, Details: &alertingv1.EndpointImplementation{ @@ -625,19 +627,19 @@ func BuildStorageClientSetSuite( Expect(err).To(Succeed()) }, func() { - err := s.Conditions().Group("").Put(ctx, id1, &alertingv1.AlertCondition{ + err := s.Conditions().Group("").Put(ctx, id1Condition, &alertingv1.AlertCondition{ Name: "sample condition", Description: "sample description", - Id: id1, + Id: id1Condition, LastUpdated: timestamppb.Now(), Severity: alertingv1.OpniSeverity_Info, AttachedEndpoints: &alertingv1.AttachedEndpoints{ Items: []*alertingv1.AttachedEndpoint{ { - EndpointId: id1, + EndpointId: id1Endpoint, }, { - EndpointId: id2, + EndpointId: id2Endpoint, }, }, Details: &alertingv1.EndpointImplementation{ @@ -649,10 +651,10 @@ func BuildStorageClientSetSuite( Expect(err).To(Succeed()) }, func() { - err = s.Endpoints().Put(ctx, id2, &alertingv1.AlertEndpoint{ + err = s.Endpoints().Put(ctx, id2Endpoint, &alertingv1.AlertEndpoint{ Name: "sample endpoint", Description: "sample description", - Id: id2, + Id: id2Endpoint, LastUpdated: timestamppb.Now(), Endpoint: &alertingv1.AlertEndpoint_Slack{ Slack: &alertingv1.SlackEndpoint{ @@ -697,7 +699,48 @@ func BuildStorageClientSetSuite( } }) - Specify("the hash should not change when no meaningul configuration change occurs", func() { + Specify("the hash should change when we add endpoints", func() { + oldHash, err := s.GetHash(ctx, shared.SingleConfigId) + Expect(err).To(Succeed()) + err = s.Endpoints().Put(ctx, "endpoint1", &alertingv1.AlertEndpoint{ + Name: "sample endpoint", + Description: "sample description", + Id: "endpoint1", + LastUpdated: timestamppb.Now(), + Endpoint: &alertingv1.AlertEndpoint_Slack{ + Slack: &alertingv1.SlackEndpoint{ + WebhookUrl: "https://slack222.com", + Channel: "#test222", + }, + }, + }) + Expect(err).To(Succeed()) + + _, err = s.Sync(ctx) + Expect(err).To(Succeed()) + + newHash, err := s.GetHash(ctx, shared.SingleConfigId) + Expect(err).To(Succeed()) + Expect(newHash).NotTo(Equal(oldHash)) + }) + + Specify("the hash should change when we remove endpoints", func() { + oldHash, err := s.GetHash(ctx, shared.SingleConfigId) + Expect(err).To(Succeed()) + err = s.Endpoints().Delete(ctx, "endpoint1") + Expect(err).To(Succeed()) + + _, err = s.Sync(ctx) + Expect(err).To(Succeed()) + + newHash, err := s.GetHash(ctx, shared.SingleConfigId) + Expect(err).To(Succeed()) + Expect(newHash).NotTo(Equal(oldHash)) + }) + + Specify("the hash should not change when no meaningful configuration change occurs", func() { + _, err := s.Sync(ctx) + Expect(err).To(Succeed()) for i := 0; i < 10; i++ { oldHash, err := s.GetHash(ctx, shared.SingleConfigId) Expect(err).To(Succeed()) diff --git a/pkg/apis/alerting/v1/alerting.endpoint.pb.go b/pkg/apis/alerting/v1/alerting.endpoint.pb.go index 432e737029..89612cd706 100644 --- a/pkg/apis/alerting/v1/alerting.endpoint.pb.go +++ b/pkg/apis/alerting/v1/alerting.endpoint.pb.go @@ -2241,7 +2241,7 @@ var file_github_com_rancher_opni_pkg_apis_alerting_v1_alerting_endpoint_proto_ra 0x6f, 0x69, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x22, 0x30, 0x0a, 0x0d, 0x54, 0x6f, 0x67, 0x67, 0x6c, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1f, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x66, - 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x02, 0x69, 0x64, 0x32, 0xe6, 0x05, 0x0a, 0x0e, 0x41, + 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x52, 0x02, 0x69, 0x64, 0x32, 0xf6, 0x04, 0x0a, 0x0e, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x73, 0x12, 0x56, 0x0a, 0x13, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x17, 0x2e, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x2e, @@ -2281,14 +2281,7 @@ var file_github_com_rancher_opni_pkg_apis_alerting_v1_alerting_endpoint_proto_ra 0x61, 0x6c, 0x65, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x12, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0c, 0x3a, 0x01, 0x2a, 0x22, 0x07, 0x2f, 0x64, 0x65, 0x6c, - 0x65, 0x74, 0x65, 0x12, 0x6e, 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x41, 0x6c, 0x65, 0x72, 0x74, - 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x22, 0x2e, 0x61, 0x6c, 0x65, 0x72, 0x74, - 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x45, 0x6e, 0x64, - 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x61, - 0x6c, 0x65, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x65, 0x73, 0x74, 0x41, 0x6c, 0x65, 0x72, - 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x10, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0a, 0x3a, 0x01, 0x2a, 0x22, 0x05, 0x2f, 0x74, - 0x65, 0x73, 0x74, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x65, 0x74, 0x65, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x61, 0x6e, 0x63, 0x68, 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x6e, 0x69, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x73, 0x2f, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, @@ -2395,16 +2388,14 @@ var file_github_com_rancher_opni_pkg_apis_alerting_v1_alerting_endpoint_proto_de 13, // 45: alerting.AlertEndpoints.ListAlertEndpoints:input_type -> alerting.ListAlertEndpointsRequest 14, // 46: alerting.AlertEndpoints.UpdateAlertEndpoint:input_type -> alerting.UpdateAlertEndpointRequest 15, // 47: alerting.AlertEndpoints.DeleteAlertEndpoint:input_type -> alerting.DeleteAlertEndpointRequest - 16, // 48: alerting.AlertEndpoints.TestAlertEndpoint:input_type -> alerting.TestAlertEndpointRequest - 33, // 49: alerting.AlertEndpoints.CreateAlertEndpoint:output_type -> core.Reference - 0, // 50: alerting.AlertEndpoints.GetAlertEndpoint:output_type -> alerting.AlertEndpoint - 35, // 51: alerting.AlertEndpoints.ToggleNotifications:output_type -> google.protobuf.Empty - 11, // 52: alerting.AlertEndpoints.ListAlertEndpoints:output_type -> alerting.AlertEndpointList - 36, // 53: alerting.AlertEndpoints.UpdateAlertEndpoint:output_type -> alerting.ConditionReferenceList - 36, // 54: alerting.AlertEndpoints.DeleteAlertEndpoint:output_type -> alerting.ConditionReferenceList - 17, // 55: alerting.AlertEndpoints.TestAlertEndpoint:output_type -> alerting.TestAlertEndpointResponse - 49, // [49:56] is the sub-list for method output_type - 42, // [42:49] is the sub-list for method input_type + 33, // 48: alerting.AlertEndpoints.CreateAlertEndpoint:output_type -> core.Reference + 0, // 49: alerting.AlertEndpoints.GetAlertEndpoint:output_type -> alerting.AlertEndpoint + 35, // 50: alerting.AlertEndpoints.ToggleNotifications:output_type -> google.protobuf.Empty + 11, // 51: alerting.AlertEndpoints.ListAlertEndpoints:output_type -> alerting.AlertEndpointList + 36, // 52: alerting.AlertEndpoints.UpdateAlertEndpoint:output_type -> alerting.ConditionReferenceList + 36, // 53: alerting.AlertEndpoints.DeleteAlertEndpoint:output_type -> alerting.ConditionReferenceList + 48, // [48:54] is the sub-list for method output_type + 42, // [42:48] is the sub-list for method input_type 42, // [42:42] is the sub-list for extension type_name 42, // [42:42] is the sub-list for extension extendee 0, // [0:42] is the sub-list for field type_name diff --git a/pkg/apis/alerting/v1/alerting.endpoint.proto b/pkg/apis/alerting/v1/alerting.endpoint.proto index 27ff401dbc..d992a5362b 100644 --- a/pkg/apis/alerting/v1/alerting.endpoint.proto +++ b/pkg/apis/alerting/v1/alerting.endpoint.proto @@ -74,14 +74,6 @@ service AlertEndpoints{ body : "*" }; } - - rpc TestAlertEndpoint(TestAlertEndpointRequest) - returns (TestAlertEndpointResponse) { - option (google.api.http) = { - post : "/test" - body : "*" - }; - } } diff --git a/pkg/apis/alerting/v1/alerting.endpoint_grpc.pb.go b/pkg/apis/alerting/v1/alerting.endpoint_grpc.pb.go index 045c6762f6..1724a2ce2b 100644 --- a/pkg/apis/alerting/v1/alerting.endpoint_grpc.pb.go +++ b/pkg/apis/alerting/v1/alerting.endpoint_grpc.pb.go @@ -27,7 +27,6 @@ const ( AlertEndpoints_ListAlertEndpoints_FullMethodName = "/alerting.AlertEndpoints/ListAlertEndpoints" AlertEndpoints_UpdateAlertEndpoint_FullMethodName = "/alerting.AlertEndpoints/UpdateAlertEndpoint" AlertEndpoints_DeleteAlertEndpoint_FullMethodName = "/alerting.AlertEndpoints/DeleteAlertEndpoint" - AlertEndpoints_TestAlertEndpoint_FullMethodName = "/alerting.AlertEndpoints/TestAlertEndpoint" ) // AlertEndpointsClient is the client API for AlertEndpoints service. @@ -51,7 +50,6 @@ type AlertEndpointsClient interface { // deletes and applies the consequences of those changes // to everything without warning DeleteAlertEndpoint(ctx context.Context, in *DeleteAlertEndpointRequest, opts ...grpc.CallOption) (*ConditionReferenceList, error) - TestAlertEndpoint(ctx context.Context, in *TestAlertEndpointRequest, opts ...grpc.CallOption) (*TestAlertEndpointResponse, error) } type alertEndpointsClient struct { @@ -116,15 +114,6 @@ func (c *alertEndpointsClient) DeleteAlertEndpoint(ctx context.Context, in *Dele return out, nil } -func (c *alertEndpointsClient) TestAlertEndpoint(ctx context.Context, in *TestAlertEndpointRequest, opts ...grpc.CallOption) (*TestAlertEndpointResponse, error) { - out := new(TestAlertEndpointResponse) - err := c.cc.Invoke(ctx, AlertEndpoints_TestAlertEndpoint_FullMethodName, in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - // AlertEndpointsServer is the server API for AlertEndpoints service. // All implementations must embed UnimplementedAlertEndpointsServer // for forward compatibility @@ -146,7 +135,6 @@ type AlertEndpointsServer interface { // deletes and applies the consequences of those changes // to everything without warning DeleteAlertEndpoint(context.Context, *DeleteAlertEndpointRequest) (*ConditionReferenceList, error) - TestAlertEndpoint(context.Context, *TestAlertEndpointRequest) (*TestAlertEndpointResponse, error) mustEmbedUnimplementedAlertEndpointsServer() } @@ -172,9 +160,6 @@ func (UnimplementedAlertEndpointsServer) UpdateAlertEndpoint(context.Context, *U func (UnimplementedAlertEndpointsServer) DeleteAlertEndpoint(context.Context, *DeleteAlertEndpointRequest) (*ConditionReferenceList, error) { return nil, status.Errorf(codes.Unimplemented, "method DeleteAlertEndpoint not implemented") } -func (UnimplementedAlertEndpointsServer) TestAlertEndpoint(context.Context, *TestAlertEndpointRequest) (*TestAlertEndpointResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method TestAlertEndpoint not implemented") -} func (UnimplementedAlertEndpointsServer) mustEmbedUnimplementedAlertEndpointsServer() {} // UnsafeAlertEndpointsServer may be embedded to opt out of forward compatibility for this service. @@ -296,24 +281,6 @@ func _AlertEndpoints_DeleteAlertEndpoint_Handler(srv interface{}, ctx context.Co return interceptor(ctx, in, info, handler) } -func _AlertEndpoints_TestAlertEndpoint_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(TestAlertEndpointRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(AlertEndpointsServer).TestAlertEndpoint(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: AlertEndpoints_TestAlertEndpoint_FullMethodName, - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(AlertEndpointsServer).TestAlertEndpoint(ctx, req.(*TestAlertEndpointRequest)) - } - return interceptor(ctx, in, info, handler) -} - // AlertEndpoints_ServiceDesc is the grpc.ServiceDesc for AlertEndpoints service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -345,10 +312,6 @@ var AlertEndpoints_ServiceDesc = grpc.ServiceDesc{ MethodName: "DeleteAlertEndpoint", Handler: _AlertEndpoints_DeleteAlertEndpoint_Handler, }, - { - MethodName: "TestAlertEndpoint", - Handler: _AlertEndpoints_TestAlertEndpoint_Handler, - }, }, Streams: []grpc.StreamDesc{}, Metadata: "github.com/rancher/opni/pkg/apis/alerting/v1/alerting.endpoint.proto", diff --git a/pkg/apis/alerting/v1/alerting.notification.pb.go b/pkg/apis/alerting/v1/alerting.notification.pb.go index f68e3990b0..6c3bc43c3f 100644 --- a/pkg/apis/alerting/v1/alerting.notification.pb.go +++ b/pkg/apis/alerting/v1/alerting.notification.pb.go @@ -816,8 +816,13 @@ var file_github_com_rancher_opni_pkg_apis_alerting_v1_alerting_notification_prot 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x43, 0x6f, 0x6e, 0x64, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x66, 0x65, 0x72, 0x65, 0x6e, 0x63, 0x65, 0x4c, 0x69, 0x73, 0x74, 0x52, 0x05, 0x76, 0x61, - 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xee, 0x04, 0x0a, 0x12, 0x41, 0x6c, 0x65, 0x72, - 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x65, + 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x32, 0xbe, 0x05, 0x0a, 0x12, 0x41, 0x6c, 0x65, 0x72, + 0x74, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4e, + 0x0a, 0x11, 0x54, 0x65, 0x73, 0x74, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x45, 0x6e, 0x64, 0x70, 0x6f, + 0x69, 0x6e, 0x74, 0x12, 0x0f, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x52, 0x65, 0x66, 0x65, 0x72, + 0x65, 0x6e, 0x63, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x10, 0x82, 0xd3, + 0xe4, 0x93, 0x02, 0x0a, 0x3a, 0x01, 0x2a, 0x22, 0x05, 0x2f, 0x74, 0x65, 0x73, 0x74, 0x12, 0x65, 0x0a, 0x0d, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x73, 0x12, 0x1e, 0x2e, 0x61, 0x6c, 0x65, 0x72, 0x74, 0x69, 0x6e, 0x67, 0x2e, 0x54, 0x72, 0x69, 0x67, 0x67, 0x65, 0x72, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, @@ -922,20 +927,22 @@ var file_github_com_rancher_opni_pkg_apis_alerting_v1_alerting_notification_prot 16, // 17: alerting.ResolveAlertsRequest.labels:type_name -> alerting.ResolveAlertsRequest.LabelsEntry 17, // 18: alerting.ListRoutingRelationshipsResponse.routingRelationships:type_name -> alerting.ListRoutingRelationshipsResponse.RoutingRelationshipsEntry 23, // 19: alerting.ListRoutingRelationshipsResponse.RoutingRelationshipsEntry.value:type_name -> alerting.ConditionReferenceList - 5, // 20: alerting.AlertNotifications.TriggerAlerts:input_type -> alerting.TriggerAlertsRequest - 7, // 21: alerting.AlertNotifications.ResolveAlerts:input_type -> alerting.ResolveAlertsRequest - 3, // 22: alerting.AlertNotifications.PushNotification:input_type -> alerting.Notification - 1, // 23: alerting.AlertNotifications.ListNotifications:input_type -> alerting.ListNotificationRequest - 0, // 24: alerting.AlertNotifications.ListAlarmMessages:input_type -> alerting.ListAlarmMessageRequest - 24, // 25: alerting.AlertNotifications.ListRoutingRelationships:input_type -> google.protobuf.Empty - 6, // 26: alerting.AlertNotifications.TriggerAlerts:output_type -> alerting.TriggerAlertsResponse - 8, // 27: alerting.AlertNotifications.ResolveAlerts:output_type -> alerting.ResolveAlertsResponse - 24, // 28: alerting.AlertNotifications.PushNotification:output_type -> google.protobuf.Empty - 2, // 29: alerting.AlertNotifications.ListNotifications:output_type -> alerting.ListMessageResponse - 2, // 30: alerting.AlertNotifications.ListAlarmMessages:output_type -> alerting.ListMessageResponse - 9, // 31: alerting.AlertNotifications.ListRoutingRelationships:output_type -> alerting.ListRoutingRelationshipsResponse - 26, // [26:32] is the sub-list for method output_type - 20, // [20:26] is the sub-list for method input_type + 22, // 20: alerting.AlertNotifications.TestAlertEndpoint:input_type -> core.Reference + 5, // 21: alerting.AlertNotifications.TriggerAlerts:input_type -> alerting.TriggerAlertsRequest + 7, // 22: alerting.AlertNotifications.ResolveAlerts:input_type -> alerting.ResolveAlertsRequest + 3, // 23: alerting.AlertNotifications.PushNotification:input_type -> alerting.Notification + 1, // 24: alerting.AlertNotifications.ListNotifications:input_type -> alerting.ListNotificationRequest + 0, // 25: alerting.AlertNotifications.ListAlarmMessages:input_type -> alerting.ListAlarmMessageRequest + 24, // 26: alerting.AlertNotifications.ListRoutingRelationships:input_type -> google.protobuf.Empty + 24, // 27: alerting.AlertNotifications.TestAlertEndpoint:output_type -> google.protobuf.Empty + 6, // 28: alerting.AlertNotifications.TriggerAlerts:output_type -> alerting.TriggerAlertsResponse + 8, // 29: alerting.AlertNotifications.ResolveAlerts:output_type -> alerting.ResolveAlertsResponse + 24, // 30: alerting.AlertNotifications.PushNotification:output_type -> google.protobuf.Empty + 2, // 31: alerting.AlertNotifications.ListNotifications:output_type -> alerting.ListMessageResponse + 2, // 32: alerting.AlertNotifications.ListAlarmMessages:output_type -> alerting.ListMessageResponse + 9, // 33: alerting.AlertNotifications.ListRoutingRelationships:output_type -> alerting.ListRoutingRelationshipsResponse + 27, // [27:34] is the sub-list for method output_type + 20, // [20:27] is the sub-list for method input_type 20, // [20:20] is the sub-list for extension type_name 20, // [20:20] is the sub-list for extension extendee 0, // [0:20] is the sub-list for field type_name diff --git a/pkg/apis/alerting/v1/alerting.notification.proto b/pkg/apis/alerting/v1/alerting.notification.proto index bce9c7d7d7..dc5cf6b6f1 100644 --- a/pkg/apis/alerting/v1/alerting.notification.proto +++ b/pkg/apis/alerting/v1/alerting.notification.proto @@ -18,6 +18,13 @@ package alerting; // Opni-Alerting internal use service AlertNotifications { + rpc TestAlertEndpoint(core.Reference) returns (google.protobuf.Empty) { + option (google.api.http) = { + post : "/test" + body : "*" + }; + } + rpc TriggerAlerts(TriggerAlertsRequest) returns (TriggerAlertsResponse) { option (google.api.http) = { post : "/trigger" diff --git a/pkg/apis/alerting/v1/alerting.notification_grpc.pb.go b/pkg/apis/alerting/v1/alerting.notification_grpc.pb.go index c2d915eb31..dee39f891e 100644 --- a/pkg/apis/alerting/v1/alerting.notification_grpc.pb.go +++ b/pkg/apis/alerting/v1/alerting.notification_grpc.pb.go @@ -8,6 +8,7 @@ package v1 import ( context "context" + v1 "github.com/rancher/opni/pkg/apis/core/v1" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" @@ -20,6 +21,7 @@ import ( const _ = grpc.SupportPackageIsVersion7 const ( + AlertNotifications_TestAlertEndpoint_FullMethodName = "/alerting.AlertNotifications/TestAlertEndpoint" AlertNotifications_TriggerAlerts_FullMethodName = "/alerting.AlertNotifications/TriggerAlerts" AlertNotifications_ResolveAlerts_FullMethodName = "/alerting.AlertNotifications/ResolveAlerts" AlertNotifications_PushNotification_FullMethodName = "/alerting.AlertNotifications/PushNotification" @@ -32,6 +34,7 @@ const ( // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type AlertNotificationsClient interface { + TestAlertEndpoint(ctx context.Context, in *v1.Reference, opts ...grpc.CallOption) (*emptypb.Empty, error) TriggerAlerts(ctx context.Context, in *TriggerAlertsRequest, opts ...grpc.CallOption) (*TriggerAlertsResponse, error) ResolveAlerts(ctx context.Context, in *ResolveAlertsRequest, opts ...grpc.CallOption) (*ResolveAlertsResponse, error) PushNotification(ctx context.Context, in *Notification, opts ...grpc.CallOption) (*emptypb.Empty, error) @@ -54,6 +57,15 @@ func NewAlertNotificationsClient(cc grpc.ClientConnInterface) AlertNotifications return &alertNotificationsClient{cc} } +func (c *alertNotificationsClient) TestAlertEndpoint(ctx context.Context, in *v1.Reference, opts ...grpc.CallOption) (*emptypb.Empty, error) { + out := new(emptypb.Empty) + err := c.cc.Invoke(ctx, AlertNotifications_TestAlertEndpoint_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *alertNotificationsClient) TriggerAlerts(ctx context.Context, in *TriggerAlertsRequest, opts ...grpc.CallOption) (*TriggerAlertsResponse, error) { out := new(TriggerAlertsResponse) err := c.cc.Invoke(ctx, AlertNotifications_TriggerAlerts_FullMethodName, in, out, opts...) @@ -112,6 +124,7 @@ func (c *alertNotificationsClient) ListRoutingRelationships(ctx context.Context, // All implementations must embed UnimplementedAlertNotificationsServer // for forward compatibility type AlertNotificationsServer interface { + TestAlertEndpoint(context.Context, *v1.Reference) (*emptypb.Empty, error) TriggerAlerts(context.Context, *TriggerAlertsRequest) (*TriggerAlertsResponse, error) ResolveAlerts(context.Context, *ResolveAlertsRequest) (*ResolveAlertsResponse, error) PushNotification(context.Context, *Notification) (*emptypb.Empty, error) @@ -131,6 +144,9 @@ type AlertNotificationsServer interface { type UnimplementedAlertNotificationsServer struct { } +func (UnimplementedAlertNotificationsServer) TestAlertEndpoint(context.Context, *v1.Reference) (*emptypb.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method TestAlertEndpoint not implemented") +} func (UnimplementedAlertNotificationsServer) TriggerAlerts(context.Context, *TriggerAlertsRequest) (*TriggerAlertsResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method TriggerAlerts not implemented") } @@ -162,6 +178,24 @@ func RegisterAlertNotificationsServer(s grpc.ServiceRegistrar, srv AlertNotifica s.RegisterService(&AlertNotifications_ServiceDesc, srv) } +func _AlertNotifications_TestAlertEndpoint_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(v1.Reference) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AlertNotificationsServer).TestAlertEndpoint(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: AlertNotifications_TestAlertEndpoint_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AlertNotificationsServer).TestAlertEndpoint(ctx, req.(*v1.Reference)) + } + return interceptor(ctx, in, info, handler) +} + func _AlertNotifications_TriggerAlerts_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(TriggerAlertsRequest) if err := dec(in); err != nil { @@ -277,6 +311,10 @@ var AlertNotifications_ServiceDesc = grpc.ServiceDesc{ ServiceName: "alerting.AlertNotifications", HandlerType: (*AlertNotificationsServer)(nil), Methods: []grpc.MethodDesc{ + { + MethodName: "TestAlertEndpoint", + Handler: _AlertNotifications_TestAlertEndpoint_Handler, + }, { MethodName: "TriggerAlerts", Handler: _AlertNotifications_TriggerAlerts_Handler, diff --git a/pkg/apis/alerting/v1/validate.go b/pkg/apis/alerting/v1/validate.go index 8db809312d..ea97e186d6 100644 --- a/pkg/apis/alerting/v1/validate.go +++ b/pkg/apis/alerting/v1/validate.go @@ -599,8 +599,11 @@ func (l *ListAlarmMessageRequest) Sanitize() { } func (l *ListAlarmMessageRequest) Validate() error { + if l.ConditionId == nil { + return validation.Errorf("%w :%s", validation.ErrMissingRequiredField, "conditionId") + } if l.ConditionId.Id == "" { - return validation.Error("field conditionId must be set") + return validation.Errorf("%w : %s", validation.ErrMissingRequiredField, "conditionId.id") } if l.Start.AsTime().After(l.End.AsTime()) { return validation.Error("start time must be before end time") diff --git a/pkg/test/alerting/alert_router_gen.go b/pkg/test/alerting/alert_router_gen.go index 5512e69a28..34f0647504 100644 --- a/pkg/test/alerting/alert_router_gen.go +++ b/pkg/test/alerting/alert_router_gen.go @@ -122,7 +122,7 @@ func NewWebhookMemoryServer(e *test.Environment, webHookRoute string) *MockInteg res.WriteBuffer(&msg) }) webhookServer := &http.Server{ - Addr: fmt.Sprintf("127.0.0.1:%d", port), + Addr: fmt.Sprintf("localhost:%d", port), Handler: mux, ReadTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second, diff --git a/plugins/alerting/pkg/alerting/admin.go b/plugins/alerting/pkg/alerting/admin.go index dcaac9ed5a..8f5dae5c06 100644 --- a/plugins/alerting/pkg/alerting/admin.go +++ b/plugins/alerting/pkg/alerting/admin.go @@ -17,10 +17,12 @@ import ( "github.com/samber/lo" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.uber.org/zap" "gopkg.in/yaml.v2" alertingSync "github.com/rancher/opni/pkg/alerting/server/sync" "github.com/rancher/opni/plugins/alerting/apis/alertops" + "github.com/rancher/opni/plugins/alerting/pkg/alerting/drivers" "github.com/rancher/opni/plugins/alerting/pkg/alerting/metrics" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -41,6 +43,8 @@ type RemoteInfo struct { } type SyncController struct { + lg *zap.SugaredLogger + hashMu sync.Mutex syncMu sync.RWMutex remoteMu sync.RWMutex @@ -92,7 +96,8 @@ func (s *SyncController) PushSyncReq(payload *syncPayload) { for id, syncer := range s.syncPushers { id := id syncer := syncer - syncer <- &alertops.SyncRequest{ + select { + case syncer <- &alertops.SyncRequest{ LifecycleId: id, SyncId: payload.syncId, Items: []*alertingv1.PutConfigRequest{ @@ -101,13 +106,17 @@ func (s *SyncController) PushSyncReq(payload *syncPayload) { Config: payload.data, }, }, + }: + default: + s.lg.With("syncer-id", id).Error("failed to push sync request : buffer already full") } } } func (s *SyncController) PushOne(lifecycleId string, payload *syncPayload) { if _, ok := s.syncPushers[lifecycleId]; ok { - s.syncPushers[lifecycleId] <- &alertops.SyncRequest{ + select { + case s.syncPushers[lifecycleId] <- &alertops.SyncRequest{ LifecycleId: lifecycleId, SyncId: payload.syncId, Items: []*alertingv1.PutConfigRequest{ @@ -116,12 +125,16 @@ func (s *SyncController) PushOne(lifecycleId string, payload *syncPayload) { Config: payload.data, }, }, + }: + default: + s.lg.With("syncer-id", lifecycleId).Error("failed to push sync request : buffer already full") } } } -func NewSyncController() SyncController { +func NewSyncController(lg *zap.SugaredLogger) SyncController { return SyncController{ + lg: lg, syncPushers: map[string]chan *alertops.SyncRequest{}, remoteInfo: map[string]RemoteInfo{}, syncMu: sync.RWMutex{}, @@ -218,7 +231,9 @@ func (p *Plugin) Info(ctx context.Context, _ *emptypb.Empty) (*alertops.Componen if err != nil { return nil, status.Error(codes.Unavailable, err.Error()) } + p.syncController.hashMu.Lock() hash, err := cl.GetHash(ctx, shared.SingleConfigId) + p.syncController.hashMu.Unlock() if err != nil { return nil, status.Error(codes.NotFound, fmt.Sprintf("could not find or calculate hash for %s", shared.SingleConfigId)) } @@ -248,7 +263,11 @@ func (p *Plugin) constructManualSync() (*syncPayload, error) { if err != nil { return nil, status.Error(codes.Unavailable, fmt.Sprintf("failed to get storage client set: %s", err)) } - return p.constructPartialSyncRequest(p.ctx, storageClientSet, storageClientSet.Routers()) + driver, err := p.clusterDriver.GetContext(ctxTimeout) + if err != nil { + return nil, status.Error(codes.Unavailable, fmt.Sprintf("failed to get cluster driver: %s", err)) + } + return p.constructPartialSyncRequest(p.ctx, driver, storageClientSet, storageClientSet.Routers()) } func (p *Plugin) SyncConfig(server alertops.ConfigReconciler_SyncConfigServer) error { @@ -281,7 +300,7 @@ func (p *Plugin) SyncConfig(server alertops.ConfigReconciler_SyncConfigServer) e for { info, err := server.Recv() if err == io.EOF { - lg.Debug("remote syncer closed connection") + lg.Warn("remote syncer closed connection") p.syncController.RemoveSyncPusher(assignedLifecycleUuid) return nil } @@ -300,10 +319,10 @@ func (p *Plugin) SyncConfig(server alertops.ConfigReconciler_SyncConfigServer) e for { select { case <-p.ctx.Done(): - lg.Debug("exiting syncer loop, alerting plugin shutting down") + lg.Info("exiting syncer loop, alerting plugin shutting down") return nil case <-server.Context().Done(): - lg.Debug("exiting syncer loop, remote syncer shutting down") + lg.Info("exiting syncer loop, remote syncer shutting down") return nil case err := <-connErr: return err @@ -326,6 +345,7 @@ type syncPayload struct { func (p *Plugin) constructPartialSyncRequest( ctx context.Context, + driver drivers.ClusterDriver, hashRing spec.HashRing, routers spec.RouterStorage, ) (*syncPayload, error) { @@ -344,6 +364,9 @@ func (p *Plugin) constructPartialSyncRequest( lg.Errorf("failed to get router %s from storage: %s", key, err) return nil, err } + if recv := driver.GetDefaultReceiver(); recv != nil { + router.SetDefaultReceiver(*recv) + } config, err := router.BuildConfig() if err != nil { lg.Errorf("failed to build config for router %s: %s", key, err) @@ -378,6 +401,11 @@ func (p *Plugin) doConfigSync(ctx context.Context, syncInfo alertingSync.SyncInf lg.Warn("failed to acquire alerting storage clientset, skipping sync...") return err } + driver, err := p.clusterDriver.GetContext(ctxTimeout) + if err != nil { + lg.Warn("failed to acquire cluster driver, skipping sync...") + return err + } routerKeys, err := clientSet.Sync(ctx) if err != nil { @@ -386,11 +414,10 @@ func (p *Plugin) doConfigSync(ctx context.Context, syncInfo alertingSync.SyncInf } if len(routerKeys) > 0 { // global configuration has changed and never been applied - payload, err := p.constructPartialSyncRequest(ctx, clientSet, clientSet.Routers()) + payload, err := p.constructPartialSyncRequest(ctx, driver, clientSet, clientSet.Routers()) if err != nil { return err } - lg.With("syncId", payload.syncId).Debug("sync change detected, pushing sync request to remote syncers") p.syncController.PushSyncReq(payload) return nil } @@ -404,12 +431,11 @@ func (p *Plugin) doConfigSync(ctx context.Context, syncInfo alertingSync.SyncInf } } if len(queuedSyncs) > 0 { - payload, err := p.constructPartialSyncRequest(ctx, clientSet, clientSet.Routers()) + payload, err := p.constructPartialSyncRequest(ctx, driver, clientSet, clientSet.Routers()) if err != nil { return err } for _, id := range queuedSyncs { - lg.With("syncId", payload.syncId, "lifecycleId", id).Debug("sync id mismatch, pushing sync request to remote syncers") p.syncController.PushOne(id, payload) } } @@ -430,11 +456,16 @@ func (p *Plugin) doConfigForceSync(ctx context.Context, syncInfo alertingSync.Sy lg.Warn("failed to acquire alerting storage clientset, skipping force sync...") return err } + driver, err := p.clusterDriver.GetContext(ctxTimeout) + if err != nil { + lg.Warn("failed to acquire cluster driver, skipping force sync...") + return err + } if err := clientSet.ForceSync(ctx); err != nil { lg.Errorf("failed to force sync configuration in alerting clientset %s", err) return err } - payload, err := p.constructPartialSyncRequest(p.ctx, clientSet, clientSet.Routers()) + payload, err := p.constructPartialSyncRequest(p.ctx, driver, clientSet, clientSet.Routers()) if err != nil { return err } @@ -498,7 +529,6 @@ func (p *Plugin) runSyncTasks(tasks []alertingSync.SyncTask) (retErr error) { return err } - lg.Info("Running periodic sync for alerting") var eg util.MultiErrGroup for _, task := range tasks { task := task @@ -507,9 +537,8 @@ func (p *Plugin) runSyncTasks(tasks []alertingSync.SyncTask) (retErr error) { }) } eg.Wait() - lg.Infof("finished running periodic sync for alerting, sucessfully ran %d/%d sync tasks", len(tasks)-len(eg.Errors()), len(tasks)) if err := eg.Error(); err != nil { - lg.Error(err) + lg.Errorf(" ran %d/%d tasks successfully %w", len(tasks)-len(eg.Errors()), len(tasks), err) retErr = err } return @@ -543,25 +572,6 @@ func (p *Plugin) runSync() { } } -func (p *Plugin) SendManualSyncRequest( - ctx context.Context, - hashRing spec.HashRing, - routers spec.RouterStorage, -) error { - p.syncController.syncMu.Lock() - defer p.syncController.syncMu.Unlock() - - lg := p.logger.With("method", "sendManualSyncRequest") - payload, err := p.constructPartialSyncRequest(ctx, hashRing, routers) - if err != nil { - lg.Errorf("failed to construct sync request: %s", err) - return err - } - p.syncController.PushSyncReq(payload) - lg.With("sync-id", payload.syncId).Debug("sent manual sync request") - return nil -} - func (p *Plugin) ready() error { for _, comp := range p.Components() { if !comp.Ready() { diff --git a/plugins/alerting/pkg/alerting/alarms/v1/server.go b/plugins/alerting/pkg/alerting/alarms/v1/server.go index a109f05f16..408c20d22f 100644 --- a/plugins/alerting/pkg/alerting/alarms/v1/server.go +++ b/plugins/alerting/pkg/alerting/alarms/v1/server.go @@ -277,7 +277,11 @@ func (a *AlarmServerComponent) ListAlertConditionsWithStatus(ctx context.Context if err != nil { return nil, err } - allConds = append(allConds, conds...) + if req.ItemFilter != nil { + allConds = append(allConds, lo.Filter(conds, req.ItemFilter.FilterFunc())...) + } else { + allConds = append(allConds, conds...) + } } res := &alertingv1.ListStatusResponse{ AlertConditions: make(map[string]*alertingv1.AlertConditionWithStatus), @@ -469,12 +473,16 @@ func (a *AlarmServerComponent) Timeline(ctx context.Context, req *alertingv1.Tim if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - yieldedValues := make(chan lo.Tuple2[string, *alertingv1.ActiveWindows]) + yieldedValues := make(chan lo.Tuple2[*alertingv1.ConditionReference, *alertingv1.ActiveWindows]) go func() { n := int(req.Limit) pool := pond.New(max(25, n/10), n, pond.Strategy(pond.Balanced())) for _, cond := range conditions { cond := cond + ref := &alertingv1.ConditionReference{ + Id: cond.Id, + GroupId: cond.GroupId, + } pool.Submit(func() { if alertingv1.IsInternalCondition(cond) { activeWindows, err := a.incidentStorage.Get().GetActiveWindowsFromIncidentTracker(ctx, cond.Id, start, end) @@ -482,9 +490,14 @@ func (a *AlarmServerComponent) Timeline(ctx context.Context, req *alertingv1.Tim a.logger.Errorf("failed to get active windows from agent incident tracker : %s", err) return } - yieldedValues <- lo.Tuple2[string, *alertingv1.ActiveWindows]{A: cond.Id, B: &alertingv1.ActiveWindows{ - Windows: activeWindows, - }} + for _, w := range activeWindows { + w.Ref = ref + } + yieldedValues <- lo.T2( + ref, + &alertingv1.ActiveWindows{ + Windows: activeWindows, + }) } if alertingv1.IsMetricsCondition(cond) { @@ -509,10 +522,17 @@ func (a *AlarmServerComponent) Timeline(ctx context.Context, req *alertingv1.Tim lg.Errorf("expected to get matrix from prometheus response : %s", err) return } + windows := cortex.ReducePrometheusMatrix(matrix) + for _, w := range windows { + w.Ref = ref + } lg.With("reduce-matrix", cond.Id).Infof("looking to reduce %d potential causes", len(*matrix)) - yieldedValues <- lo.Tuple2[string, *alertingv1.ActiveWindows]{A: cond.Id, B: &alertingv1.ActiveWindows{ - Windows: cortex.ReducePrometheusMatrix(matrix), - }} + yieldedValues <- lo.T2( + ref, + &alertingv1.ActiveWindows{ + Windows: windows, + }, + ) } }) } @@ -528,7 +548,8 @@ func (a *AlarmServerComponent) Timeline(ctx context.Context, req *alertingv1.Tim if !ok { return resp, nil } - resp.Items[v.A] = v.B + id := v.A.Id + resp.Items[id] = v.B } } } diff --git a/plugins/alerting/pkg/alerting/drivers/alerting_manager/cluster_driver.go b/plugins/alerting/pkg/alerting/drivers/alerting_manager/cluster_driver.go index 53f318a7ed..c34d475ed2 100644 --- a/plugins/alerting/pkg/alerting/drivers/alerting_manager/cluster_driver.go +++ b/plugins/alerting/pkg/alerting/drivers/alerting_manager/cluster_driver.go @@ -10,6 +10,7 @@ import ( "github.com/rancher/opni/apis" corev1beta1 "github.com/rancher/opni/apis/core/v1beta1" alertingClient "github.com/rancher/opni/pkg/alerting/client" + "github.com/rancher/opni/pkg/alerting/drivers/config" "github.com/rancher/opni/pkg/alerting/shared" corev1 "github.com/rancher/opni/pkg/apis/core/v1" "github.com/rancher/opni/pkg/logger" @@ -341,12 +342,16 @@ func (a *AlertingClusterManager) ShouldDisableNode(_ *corev1.Reference) error { return nil } +func (a *AlertingClusterManager) GetDefaultReceiver() *config.WebhookConfig { + return nil +} + func listPeers(replicas int) []alertingClient.AlertingPeer { peers := []alertingClient.AlertingPeer{} for i := 0; i < replicas; i++ { peers = append(peers, alertingClient.AlertingPeer{ ApiAddress: fmt.Sprintf("http://%s-%d.%s:9093", shared.AlertmanagerService, i, shared.AlertmanagerService), - EmbeddedAddress: fmt.Sprintf("http://%s-%d.%s:6006", shared.EmitterService, i, shared.EmitterService), + EmbeddedAddress: fmt.Sprintf("http://%s-%d.%s:3000", shared.AlertmanagerService, i, shared.AlertmanagerService), }) } return peers diff --git a/plugins/alerting/pkg/alerting/drivers/drivers.go b/plugins/alerting/pkg/alerting/drivers/drivers.go index 4d59ae2521..1209d36e1b 100644 --- a/plugins/alerting/pkg/alerting/drivers/drivers.go +++ b/plugins/alerting/pkg/alerting/drivers/drivers.go @@ -3,6 +3,7 @@ package drivers import ( "context" + "github.com/rancher/opni/pkg/alerting/drivers/config" "github.com/rancher/opni/pkg/alerting/shared" corev1 "github.com/rancher/opni/pkg/apis/core/v1" "github.com/rancher/opni/pkg/plugins/driverutil" @@ -15,6 +16,8 @@ type ClusterDriver interface { // have this capability enabled. If this function returns an error, the // node will be set to disabled instead, and the error will be logged. ShouldDisableNode(*corev1.Reference) error + + GetDefaultReceiver() *config.WebhookConfig } var Drivers = driverutil.NewDriverCache[ClusterDriver]() @@ -32,6 +35,10 @@ func (d *NoopClusterDriver) GetRuntimeOptions() shared.AlertingClusterOptions { return shared.AlertingClusterOptions{} } +func (d *NoopClusterDriver) GetDefaultReceiver() *config.WebhookConfig { + return &config.WebhookConfig{} +} + func init() { Drivers.Register("noop", func(ctx context.Context, opts ...driverutil.Option) (ClusterDriver, error) { return &NoopClusterDriver{}, nil diff --git a/plugins/alerting/pkg/alerting/endpoints/v1/component.go b/plugins/alerting/pkg/alerting/endpoints/v1/component.go index 2c8c2ffae4..cb6b6d0e95 100644 --- a/plugins/alerting/pkg/alerting/endpoints/v1/component.go +++ b/plugins/alerting/pkg/alerting/endpoints/v1/component.go @@ -25,10 +25,7 @@ type EndpointServerComponent struct { mu sync.Mutex server.Config - endpMu sync.Mutex - notifications *notifications.NotificationServerComponent - manualSync manualSync logger *zap.SugaredLogger @@ -61,8 +58,6 @@ type EndpointServerConfiguration struct { spec.ConditionStorage spec.RouterStorage spec.HashRing - - ManualSync manualSync } func (e *EndpointServerComponent) Name() string { @@ -99,6 +94,5 @@ func (e *EndpointServerComponent) Initialize(conf EndpointServerConfiguration) { e.conditionStorage.Set(conf.ConditionStorage) e.routerStorage.Set(conf.RouterStorage) e.hashRing.Set(conf.HashRing) - e.manualSync = conf.ManualSync }) } diff --git a/plugins/alerting/pkg/alerting/endpoints/v1/server.go b/plugins/alerting/pkg/alerting/endpoints/v1/server.go index 726f8d23b3..8941cb0c06 100644 --- a/plugins/alerting/pkg/alerting/endpoints/v1/server.go +++ b/plugins/alerting/pkg/alerting/endpoints/v1/server.go @@ -7,13 +7,11 @@ import ( amCfg "github.com/prometheus/alertmanager/config" "github.com/rancher/opni/pkg/alerting/drivers/config" - "github.com/rancher/opni/pkg/alerting/message" "github.com/rancher/opni/pkg/alerting/shared" "github.com/rancher/opni/pkg/alerting/storage/opts" "github.com/rancher/opni/pkg/alerting/storage/spec" alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1" corev1 "github.com/rancher/opni/pkg/apis/core/v1" - "github.com/rancher/opni/pkg/validation" "github.com/samber/lo" lop "github.com/samber/lo/parallel" "gopkg.in/yaml.v2" @@ -22,11 +20,14 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" ) +var ( + RetryTestEdnpoint = 1 * time.Second +) + var _ alertingv1.AlertEndpointsServer = (*EndpointServerComponent)(nil) func validateFullWebhook(w *alertingv1.WebhookEndpoint) error { @@ -220,80 +221,6 @@ func (e *EndpointServerComponent) ListAlertEndpoints( return &alertingv1.AlertEndpointList{Items: items}, nil } -func (e *EndpointServerComponent) TestAlertEndpoint(ctx context.Context, req *alertingv1.TestAlertEndpointRequest) (*alertingv1.TestAlertEndpointResponse, error) { - if !e.Initialized() { - return nil, status.Error(codes.Unavailable, "Endpoint server is not yet available") - } - if req.Endpoint == nil { - return nil, validation.Error("Endpoint must be set") - } - // if it has an Id it needs to be unredacted - if req.Endpoint.Id != "" { - unredactSecrets(ctx, e.endpointStorage.Get(), req.Endpoint.Id, req.Endpoint) - } - if err := req.Validate(); err != nil { - return nil, err - } - details := &alertingv1.EndpointImplementation{ - Title: "Test Alert Endpoint", - Body: "Opni Alerting is sending you a test alert to verify your alert endpoint configuration.", - } - - ephemeralId := shared.NewAlertingRefId() - createImpl := &alertingv1.FullAttachedEndpoints{ - InitialDelay: durationpb.New(time.Duration(time.Second * 0)), - Items: []*alertingv1.FullAttachedEndpoint{ - { - EndpointId: ephemeralId, - AlertEndpoint: req.GetEndpoint(), - Details: details, - }, - }, - ThrottlingDuration: durationpb.New(time.Duration(time.Second * 1)), - Details: details, - } - - // - create ephemeral dispatcher - router, err := e.routerStorage.Get().Get(ctx, shared.SingleConfigId) - if err != nil { - return nil, err - } - - ns := "test" - if err := router.SetNamespaceSpec("test", ephemeralId, createImpl); err != nil { - return nil, err - } - err = e.manualSync(ctx, e.hashRing.Get(), e.routerStorage.Get()) - if err != nil { - e.logger.Errorf("Failed to sync router %s", err) - return nil, err - } - go func() { // create, trigger, delete - _, err = e.notifications.TriggerAlerts(ctx, &alertingv1.TriggerAlertsRequest{ - ConditionId: &corev1.Reference{Id: ephemeralId}, - Namespace: ns, - Annotations: map[string]string{ - message.NotificationContentHeader: "Test notification", - message.NotificationContentSummary: "Admin has sent a test notification", - }, - Labels: map[string]string{ - ns: ephemeralId, - }, - }) - if err != nil { - e.logger.Errorf("Failed to trigger alert %s", err) - } - // - delete ephemeral dispatcher - if err := router.SetNamespaceSpec("test", ephemeralId, &alertingv1.FullAttachedEndpoints{ - Items: []*alertingv1.FullAttachedEndpoint{}, - }); err != nil { - return - } - }() - - return &alertingv1.TestAlertEndpointResponse{}, nil -} - func unredactSecrets( ctx context.Context, store spec.EndpointStorage, diff --git a/plugins/alerting/pkg/alerting/notifications/v1/component.go b/plugins/alerting/pkg/alerting/notifications/v1/component.go index a97cf94558..f9b3b668b4 100644 --- a/plugins/alerting/pkg/alerting/notifications/v1/component.go +++ b/plugins/alerting/pkg/alerting/notifications/v1/component.go @@ -24,6 +24,7 @@ type NotificationServerComponent struct { logger *zap.SugaredLogger conditionStorage future.Future[spec.ConditionStorage] + endpointStorage future.Future[spec.EndpointStorage] } var _ server.ServerComponent = (*NotificationServerComponent)(nil) @@ -34,11 +35,13 @@ func NewNotificationServerComponent( return &NotificationServerComponent{ logger: logger, conditionStorage: future.New[spec.ConditionStorage](), + endpointStorage: future.New[spec.EndpointStorage](), } } type NotificationServerConfiguration struct { spec.ConditionStorage + spec.EndpointStorage } func (n *NotificationServerComponent) Name() string { @@ -72,5 +75,6 @@ func (n *NotificationServerComponent) Sync(_ context.Context, _ alertingSync.Syn func (n *NotificationServerComponent) Initialize(conf NotificationServerConfiguration) { n.InitOnce(func() { n.conditionStorage.Set(conf.ConditionStorage) + n.endpointStorage.Set(conf.EndpointStorage) }) } diff --git a/plugins/alerting/pkg/alerting/notifications/v1/server.go b/plugins/alerting/pkg/alerting/notifications/v1/server.go index 9f3ef238e4..ab087a1e77 100644 --- a/plugins/alerting/pkg/alerting/notifications/v1/server.go +++ b/plugins/alerting/pkg/alerting/notifications/v1/server.go @@ -2,6 +2,7 @@ package notifications import ( "context" + "fmt" "math" "time" @@ -13,6 +14,7 @@ import ( "github.com/rancher/opni/pkg/alerting/message" "github.com/rancher/opni/pkg/alerting/shared" alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1" + corev1 "github.com/rancher/opni/pkg/apis/core/v1" "github.com/rancher/opni/pkg/util" "github.com/samber/lo" "google.golang.org/grpc/codes" @@ -40,6 +42,44 @@ func init() { var _ (alertingv1.AlertNotificationsServer) = (*NotificationServerComponent)(nil) +func (n *NotificationServerComponent) TestAlertEndpoint(ctx context.Context, ref *corev1.Reference) (*emptypb.Empty, error) { + if !n.Initialized() { + return nil, status.Error(codes.Unavailable, "Notification server is not yet available") + } + if err := ref.Validate(); err != nil { + return nil, err + } + ctxca, ca := context.WithTimeout(ctx, 1*time.Second) + defer ca() + + endp, err := n.endpointStorage.GetContext(ctxca) + if err != nil { + return nil, status.Error(codes.Unavailable, fmt.Sprintf("failed to get endpoit storage : %s", err.Error())) + } + if _, err := endp.Get(ctx, ref.Id); err != nil { + return nil, fmt.Errorf("testing an endpoint requires it is loaded : %w", err) + } + + n.mu.Lock() + defer n.mu.Unlock() + err = n.Client.AlertClient().PostAlarm( + ctx, + client.AlertObject{ + Id: ref.Id, + Labels: map[string]string{ + message.NotificationPropertyOpniUuid: ref.Id, + message.TestNamespace: ref.Id, + }, + Annotations: map[string]string{ + message.NotificationContentHeader: "Test notification", + message.NotificationContentSummary: "Admin has sent a test notification", + }, + }, + ) + + return &emptypb.Empty{}, err +} + func (n *NotificationServerComponent) TriggerAlerts(ctx context.Context, req *alertingv1.TriggerAlertsRequest) (*alertingv1.TriggerAlertsResponse, error) { if !n.Initialized() { return nil, status.Error(codes.Unavailable, "Notification server is not yet available") @@ -47,9 +87,6 @@ func (n *NotificationServerComponent) TriggerAlerts(ctx context.Context, req *al if err := req.Validate(); err != nil { return nil, err } - lg := n.logger.With("Handler", "TriggerAlerts") - lg.Debugf("Received request to trigger alerts on condition %s", req.GetConditionId()) - lg.Debugf("Received alert annotations : %s", req.Annotations) // This logic is intended to // 1) Provide a safeguard to ensure that external callers of the API will not cause nil pointer map dereferences in the AlertManager adapter logic diff --git a/plugins/alerting/pkg/alerting/plugin.go b/plugins/alerting/pkg/alerting/plugin.go index dd364a362b..ca17f872df 100644 --- a/plugins/alerting/pkg/alerting/plugin.go +++ b/plugins/alerting/pkg/alerting/plugin.go @@ -132,7 +132,7 @@ func NewPlugin(ctx context.Context) *Plugin { ), } - p.syncController = NewSyncController() + p.syncController = NewSyncController(p.logger.With("component", "sync-controller")) p.httpProxy = NewHttpApiServer( lg.With("component", "http-proxy"), p.AlertingClient, @@ -174,6 +174,7 @@ func NewPlugin(ctx context.Context) *Plugin { future.Wait1(p.storageClientSet, func(s spec.AlertingClientSet) { p.NotificationServerComponent.Initialize(notifications.NotificationServerConfiguration{ ConditionStorage: s.Conditions(), + EndpointStorage: s.Endpoints(), }) p.EndpointServerComponent.Initialize(endpoints.EndpointServerConfiguration{ @@ -181,7 +182,6 @@ func NewPlugin(ctx context.Context) *Plugin { EndpointStorage: s.Endpoints(), RouterStorage: s.Routers(), HashRing: s, - ManualSync: p.SendManualSyncRequest, }) serverCfg := server.Config{ diff --git a/plugins/alerting/pkg/alerting/system.go b/plugins/alerting/pkg/alerting/system.go index 6cb204aeda..e37dd3158d 100644 --- a/plugins/alerting/pkg/alerting/system.go +++ b/plugins/alerting/pkg/alerting/system.go @@ -238,7 +238,7 @@ func listPeers(replicas int) []alertingClient.AlertingPeer { for i := 0; i < replicas; i++ { peers = append(peers, alertingClient.AlertingPeer{ ApiAddress: fmt.Sprintf("http://%s-%d.%s:9093", shared.AlertmanagerService, i, shared.AlertmanagerService), - EmbeddedAddress: fmt.Sprintf("http://%s-%d.%s:3000", shared.EmitterService, i, shared.EmitterService), + EmbeddedAddress: fmt.Sprintf("http://%s-%d.%s:3000", shared.AlertmanagerService, i, shared.AlertmanagerService), }) } return peers diff --git a/plugins/alerting/pkg/node_backend/backend.go b/plugins/alerting/pkg/node_backend/backend.go index f07ac004d3..71d1cec894 100644 --- a/plugins/alerting/pkg/node_backend/backend.go +++ b/plugins/alerting/pkg/node_backend/backend.go @@ -3,6 +3,7 @@ package node_backend import ( "context" "sync" + "sync/atomic" "github.com/google/go-cmp/cmp" capabilityv1 "github.com/rancher/opni/pkg/apis/capability/v1" @@ -84,10 +85,19 @@ var ( _ capabilityv1.BackendServer = (*AlertingNodeBackend)(nil) ) -var FallbackDefaultNodeSpec = &node.AlertingCapabilitySpec{ - RuleDiscovery: &node.RuleDiscoverySpec{ - Enabled: true, - }, +var ( + // The "default" default node spec. Exported for testing purposes. + FallbackDefaultNodeSpec atomic.Pointer[node.AlertingCapabilitySpec] +) + +func init() { + FallbackDefaultNodeSpec.Store( + &node.AlertingCapabilitySpec{ + RuleDiscovery: &node.RuleDiscoverySpec{ + Enabled: true, + }, + }, + ) } func (a *AlertingNodeBackend) requestNodeSync(ctx context.Context, node *corev1.Reference) { @@ -199,7 +209,7 @@ func (a *AlertingNodeBackend) SetNodeConfiguration(ctx context.Context, req *nod func (a *AlertingNodeBackend) getDefaultNodeSpec(ctx context.Context) (*node.AlertingCapabilitySpec, error) { spec, err := a.capabilityKV.Get().DefaultCapabilitySpec.Get(ctx) if status.Code(err) == codes.NotFound { - spec = FallbackDefaultNodeSpec + spec = FallbackDefaultNodeSpec.Load() } else if err != nil { return nil, status.Errorf(codes.Unavailable, "failed to get default capability spec : %s", err) } diff --git a/plugins/alerting/test/test_drivers.go b/plugins/alerting/test/test_drivers.go index 3366ae0364..a945323cf7 100644 --- a/plugins/alerting/test/test_drivers.go +++ b/plugins/alerting/test/test_drivers.go @@ -25,6 +25,7 @@ import ( "github.com/rancher/opni/pkg/alerting/client" "github.com/rancher/opni/pkg/alerting/drivers/config" "github.com/rancher/opni/pkg/alerting/drivers/routing" + "github.com/rancher/opni/pkg/alerting/extensions" "github.com/rancher/opni/pkg/alerting/shared" corev1 "github.com/rancher/opni/pkg/apis/core/v1" "github.com/rancher/opni/pkg/logger" @@ -88,6 +89,8 @@ type TestEnvAlertingClusterDriver struct { alertops.UnsafeAlertingAdminServer client.AlertingClient + embdServerAddress string + subscribers []chan []client.AlertingPeer } @@ -106,12 +109,17 @@ func NewTestEnvAlertingClusterDriver(env *test.Environment, options TestEnvAlert lg := logger.NewPluginLogger().Named("alerting-test-cluster-driver") lg = lg.With("config-file", configFile) + initial := &atomic.Bool{} + initial.Store(false) + ePort := freeport.GetFreePort() + opniAddr := fmt.Sprintf("127.0.0.1:%d", ePort) + _ = extensions.StartOpniEmbeddedServer(env.Context(), opniAddr, false) rTree := routing.NewRoutingTree(&config.WebhookConfig{ NotifierConfig: config.NotifierConfig{ VSendResolved: false, }, URL: &amCfg.URL{ - URL: util.Must(url.Parse("http://localhost:6006")), + URL: util.Must(url.Parse(fmt.Sprintf("http://%s%s", opniAddr, shared.AlertingDefaultHookName))), }, }) rTreeBytes, err := yaml.Marshal(rTree) @@ -122,8 +130,6 @@ func NewTestEnvAlertingClusterDriver(env *test.Environment, options TestEnvAlert if err != nil { panic(err) } - initial := &atomic.Bool{} - initial.Store(false) return &TestEnvAlertingClusterDriver{ env: env, @@ -134,9 +140,21 @@ func NewTestEnvAlertingClusterDriver(env *test.Environment, options TestEnvAlert ClusterConfiguration: &alertops.ClusterConfiguration{ ResourceLimits: &alertops.ResourceLimitSpec{}, }, - logger: lg, - subscribers: options.Subscribers, - stateMu: &sync.RWMutex{}, + logger: lg, + subscribers: options.Subscribers, + stateMu: &sync.RWMutex{}, + embdServerAddress: opniAddr, + } +} + +func (l *TestEnvAlertingClusterDriver) GetDefaultReceiver() *config.WebhookConfig { + return &config.WebhookConfig{ + NotifierConfig: config.NotifierConfig{ + VSendResolved: false, + }, + URL: &amCfg.URL{ + URL: util.Must(url.Parse(fmt.Sprintf("http://%s%s", l.embdServerAddress, shared.AlertingDefaultHookName))), + }, } } @@ -179,7 +197,7 @@ func (l *TestEnvAlertingClusterDriver) ConfigureCluster(_ context.Context, confi for _, inst := range l.managedInstances { peers = append(peers, client.AlertingPeer{ ApiAddress: fmt.Sprintf("http://127.0.0.1:%d", inst.AlertManagerPort), - EmbeddedAddress: "http://127.0.0.1:6006", + EmbeddedAddress: fmt.Sprintf("http://%s", l.embdServerAddress), }) } l.AlertingClient.MemberlistClient().SetKnownPeers(peers) @@ -234,14 +252,14 @@ func (l *TestEnvAlertingClusterDriver) InstallCluster(_ context.Context, _ *empt l.AlertingClient = client.NewClient( nil, fmt.Sprintf("http://127.0.0.1:%d", l.managedInstances[0].AlertManagerPort), - "http://127.0.0.1:6006", + fmt.Sprintf("http://%s", l.embdServerAddress), ) peers := []client.AlertingPeer{} for _, inst := range l.managedInstances { peers = append(peers, client.AlertingPeer{ ApiAddress: fmt.Sprintf("http://127.0.0.1:%d", inst.AlertManagerPort), - EmbeddedAddress: "http://127.0.0.1:6006", + EmbeddedAddress: fmt.Sprintf("http://%s", l.embdServerAddress), }) } l.AlertingClient.MemberlistClient().SetKnownPeers(peers) diff --git a/plugins/alerting/test/test_plugin.go b/plugins/alerting/test/test_plugin.go index 2a56ba214d..865022d034 100644 --- a/plugins/alerting/test/test_plugin.go +++ b/plugins/alerting/test/test_plugin.go @@ -11,6 +11,7 @@ import ( "github.com/rancher/opni/plugins/alerting/pkg/agent" "github.com/rancher/opni/plugins/alerting/pkg/alerting" "github.com/rancher/opni/plugins/alerting/pkg/alerting/alarms/v1" + endpointv1 "github.com/rancher/opni/plugins/alerting/pkg/alerting/endpoints/v1" "google.golang.org/protobuf/types/known/durationpb" ) @@ -59,6 +60,7 @@ func init() { alarms.CapabilityStreamEvaluateInterval = time.Minute * 100 alarms.CortexStreamEvaluateInterval = time.Second * 1 test.EnablePlugin(meta.ModeAgent, agent.Scheme) + endpointv1.RetryTestEdnpoint = time.Millisecond * 50 agent.RuleSyncInterval = time.Second * 1 } diff --git a/test/plugins/alerting/alerting_test.go b/test/plugins/alerting/alerting_test.go index ea041b1235..eef4aee75c 100644 --- a/test/plugins/alerting/alerting_test.go +++ b/test/plugins/alerting/alerting_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "math/rand" "net" @@ -149,8 +150,10 @@ func BuildAlertingClusterIntegrationTests( ts := timestamppb.Now() for _, comp := range info.Components { Expect(comp.LastHandshake.AsTime()).To(BeTemporally("<", ts.AsTime())) - if comp.ConnectInfo.SyncId != info.CurSyncId { - Expect(comp.LastHandshake.AsTime()).To(BeTemporally(">", ts.AsTime().Add(15*(-time.Second))), "sync is stuttering") + if comp.ConnectInfo.GetState() != alertops.SyncState_Synced { + if comp.ConnectInfo.SyncId != info.CurSyncId { + Expect(comp.LastHandshake.AsTime()).To(BeTemporally(">", ts.AsTime().Add(15*(-time.Second))), "sync is stuttering") + } } } } @@ -198,7 +201,7 @@ func BuildAlertingClusterIntegrationTests( return fmt.Errorf("cluster config not equal : not applied") } return err - }, time.Second*5, time.Millisecond*200) + }, time.Second*5, time.Millisecond*200).Should(Succeed()) }) Specify("the alerting plugin components should be running and healthy", func() { @@ -254,24 +257,33 @@ func BuildAlertingClusterIntegrationTests( By("verifying they are reachable") for _, endp := range endpList.GetItems() { - _, err := alertEndpointsClient.TestAlertEndpoint(env.Context(), &alertingv1.TestAlertEndpointRequest{ - Endpoint: endp.GetEndpoint(), - }) + Expect(endp.GetId()).NotTo(BeNil()) + _, err := alertNotificationsClient.TestAlertEndpoint(env.Context(), endp.GetId()) Expect(err).To(Succeed()) } + maxSuccesses := 0 Eventually(func() error { + success := 0 + errs := []error{} for _, server := range servers { if len(server.GetBuffer()) == 0 { - return fmt.Errorf("server %s did not receive any alerts", server.Endpoint().Name) + if success > maxSuccesses { + maxSuccesses = success + } + errs = append(errs, fmt.Errorf("server %v did not receive any alerts", server.Endpoint())) + } else { + success++ } } + if len(errs) > 0 { + return errors.Join(errs...) + } return nil - }, time.Second*30, time.Millisecond*100) + }, time.Second*15, time.Millisecond*100).Should(Succeed(), fmt.Sprintf("only %d/%d servers received alerts", maxSuccesses, numServers)) for _, server := range servers { server.ClearBuffer() } - }) It("should create some default conditions when bootstrapping agents", func() { @@ -369,14 +381,33 @@ func BuildAlertingClusterIntegrationTests( }, }) Expect(err).To(Succeed()) + disconnectStatusList, err := alertConditionsClient.ListAlertConditionsWithStatus(env.Context(), &alertingv1.ListStatusRequest{ + ItemFilter: &alertingv1.ListAlertConditionRequest{ + AlertTypes: []alertingv1.AlertType{ + alertingv1.AlertType_System, + }, + }, + }) + Expect(err).To(Succeed()) capabilityList, err := alertConditionsClient.ListAlertConditions(env.Context(), &alertingv1.ListAlertConditionRequest{ AlertTypes: []alertingv1.AlertType{ alertingv1.AlertType_DownstreamCapability, }, }) + Expect(err).To(Succeed()) + + capabilityStatusList, err := alertConditionsClient.ListAlertConditionsWithStatus(env.Context(), &alertingv1.ListStatusRequest{ + ItemFilter: &alertingv1.ListAlertConditionRequest{ + AlertTypes: []alertingv1.AlertType{ + alertingv1.AlertType_DownstreamCapability, + }, + }, + }) + Expect(err).To(Succeed()) Expect(capabilityList.Items).To(HaveLen(len(disconnectList.Items))) + Expect(capabilityStatusList.GetAlertConditions()).To(HaveLen(len(disconnectStatusList.GetAlertConditions()))) }) It("should be able to attach endpoints to conditions", func() { @@ -461,9 +492,15 @@ func BuildAlertingClusterIntegrationTests( return nil }, time.Second*30, time.Second).Should(Succeed()) By("verifying the routing relationships are correctly loaded") + Eventually(func() int { + relationships, err := alertNotificationsClient.ListRoutingRelationships(env.Context(), &emptypb.Empty{}) + if err != nil { + return -1 + } + return len(relationships.RoutingRelationships) + }).Should(Equal(len(expectedRouting))) relationships, err := alertNotificationsClient.ListRoutingRelationships(env.Context(), &emptypb.Empty{}) Expect(err).To(Succeed()) - Expect(len(relationships.RoutingRelationships)).To(Equal(len(expectedRouting))) for endpId, rel := range relationships.RoutingRelationships { slices.SortFunc(rel.Items, func(a, b *alertingv1.ConditionReference) bool { @@ -576,7 +613,7 @@ func BuildAlertingClusterIntegrationTests( } } return nil - }, time.Second*5, time.Second*1) + }, time.Second*5, time.Second*1).Should(Succeed()) }) It("should be able to batch list status and filter by status", func() { @@ -626,10 +663,10 @@ func BuildAlertingClusterIntegrationTests( } } return nil - }, time.Second*5, time.Second) + }, time.Second*60, time.Second).Should(Succeed()) }) - XIt("should be able to list opni messages", func() { + It("should be able to list opni messages", func() { Eventually(func() error { list, err := alertNotificationsClient.ListNotifications(env.Context(), &alertingv1.ListNotificationRequest{}) if err != nil { @@ -639,7 +676,7 @@ func BuildAlertingClusterIntegrationTests( return fmt.Errorf("expected to find at least one notification, got 0") } return nil - }, time.Second*60, time.Second).Should(BeNil()) + }, time.Second*60, time.Second).Should(Succeed()) By("verifying we enforce limits") list, err := alertNotificationsClient.ListNotifications(env.Context(), &alertingv1.ListNotificationRequest{ @@ -714,8 +751,9 @@ func BuildAlertingClusterIntegrationTests( ConditionId: &alertingv1.ConditionReference{ Id: id, }, - Start: item.Windows[0].Start, - End: timestamppb.Now(), + Fingerprints: item.Windows[0].Fingerprints, + Start: item.Windows[0].Start, + End: timestamppb.Now(), }) if err != nil { return err @@ -732,7 +770,7 @@ func BuildAlertingClusterIntegrationTests( } } return nil - }, time.Second*15, time.Second) + }, time.Second*15, time.Second).Should(Succeed()) }) Specify("the alertmanager proxy served by the Gateway HTTP port should be able to list the alarms", func() { diff --git a/test/plugins/alerting/metrics_test.go b/test/plugins/alerting/metrics_test.go index df475e3431..12596a4e3b 100644 --- a/test/plugins/alerting/metrics_test.go +++ b/test/plugins/alerting/metrics_test.go @@ -2,6 +2,7 @@ package alerting_test import ( "context" + "errors" "fmt" "time" @@ -15,6 +16,7 @@ import ( "github.com/rancher/opni/pkg/test" "github.com/rancher/opni/pkg/test/alerting" "github.com/rancher/opni/plugins/alerting/apis/alertops" + "github.com/rancher/opni/plugins/metrics/apis/cortexadmin" "github.com/rancher/opni/plugins/metrics/apis/cortexops" _ "github.com/rancher/opni/plugins/metrics/test" "github.com/samber/lo" @@ -174,13 +176,14 @@ var _ = Describe("metrics and alerting", Ordered, Label("integration"), func() { return fmt.Errorf("unexpected amount of alert conditions %d. expected %d", len(statuses.GetAlertConditions()), 3) } return nil - }) + }).Should(Succeed()) }) - Specify("the metrics -> alerting pipeline should be functional", func() { + Specify("the metrics -> alerting pipeline should be functional", FlakeAttempts(4), func() { alertConditionsClient := env.NewAlertConditionsClient() - By("installing the metrics capabilities") mgmtClient := env.NewManagementClient() + cortexAdminClient := cortexadmin.NewCortexAdminClient(env.ManagementClientConn()) + By("installing the metrics capabilities") for _, agent := range agents { _, err := mgmtClient.InstallCapability(env.Context(), &managementv1.CapabilityInstallRequest{ Name: "metrics", @@ -191,15 +194,103 @@ var _ = Describe("metrics and alerting", Ordered, Label("integration"), func() { Expect(err).NotTo(HaveOccurred()) } - By("verifying the conditions move to the firing state", func() { - By("making sure when metrics aren't installed, the conditions are invalid") - Eventually(func() error { + By("verifying the metrics alerts are loaded properly") + Eventually(func() error { + allStatuses, err := alertConditionsClient.ListAlertConditionsWithStatus(env.Context(), &alertingv1.ListStatusRequest{ + States: []alertingv1.AlertConditionState{}, + ItemFilter: &alertingv1.ListAlertConditionRequest{ + Clusters: agents, + Severities: []alertingv1.OpniSeverity{}, + Labels: []string{}, + AlertTypes: []alertingv1.AlertType{ + alertingv1.AlertType_PrometheusQuery, + }, + }, + }) + if err != nil { + return err + } + statuses, err := alertConditionsClient.ListAlertConditionsWithStatus(env.Context(), &alertingv1.ListStatusRequest{ + States: []alertingv1.AlertConditionState{ + alertingv1.AlertConditionState_Firing, + }, + ItemFilter: &alertingv1.ListAlertConditionRequest{ + Clusters: agents, + Severities: []alertingv1.OpniSeverity{}, + Labels: []string{}, + AlertTypes: []alertingv1.AlertType{ + alertingv1.AlertType_PrometheusQuery, + }, + }, + }) + if err != nil { + return err + } + + clusters, err := mgmtClient.ListClusters(env.Context(), &managementv1.ListClustersRequest{}) + if err != nil { + return err + } + rules, err := cortexAdminClient.ListRules(env.Context(), &cortexadmin.ListRulesRequest{ + ClusterId: lo.Map(clusters.GetItems(), func(cl *corev1.Cluster, _ int) string { + return cl.GetId() + }), + RuleType: []string{"alerting"}, + NamespaceRegexp: "opni-alerting", + }) + if err != nil { + return err + } + By("checking the prometheus alert conditions loaded the rule groups for each agent") + if len(rules.Data.GetGroups()) != len(agents) { + return fmt.Errorf("not enough rules found %d, expected %d %v", len(rules.Data.GetGroups()), len(agents), rules.Data) + } + + loadedClusters := lo.Map(rules.Data.GetGroups(), func(g *cortexadmin.RuleGroup, _ int) string { + return g.ClusterId + }) + loadedClusters = lo.Uniq(loadedClusters) + Expect(loadedClusters).To(ConsistOf(agents)) + + By("checking we have the correct amount of prometheus alert conditions loaded") + if len(allStatuses.GetAlertConditions()) != len(agents) { + return fmt.Errorf("unexpected amount of alert conditions %d, expected %d : %v", len(statuses.GetAlertConditions()), 3, allStatuses.GetAlertConditions()) + } + errs := []error{} + numFiring := 0 // FIXME: it looks like the test_driver metrics agent does not always send metrics + for _, cond := range statuses.GetAlertConditions() { + status, err := alertConditionsClient.AlertConditionStatus(env.Context(), &alertingv1.ConditionReference{ + Id: cond.AlertCondition.Id, + GroupId: cond.AlertCondition.GroupId, + }) + if err != nil { + return err + } + state := status.State + if status.State != alertingv1.AlertConditionState_Firing && status.State != alertingv1.AlertConditionState_Ok { + errs = append(errs, fmt.Errorf("condition %s is in unexpected state %s", cond.AlertCondition.Name, state.String())) + } + if status.State == alertingv1.AlertConditionState_Firing { + numFiring++ + } + } + if numFiring == 0 { + errs = append(errs, errors.New("no sanity metrics are firing")) + } + return errors.Join(errs...) + }, time.Second*10, time.Millisecond*500).Should(Succeed()) + + By("verifying the webhook endpoints have received the message if sanity metrics are firing") + Eventually(func() error { + errs := []error{} + numFiring := 0 //FIXME: metrics agent test_driver does not always send metrics + for agent, webhooks := range agentAlertingEndpoints { statuses, err := alertConditionsClient.ListAlertConditionsWithStatus(env.Context(), &alertingv1.ListStatusRequest{ States: []alertingv1.AlertConditionState{ alertingv1.AlertConditionState_Firing, }, ItemFilter: &alertingv1.ListAlertConditionRequest{ - Clusters: agents, + Clusters: []string{agent}, Severities: []alertingv1.OpniSeverity{}, Labels: []string{}, AlertTypes: []alertingv1.AlertType{ @@ -208,38 +299,22 @@ var _ = Describe("metrics and alerting", Ordered, Label("integration"), func() { }, }) if err != nil { - return err + errs = append(errs, err) + continue } - for _, cond := range statuses.GetAlertConditions() { - status, err := alertConditionsClient.AlertConditionStatus(env.Context(), &alertingv1.ConditionReference{ - Id: cond.AlertCondition.Id, - GroupId: cond.AlertCondition.GroupId, - }) - if err != nil { - return err - } - if status.State != alertingv1.AlertConditionState_Firing { - return fmt.Errorf("condition %s is not firing", cond.AlertCondition.Name) - } - } - - if len(statuses.GetAlertConditions()) != 3 { - return fmt.Errorf("unexpected amount of alert conditions %d, expected %d", len(statuses.GetAlertConditions()), 3) - } - return nil - }, time.Second*10, time.Millisecond*500) - }) - - By("verifying the webhook endpoints have received the message") - Eventually(func() error { - for agent, webhooks := range agentAlertingEndpoints { - for _, webhook := range webhooks { - if len(webhook.GetBuffer()) == 0 { - return fmt.Errorf("endpoints for conditions created for '%s' agent have not received messages", agent) + if len(statuses.GetAlertConditions()) > 0 { + numFiring++ + for _, webhook := range webhooks { + if len(webhook.GetBuffer()) == 0 { + errs = append(errs, fmt.Errorf("no messages received on webhook %s for agent %s", webhook.EndpointId, agent)) + } } } } - return nil + if numFiring == 0 { + errs = append(errs, errors.New("no sanity metrics are firing, definitely investigate")) + } + return errors.Join(errs...) }, time.Second*30, time.Millisecond*500).Should(Succeed()) }) }) diff --git a/test/plugins/alerting/node_config_test.go b/test/plugins/alerting/node_config_test.go index 8df54d5534..df237ae056 100644 --- a/test/plugins/alerting/node_config_test.go +++ b/test/plugins/alerting/node_config_test.go @@ -130,7 +130,7 @@ var _ = Describe("Node Config", Ordered, Label("integration"), func() { }).Should(Succeed()) // replace the standard default config with the test environment config - backend.FallbackDefaultNodeSpec = util.ProtoClone(defaultConfig) + backend.FallbackDefaultNodeSpec.Store(util.ProtoClone(defaultConfig)) spec, isDefault, err := getConfig("agent1") Expect(err).NotTo(HaveOccurred()) diff --git a/test/plugins/alerting/routing_test.go b/test/plugins/alerting/routing_test.go index ea623c9d3a..3768c79842 100644 --- a/test/plugins/alerting/routing_test.go +++ b/test/plugins/alerting/routing_test.go @@ -296,7 +296,9 @@ func (t testSpecSuite) ExpectAlertsToBeRouted(amPort int) error { } } } - Expect(found).To(BeTrue()) + if !found { + return fmt.Errorf("expected to find finalizer for '%s'=%s in alertmanager state", ns, conditionId) + } } // Addr is unique for each server uniqServers := map[string]lo.Tuple2[*alerting.MockIntegrationWebhookServer, string]{} @@ -313,8 +315,9 @@ func (t testSpecSuite) ExpectAlertsToBeRouted(amPort int) error { expectedIds[server.Addr] = append(expectedIds[server.Addr], spec.id) } } - - Expect(expectedIds).NotTo(HaveLen(0)) + if len(expectedIds) == 0 { + return fmt.Errorf("expected to find at least one server") + } for _, server := range uniqServers { ids := []string{} for _, msg := range server.A.GetBuffer() { diff --git a/web/pkg/opni/components/Endpoint/index.vue b/web/pkg/opni/components/Endpoint/index.vue index 0d725c1b36..9c961924d9 100644 --- a/web/pkg/opni/components/Endpoint/index.vue +++ b/web/pkg/opni/components/Endpoint/index.vue @@ -7,7 +7,12 @@ import Tab from '@shell/components/Tabbed/Tab'; import Tabbed from '@shell/components/Tabbed'; import { Banner } from '@components/Banner'; import { exceptionToErrorsArray } from '@pkg/opni/utils/error'; -import { createAlertEndpoint, getAlertEndpoint, testAlertEndpoint, updateAlertEndpoint } from '@pkg/opni/utils/requests/alerts'; +import { + createAlertEndpoint, + getAlertEndpoint, + testAlertEndpoint, + updateAlertEndpoint, +} from '@pkg/opni/utils/requests/alerts'; import Slack from './Slack'; import Email from './Email'; import PagerDuty from './PagerDuty'; @@ -29,7 +34,8 @@ export default { }, async fetch() { - const endpointRequest = this.$route.params.id && this.$route.params.id !== 'create' ? getAlertEndpoint(this.$route.params.id, this) : Promise.resolve(false); + const endpointRequest = + this.$route.params.id && this.$route.params.id !== 'create' ? getAlertEndpoint(this.$route.params.id, this) : Promise.resolve(false); if (await endpointRequest) { const endpoint = await endpointRequest; @@ -45,34 +51,37 @@ export default { const types = [ { label: 'Slack', - value: 'slack' + value: 'slack', }, { label: 'Email', - value: 'email' + value: 'email', }, { label: 'PagerDuty', - value: 'pagerDuty' + value: 'pagerDuty', }, { label: 'Webhook', - value: 'webhook' - } + value: 'webhook', + }, ]; const type = types[3].value; return { - error: '', + error: '', types, type, config: { name: '', description: '', endpoint: { - slack: {}, email: {}, pagerDuty: {}, webhook: {} - } - } + slack: {}, + email: {}, + pagerDuty: {}, + webhook: {}, + }, + }, }; }, @@ -92,7 +101,7 @@ export default { const updateConfig = { forceUpdate: true, id: { id: this.$route.params.id }, - updateAlert: config + updateAlert: config, }; await updateAlertEndpoint(updateConfig); @@ -117,20 +126,23 @@ export default { createConfig() { return { - name: this.config.name, description: this.config.description, [this.type]: this.config.endpoint[this.type], id: this.$route.params.id + name: this.config.name, + description: this.config.description, + [this.type]: this.config.endpoint[this.type], + id: this.$route.params.id, }; }, async testEndpoint(buttonCallback) { try { - await testAlertEndpoint({ endpoint: this.createConfig() }); + await testAlertEndpoint({ id: this.$route.params.id }); this.$set(this, 'error', ''); buttonCallback(true); } catch (err) { this.$set(this, 'error', exceptionToErrorsArray(err).join('; ')); buttonCallback(false); } - } + }, }, computed: { @@ -148,34 +160,39 @@ export default {