diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index dddaa6b8b5..348f7874a6 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc (unknown) // source: mpi/v1/command.proto @@ -88,6 +88,70 @@ func (InstanceHealth_InstanceHealthStatus) EnumDescriptor() ([]byte, []int) { return file_mpi_v1_command_proto_rawDescGZIP(), []int{9, 0} } +type DataPlaneResponse_RequestType int32 + +const ( + DataPlaneResponse_UNSPECIFIED_REQUEST DataPlaneResponse_RequestType = 0 + DataPlaneResponse_CONFIG_APPLY_REQUEST DataPlaneResponse_RequestType = 1 + DataPlaneResponse_CONFIG_UPLOAD_REQUEST DataPlaneResponse_RequestType = 2 + DataPlaneResponse_HEALTH_REQUEST DataPlaneResponse_RequestType = 3 + DataPlaneResponse_STATUS_REQUEST DataPlaneResponse_RequestType = 4 + DataPlaneResponse_API_ACTION_REQUEST DataPlaneResponse_RequestType = 5 + DataPlaneResponse_COMMAND_STATUS_REQUEST DataPlaneResponse_RequestType = 6 + DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST DataPlaneResponse_RequestType = 7 +) + +// Enum value maps for DataPlaneResponse_RequestType. +var ( + DataPlaneResponse_RequestType_name = map[int32]string{ + 0: "UNSPECIFIED_REQUEST", + 1: "CONFIG_APPLY_REQUEST", + 2: "CONFIG_UPLOAD_REQUEST", + 3: "HEALTH_REQUEST", + 4: "STATUS_REQUEST", + 5: "API_ACTION_REQUEST", + 6: "COMMAND_STATUS_REQUEST", + 7: "UPDATE_AGENT_CONFIG_REQUEST", + } + DataPlaneResponse_RequestType_value = map[string]int32{ + "UNSPECIFIED_REQUEST": 0, + "CONFIG_APPLY_REQUEST": 1, + "CONFIG_UPLOAD_REQUEST": 2, + "HEALTH_REQUEST": 3, + "STATUS_REQUEST": 4, + "API_ACTION_REQUEST": 5, + "COMMAND_STATUS_REQUEST": 6, + "UPDATE_AGENT_CONFIG_REQUEST": 7, + } +) + +func (x DataPlaneResponse_RequestType) Enum() *DataPlaneResponse_RequestType { + p := new(DataPlaneResponse_RequestType) + *p = x + return p +} + +func (x DataPlaneResponse_RequestType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (DataPlaneResponse_RequestType) Descriptor() protoreflect.EnumDescriptor { + return file_mpi_v1_command_proto_enumTypes[1].Descriptor() +} + +func (DataPlaneResponse_RequestType) Type() protoreflect.EnumType { + return &file_mpi_v1_command_proto_enumTypes[1] +} + +func (x DataPlaneResponse_RequestType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use DataPlaneResponse_RequestType.Descriptor instead. +func (DataPlaneResponse_RequestType) EnumDescriptor() ([]byte, []int) { + return file_mpi_v1_command_proto_rawDescGZIP(), []int{12, 0} +} + // the types of instances possible type InstanceMeta_InstanceType int32 @@ -137,11 +201,11 @@ func (x InstanceMeta_InstanceType) String() string { } func (InstanceMeta_InstanceType) Descriptor() protoreflect.EnumDescriptor { - return file_mpi_v1_command_proto_enumTypes[1].Descriptor() + return file_mpi_v1_command_proto_enumTypes[2].Descriptor() } func (InstanceMeta_InstanceType) Type() protoreflect.EnumType { - return &file_mpi_v1_command_proto_enumTypes[1] + return &file_mpi_v1_command_proto_enumTypes[2] } func (x InstanceMeta_InstanceType) Number() protoreflect.EnumNumber { @@ -193,11 +257,11 @@ func (x Log_LogLevel) String() string { } func (Log_LogLevel) Descriptor() protoreflect.EnumDescriptor { - return file_mpi_v1_command_proto_enumTypes[2].Descriptor() + return file_mpi_v1_command_proto_enumTypes[3].Descriptor() } func (Log_LogLevel) Type() protoreflect.EnumType { - return &file_mpi_v1_command_proto_enumTypes[2] + return &file_mpi_v1_command_proto_enumTypes[3] } func (x Log_LogLevel) Number() protoreflect.EnumNumber { @@ -942,7 +1006,9 @@ type DataPlaneResponse struct { // The command response with the associated request CommandResponse *CommandResponse `protobuf:"bytes,2,opt,name=command_response,json=commandResponse,proto3" json:"command_response,omitempty"` // The instance identifier, if applicable, for this response - InstanceId string `protobuf:"bytes,3,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"` + InstanceId string `protobuf:"bytes,3,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"` + // The management plane request type that is being responded to + RequestType DataPlaneResponse_RequestType `protobuf:"varint,4,opt,name=request_type,json=requestType,proto3,enum=mpi.v1.DataPlaneResponse_RequestType" json:"request_type,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -998,6 +1064,13 @@ func (x *DataPlaneResponse) GetInstanceId() string { return "" } +func (x *DataPlaneResponse) GetRequestType() DataPlaneResponse_RequestType { + if x != nil { + return x.RequestType + } + return DataPlaneResponse_UNSPECIFIED_REQUEST +} + // A Management Plane request for information, triggers an associated rpc on the Data Plane type ManagementPlaneRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2974,12 +3047,22 @@ const file_mpi_v1_command_proto_rawDesc = "" + "\x1cUpdateDataPlaneHealthRequest\x126\n" + "\fmessage_meta\x18\x01 \x01(\v2\x13.mpi.v1.MessageMetaR\vmessageMeta\x12A\n" + "\x10instance_healths\x18\x02 \x03(\v2\x16.mpi.v1.InstanceHealthR\x0finstanceHealths\"\x1f\n" + - "\x1dUpdateDataPlaneHealthResponse\"\xb0\x01\n" + + "\x1dUpdateDataPlaneHealthResponse\"\xd5\x03\n" + "\x11DataPlaneResponse\x126\n" + "\fmessage_meta\x18\x01 \x01(\v2\x13.mpi.v1.MessageMetaR\vmessageMeta\x12B\n" + "\x10command_response\x18\x02 \x01(\v2\x17.mpi.v1.CommandResponseR\x0fcommandResponse\x12\x1f\n" + "\vinstance_id\x18\x03 \x01(\tR\n" + - "instanceId\"\xfa\x04\n" + + "instanceId\x12H\n" + + "\frequest_type\x18\x04 \x01(\x0e2%.mpi.v1.DataPlaneResponse.RequestTypeR\vrequestType\"\xd8\x01\n" + + "\vRequestType\x12\x17\n" + + "\x13UNSPECIFIED_REQUEST\x10\x00\x12\x18\n" + + "\x14CONFIG_APPLY_REQUEST\x10\x01\x12\x19\n" + + "\x15CONFIG_UPLOAD_REQUEST\x10\x02\x12\x12\n" + + "\x0eHEALTH_REQUEST\x10\x03\x12\x12\n" + + "\x0eSTATUS_REQUEST\x10\x04\x12\x16\n" + + "\x12API_ACTION_REQUEST\x10\x05\x12\x1a\n" + + "\x16COMMAND_STATUS_REQUEST\x10\x06\x12\x1f\n" + + "\x1bUPDATE_AGENT_CONFIG_REQUEST\x10\a\"\xfa\x04\n" + "\x16ManagementPlaneRequest\x126\n" + "\fmessage_meta\x18\x01 \x01(\v2\x13.mpi.v1.MessageMetaR\vmessageMeta\x12>\n" + "\x0estatus_request\x18\x02 \x01(\v2\x15.mpi.v1.StatusRequestH\x00R\rstatusRequest\x12>\n" + @@ -3132,138 +3215,140 @@ func file_mpi_v1_command_proto_rawDescGZIP() []byte { return file_mpi_v1_command_proto_rawDescData } -var file_mpi_v1_command_proto_enumTypes = make([]protoimpl.EnumInfo, 3) +var file_mpi_v1_command_proto_enumTypes = make([]protoimpl.EnumInfo, 4) var file_mpi_v1_command_proto_msgTypes = make([]protoimpl.MessageInfo, 42) var file_mpi_v1_command_proto_goTypes = []any{ (InstanceHealth_InstanceHealthStatus)(0), // 0: mpi.v1.InstanceHealth.InstanceHealthStatus - (InstanceMeta_InstanceType)(0), // 1: mpi.v1.InstanceMeta.InstanceType - (Log_LogLevel)(0), // 2: mpi.v1.Log.LogLevel - (*CreateConnectionRequest)(nil), // 3: mpi.v1.CreateConnectionRequest - (*Resource)(nil), // 4: mpi.v1.Resource - (*HostInfo)(nil), // 5: mpi.v1.HostInfo - (*ReleaseInfo)(nil), // 6: mpi.v1.ReleaseInfo - (*ContainerInfo)(nil), // 7: mpi.v1.ContainerInfo - (*CreateConnectionResponse)(nil), // 8: mpi.v1.CreateConnectionResponse - (*UpdateDataPlaneStatusRequest)(nil), // 9: mpi.v1.UpdateDataPlaneStatusRequest - (*UpdateDataPlaneStatusResponse)(nil), // 10: mpi.v1.UpdateDataPlaneStatusResponse - (*UpdateAgentConfigRequest)(nil), // 11: mpi.v1.UpdateAgentConfigRequest - (*InstanceHealth)(nil), // 12: mpi.v1.InstanceHealth - (*UpdateDataPlaneHealthRequest)(nil), // 13: mpi.v1.UpdateDataPlaneHealthRequest - (*UpdateDataPlaneHealthResponse)(nil), // 14: mpi.v1.UpdateDataPlaneHealthResponse - (*DataPlaneResponse)(nil), // 15: mpi.v1.DataPlaneResponse - (*ManagementPlaneRequest)(nil), // 16: mpi.v1.ManagementPlaneRequest - (*StatusRequest)(nil), // 17: mpi.v1.StatusRequest - (*HealthRequest)(nil), // 18: mpi.v1.HealthRequest - (*ConfigApplyRequest)(nil), // 19: mpi.v1.ConfigApplyRequest - (*ConfigUploadRequest)(nil), // 20: mpi.v1.ConfigUploadRequest - (*APIActionRequest)(nil), // 21: mpi.v1.APIActionRequest - (*NGINXPlusAction)(nil), // 22: mpi.v1.NGINXPlusAction - (*UpdateHTTPUpstreamServers)(nil), // 23: mpi.v1.UpdateHTTPUpstreamServers - (*GetHTTPUpstreamServers)(nil), // 24: mpi.v1.GetHTTPUpstreamServers - (*UpdateStreamServers)(nil), // 25: mpi.v1.UpdateStreamServers - (*GetUpstreams)(nil), // 26: mpi.v1.GetUpstreams - (*GetStreamUpstreams)(nil), // 27: mpi.v1.GetStreamUpstreams - (*CommandStatusRequest)(nil), // 28: mpi.v1.CommandStatusRequest - (*Instance)(nil), // 29: mpi.v1.Instance - (*InstanceMeta)(nil), // 30: mpi.v1.InstanceMeta - (*InstanceConfig)(nil), // 31: mpi.v1.InstanceConfig - (*InstanceRuntime)(nil), // 32: mpi.v1.InstanceRuntime - (*InstanceChild)(nil), // 33: mpi.v1.InstanceChild - (*NGINXRuntimeInfo)(nil), // 34: mpi.v1.NGINXRuntimeInfo - (*NGINXPlusRuntimeInfo)(nil), // 35: mpi.v1.NGINXPlusRuntimeInfo - (*APIDetails)(nil), // 36: mpi.v1.APIDetails - (*NGINXAppProtectRuntimeInfo)(nil), // 37: mpi.v1.NGINXAppProtectRuntimeInfo - (*InstanceAction)(nil), // 38: mpi.v1.InstanceAction - (*AgentConfig)(nil), // 39: mpi.v1.AgentConfig - (*Log)(nil), // 40: mpi.v1.Log - (*CommandServer)(nil), // 41: mpi.v1.CommandServer - (*AuxiliaryCommandServer)(nil), // 42: mpi.v1.AuxiliaryCommandServer - (*MetricsServer)(nil), // 43: mpi.v1.MetricsServer - (*FileServer)(nil), // 44: mpi.v1.FileServer - (*MessageMeta)(nil), // 45: mpi.v1.MessageMeta - (*CommandResponse)(nil), // 46: mpi.v1.CommandResponse - (*FileOverview)(nil), // 47: mpi.v1.FileOverview - (*structpb.Struct)(nil), // 48: google.protobuf.Struct - (*ServerSettings)(nil), // 49: mpi.v1.ServerSettings - (*AuthSettings)(nil), // 50: mpi.v1.AuthSettings - (*TLSSettings)(nil), // 51: mpi.v1.TLSSettings + (DataPlaneResponse_RequestType)(0), // 1: mpi.v1.DataPlaneResponse.RequestType + (InstanceMeta_InstanceType)(0), // 2: mpi.v1.InstanceMeta.InstanceType + (Log_LogLevel)(0), // 3: mpi.v1.Log.LogLevel + (*CreateConnectionRequest)(nil), // 4: mpi.v1.CreateConnectionRequest + (*Resource)(nil), // 5: mpi.v1.Resource + (*HostInfo)(nil), // 6: mpi.v1.HostInfo + (*ReleaseInfo)(nil), // 7: mpi.v1.ReleaseInfo + (*ContainerInfo)(nil), // 8: mpi.v1.ContainerInfo + (*CreateConnectionResponse)(nil), // 9: mpi.v1.CreateConnectionResponse + (*UpdateDataPlaneStatusRequest)(nil), // 10: mpi.v1.UpdateDataPlaneStatusRequest + (*UpdateDataPlaneStatusResponse)(nil), // 11: mpi.v1.UpdateDataPlaneStatusResponse + (*UpdateAgentConfigRequest)(nil), // 12: mpi.v1.UpdateAgentConfigRequest + (*InstanceHealth)(nil), // 13: mpi.v1.InstanceHealth + (*UpdateDataPlaneHealthRequest)(nil), // 14: mpi.v1.UpdateDataPlaneHealthRequest + (*UpdateDataPlaneHealthResponse)(nil), // 15: mpi.v1.UpdateDataPlaneHealthResponse + (*DataPlaneResponse)(nil), // 16: mpi.v1.DataPlaneResponse + (*ManagementPlaneRequest)(nil), // 17: mpi.v1.ManagementPlaneRequest + (*StatusRequest)(nil), // 18: mpi.v1.StatusRequest + (*HealthRequest)(nil), // 19: mpi.v1.HealthRequest + (*ConfigApplyRequest)(nil), // 20: mpi.v1.ConfigApplyRequest + (*ConfigUploadRequest)(nil), // 21: mpi.v1.ConfigUploadRequest + (*APIActionRequest)(nil), // 22: mpi.v1.APIActionRequest + (*NGINXPlusAction)(nil), // 23: mpi.v1.NGINXPlusAction + (*UpdateHTTPUpstreamServers)(nil), // 24: mpi.v1.UpdateHTTPUpstreamServers + (*GetHTTPUpstreamServers)(nil), // 25: mpi.v1.GetHTTPUpstreamServers + (*UpdateStreamServers)(nil), // 26: mpi.v1.UpdateStreamServers + (*GetUpstreams)(nil), // 27: mpi.v1.GetUpstreams + (*GetStreamUpstreams)(nil), // 28: mpi.v1.GetStreamUpstreams + (*CommandStatusRequest)(nil), // 29: mpi.v1.CommandStatusRequest + (*Instance)(nil), // 30: mpi.v1.Instance + (*InstanceMeta)(nil), // 31: mpi.v1.InstanceMeta + (*InstanceConfig)(nil), // 32: mpi.v1.InstanceConfig + (*InstanceRuntime)(nil), // 33: mpi.v1.InstanceRuntime + (*InstanceChild)(nil), // 34: mpi.v1.InstanceChild + (*NGINXRuntimeInfo)(nil), // 35: mpi.v1.NGINXRuntimeInfo + (*NGINXPlusRuntimeInfo)(nil), // 36: mpi.v1.NGINXPlusRuntimeInfo + (*APIDetails)(nil), // 37: mpi.v1.APIDetails + (*NGINXAppProtectRuntimeInfo)(nil), // 38: mpi.v1.NGINXAppProtectRuntimeInfo + (*InstanceAction)(nil), // 39: mpi.v1.InstanceAction + (*AgentConfig)(nil), // 40: mpi.v1.AgentConfig + (*Log)(nil), // 41: mpi.v1.Log + (*CommandServer)(nil), // 42: mpi.v1.CommandServer + (*AuxiliaryCommandServer)(nil), // 43: mpi.v1.AuxiliaryCommandServer + (*MetricsServer)(nil), // 44: mpi.v1.MetricsServer + (*FileServer)(nil), // 45: mpi.v1.FileServer + (*MessageMeta)(nil), // 46: mpi.v1.MessageMeta + (*CommandResponse)(nil), // 47: mpi.v1.CommandResponse + (*FileOverview)(nil), // 48: mpi.v1.FileOverview + (*structpb.Struct)(nil), // 49: google.protobuf.Struct + (*ServerSettings)(nil), // 50: mpi.v1.ServerSettings + (*AuthSettings)(nil), // 51: mpi.v1.AuthSettings + (*TLSSettings)(nil), // 52: mpi.v1.TLSSettings } var file_mpi_v1_command_proto_depIdxs = []int32{ - 45, // 0: mpi.v1.CreateConnectionRequest.message_meta:type_name -> mpi.v1.MessageMeta - 4, // 1: mpi.v1.CreateConnectionRequest.resource:type_name -> mpi.v1.Resource - 29, // 2: mpi.v1.Resource.instances:type_name -> mpi.v1.Instance - 5, // 3: mpi.v1.Resource.host_info:type_name -> mpi.v1.HostInfo - 7, // 4: mpi.v1.Resource.container_info:type_name -> mpi.v1.ContainerInfo - 6, // 5: mpi.v1.HostInfo.release_info:type_name -> mpi.v1.ReleaseInfo - 6, // 6: mpi.v1.ContainerInfo.release_info:type_name -> mpi.v1.ReleaseInfo - 46, // 7: mpi.v1.CreateConnectionResponse.response:type_name -> mpi.v1.CommandResponse - 39, // 8: mpi.v1.CreateConnectionResponse.agent_config:type_name -> mpi.v1.AgentConfig - 45, // 9: mpi.v1.UpdateDataPlaneStatusRequest.message_meta:type_name -> mpi.v1.MessageMeta - 4, // 10: mpi.v1.UpdateDataPlaneStatusRequest.resource:type_name -> mpi.v1.Resource - 45, // 11: mpi.v1.UpdateAgentConfigRequest.message_meta:type_name -> mpi.v1.MessageMeta - 39, // 12: mpi.v1.UpdateAgentConfigRequest.agent_config:type_name -> mpi.v1.AgentConfig + 46, // 0: mpi.v1.CreateConnectionRequest.message_meta:type_name -> mpi.v1.MessageMeta + 5, // 1: mpi.v1.CreateConnectionRequest.resource:type_name -> mpi.v1.Resource + 30, // 2: mpi.v1.Resource.instances:type_name -> mpi.v1.Instance + 6, // 3: mpi.v1.Resource.host_info:type_name -> mpi.v1.HostInfo + 8, // 4: mpi.v1.Resource.container_info:type_name -> mpi.v1.ContainerInfo + 7, // 5: mpi.v1.HostInfo.release_info:type_name -> mpi.v1.ReleaseInfo + 7, // 6: mpi.v1.ContainerInfo.release_info:type_name -> mpi.v1.ReleaseInfo + 47, // 7: mpi.v1.CreateConnectionResponse.response:type_name -> mpi.v1.CommandResponse + 40, // 8: mpi.v1.CreateConnectionResponse.agent_config:type_name -> mpi.v1.AgentConfig + 46, // 9: mpi.v1.UpdateDataPlaneStatusRequest.message_meta:type_name -> mpi.v1.MessageMeta + 5, // 10: mpi.v1.UpdateDataPlaneStatusRequest.resource:type_name -> mpi.v1.Resource + 46, // 11: mpi.v1.UpdateAgentConfigRequest.message_meta:type_name -> mpi.v1.MessageMeta + 40, // 12: mpi.v1.UpdateAgentConfigRequest.agent_config:type_name -> mpi.v1.AgentConfig 0, // 13: mpi.v1.InstanceHealth.instance_health_status:type_name -> mpi.v1.InstanceHealth.InstanceHealthStatus - 45, // 14: mpi.v1.UpdateDataPlaneHealthRequest.message_meta:type_name -> mpi.v1.MessageMeta - 12, // 15: mpi.v1.UpdateDataPlaneHealthRequest.instance_healths:type_name -> mpi.v1.InstanceHealth - 45, // 16: mpi.v1.DataPlaneResponse.message_meta:type_name -> mpi.v1.MessageMeta - 46, // 17: mpi.v1.DataPlaneResponse.command_response:type_name -> mpi.v1.CommandResponse - 45, // 18: mpi.v1.ManagementPlaneRequest.message_meta:type_name -> mpi.v1.MessageMeta - 17, // 19: mpi.v1.ManagementPlaneRequest.status_request:type_name -> mpi.v1.StatusRequest - 18, // 20: mpi.v1.ManagementPlaneRequest.health_request:type_name -> mpi.v1.HealthRequest - 19, // 21: mpi.v1.ManagementPlaneRequest.config_apply_request:type_name -> mpi.v1.ConfigApplyRequest - 20, // 22: mpi.v1.ManagementPlaneRequest.config_upload_request:type_name -> mpi.v1.ConfigUploadRequest - 21, // 23: mpi.v1.ManagementPlaneRequest.action_request:type_name -> mpi.v1.APIActionRequest - 28, // 24: mpi.v1.ManagementPlaneRequest.command_status_request:type_name -> mpi.v1.CommandStatusRequest - 11, // 25: mpi.v1.ManagementPlaneRequest.update_agent_config_request:type_name -> mpi.v1.UpdateAgentConfigRequest - 47, // 26: mpi.v1.ConfigApplyRequest.overview:type_name -> mpi.v1.FileOverview - 47, // 27: mpi.v1.ConfigUploadRequest.overview:type_name -> mpi.v1.FileOverview - 22, // 28: mpi.v1.APIActionRequest.nginx_plus_action:type_name -> mpi.v1.NGINXPlusAction - 23, // 29: mpi.v1.NGINXPlusAction.update_http_upstream_servers:type_name -> mpi.v1.UpdateHTTPUpstreamServers - 24, // 30: mpi.v1.NGINXPlusAction.get_http_upstream_servers:type_name -> mpi.v1.GetHTTPUpstreamServers - 25, // 31: mpi.v1.NGINXPlusAction.update_stream_servers:type_name -> mpi.v1.UpdateStreamServers - 26, // 32: mpi.v1.NGINXPlusAction.get_upstreams:type_name -> mpi.v1.GetUpstreams - 27, // 33: mpi.v1.NGINXPlusAction.get_stream_upstreams:type_name -> mpi.v1.GetStreamUpstreams - 48, // 34: mpi.v1.UpdateHTTPUpstreamServers.servers:type_name -> google.protobuf.Struct - 48, // 35: mpi.v1.UpdateStreamServers.servers:type_name -> google.protobuf.Struct - 30, // 36: mpi.v1.Instance.instance_meta:type_name -> mpi.v1.InstanceMeta - 31, // 37: mpi.v1.Instance.instance_config:type_name -> mpi.v1.InstanceConfig - 32, // 38: mpi.v1.Instance.instance_runtime:type_name -> mpi.v1.InstanceRuntime - 1, // 39: mpi.v1.InstanceMeta.instance_type:type_name -> mpi.v1.InstanceMeta.InstanceType - 38, // 40: mpi.v1.InstanceConfig.actions:type_name -> mpi.v1.InstanceAction - 39, // 41: mpi.v1.InstanceConfig.agent_config:type_name -> mpi.v1.AgentConfig - 34, // 42: mpi.v1.InstanceRuntime.nginx_runtime_info:type_name -> mpi.v1.NGINXRuntimeInfo - 35, // 43: mpi.v1.InstanceRuntime.nginx_plus_runtime_info:type_name -> mpi.v1.NGINXPlusRuntimeInfo - 37, // 44: mpi.v1.InstanceRuntime.nginx_app_protect_runtime_info:type_name -> mpi.v1.NGINXAppProtectRuntimeInfo - 33, // 45: mpi.v1.InstanceRuntime.instance_children:type_name -> mpi.v1.InstanceChild - 36, // 46: mpi.v1.NGINXRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails - 36, // 47: mpi.v1.NGINXPlusRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails - 36, // 48: mpi.v1.NGINXPlusRuntimeInfo.plus_api:type_name -> mpi.v1.APIDetails - 41, // 49: mpi.v1.AgentConfig.command:type_name -> mpi.v1.CommandServer - 43, // 50: mpi.v1.AgentConfig.metrics:type_name -> mpi.v1.MetricsServer - 44, // 51: mpi.v1.AgentConfig.file:type_name -> mpi.v1.FileServer - 48, // 52: mpi.v1.AgentConfig.labels:type_name -> google.protobuf.Struct - 42, // 53: mpi.v1.AgentConfig.auxiliary_command:type_name -> mpi.v1.AuxiliaryCommandServer - 40, // 54: mpi.v1.AgentConfig.log:type_name -> mpi.v1.Log - 2, // 55: mpi.v1.Log.log_level:type_name -> mpi.v1.Log.LogLevel - 49, // 56: mpi.v1.CommandServer.server:type_name -> mpi.v1.ServerSettings - 50, // 57: mpi.v1.CommandServer.auth:type_name -> mpi.v1.AuthSettings - 51, // 58: mpi.v1.CommandServer.tls:type_name -> mpi.v1.TLSSettings - 49, // 59: mpi.v1.AuxiliaryCommandServer.server:type_name -> mpi.v1.ServerSettings - 50, // 60: mpi.v1.AuxiliaryCommandServer.auth:type_name -> mpi.v1.AuthSettings - 51, // 61: mpi.v1.AuxiliaryCommandServer.tls:type_name -> mpi.v1.TLSSettings - 3, // 62: mpi.v1.CommandService.CreateConnection:input_type -> mpi.v1.CreateConnectionRequest - 9, // 63: mpi.v1.CommandService.UpdateDataPlaneStatus:input_type -> mpi.v1.UpdateDataPlaneStatusRequest - 13, // 64: mpi.v1.CommandService.UpdateDataPlaneHealth:input_type -> mpi.v1.UpdateDataPlaneHealthRequest - 15, // 65: mpi.v1.CommandService.Subscribe:input_type -> mpi.v1.DataPlaneResponse - 8, // 66: mpi.v1.CommandService.CreateConnection:output_type -> mpi.v1.CreateConnectionResponse - 10, // 67: mpi.v1.CommandService.UpdateDataPlaneStatus:output_type -> mpi.v1.UpdateDataPlaneStatusResponse - 14, // 68: mpi.v1.CommandService.UpdateDataPlaneHealth:output_type -> mpi.v1.UpdateDataPlaneHealthResponse - 16, // 69: mpi.v1.CommandService.Subscribe:output_type -> mpi.v1.ManagementPlaneRequest - 66, // [66:70] is the sub-list for method output_type - 62, // [62:66] is the sub-list for method input_type - 62, // [62:62] is the sub-list for extension type_name - 62, // [62:62] is the sub-list for extension extendee - 0, // [0:62] is the sub-list for field type_name + 46, // 14: mpi.v1.UpdateDataPlaneHealthRequest.message_meta:type_name -> mpi.v1.MessageMeta + 13, // 15: mpi.v1.UpdateDataPlaneHealthRequest.instance_healths:type_name -> mpi.v1.InstanceHealth + 46, // 16: mpi.v1.DataPlaneResponse.message_meta:type_name -> mpi.v1.MessageMeta + 47, // 17: mpi.v1.DataPlaneResponse.command_response:type_name -> mpi.v1.CommandResponse + 1, // 18: mpi.v1.DataPlaneResponse.request_type:type_name -> mpi.v1.DataPlaneResponse.RequestType + 46, // 19: mpi.v1.ManagementPlaneRequest.message_meta:type_name -> mpi.v1.MessageMeta + 18, // 20: mpi.v1.ManagementPlaneRequest.status_request:type_name -> mpi.v1.StatusRequest + 19, // 21: mpi.v1.ManagementPlaneRequest.health_request:type_name -> mpi.v1.HealthRequest + 20, // 22: mpi.v1.ManagementPlaneRequest.config_apply_request:type_name -> mpi.v1.ConfigApplyRequest + 21, // 23: mpi.v1.ManagementPlaneRequest.config_upload_request:type_name -> mpi.v1.ConfigUploadRequest + 22, // 24: mpi.v1.ManagementPlaneRequest.action_request:type_name -> mpi.v1.APIActionRequest + 29, // 25: mpi.v1.ManagementPlaneRequest.command_status_request:type_name -> mpi.v1.CommandStatusRequest + 12, // 26: mpi.v1.ManagementPlaneRequest.update_agent_config_request:type_name -> mpi.v1.UpdateAgentConfigRequest + 48, // 27: mpi.v1.ConfigApplyRequest.overview:type_name -> mpi.v1.FileOverview + 48, // 28: mpi.v1.ConfigUploadRequest.overview:type_name -> mpi.v1.FileOverview + 23, // 29: mpi.v1.APIActionRequest.nginx_plus_action:type_name -> mpi.v1.NGINXPlusAction + 24, // 30: mpi.v1.NGINXPlusAction.update_http_upstream_servers:type_name -> mpi.v1.UpdateHTTPUpstreamServers + 25, // 31: mpi.v1.NGINXPlusAction.get_http_upstream_servers:type_name -> mpi.v1.GetHTTPUpstreamServers + 26, // 32: mpi.v1.NGINXPlusAction.update_stream_servers:type_name -> mpi.v1.UpdateStreamServers + 27, // 33: mpi.v1.NGINXPlusAction.get_upstreams:type_name -> mpi.v1.GetUpstreams + 28, // 34: mpi.v1.NGINXPlusAction.get_stream_upstreams:type_name -> mpi.v1.GetStreamUpstreams + 49, // 35: mpi.v1.UpdateHTTPUpstreamServers.servers:type_name -> google.protobuf.Struct + 49, // 36: mpi.v1.UpdateStreamServers.servers:type_name -> google.protobuf.Struct + 31, // 37: mpi.v1.Instance.instance_meta:type_name -> mpi.v1.InstanceMeta + 32, // 38: mpi.v1.Instance.instance_config:type_name -> mpi.v1.InstanceConfig + 33, // 39: mpi.v1.Instance.instance_runtime:type_name -> mpi.v1.InstanceRuntime + 2, // 40: mpi.v1.InstanceMeta.instance_type:type_name -> mpi.v1.InstanceMeta.InstanceType + 39, // 41: mpi.v1.InstanceConfig.actions:type_name -> mpi.v1.InstanceAction + 40, // 42: mpi.v1.InstanceConfig.agent_config:type_name -> mpi.v1.AgentConfig + 35, // 43: mpi.v1.InstanceRuntime.nginx_runtime_info:type_name -> mpi.v1.NGINXRuntimeInfo + 36, // 44: mpi.v1.InstanceRuntime.nginx_plus_runtime_info:type_name -> mpi.v1.NGINXPlusRuntimeInfo + 38, // 45: mpi.v1.InstanceRuntime.nginx_app_protect_runtime_info:type_name -> mpi.v1.NGINXAppProtectRuntimeInfo + 34, // 46: mpi.v1.InstanceRuntime.instance_children:type_name -> mpi.v1.InstanceChild + 37, // 47: mpi.v1.NGINXRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails + 37, // 48: mpi.v1.NGINXPlusRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails + 37, // 49: mpi.v1.NGINXPlusRuntimeInfo.plus_api:type_name -> mpi.v1.APIDetails + 42, // 50: mpi.v1.AgentConfig.command:type_name -> mpi.v1.CommandServer + 44, // 51: mpi.v1.AgentConfig.metrics:type_name -> mpi.v1.MetricsServer + 45, // 52: mpi.v1.AgentConfig.file:type_name -> mpi.v1.FileServer + 49, // 53: mpi.v1.AgentConfig.labels:type_name -> google.protobuf.Struct + 43, // 54: mpi.v1.AgentConfig.auxiliary_command:type_name -> mpi.v1.AuxiliaryCommandServer + 41, // 55: mpi.v1.AgentConfig.log:type_name -> mpi.v1.Log + 3, // 56: mpi.v1.Log.log_level:type_name -> mpi.v1.Log.LogLevel + 50, // 57: mpi.v1.CommandServer.server:type_name -> mpi.v1.ServerSettings + 51, // 58: mpi.v1.CommandServer.auth:type_name -> mpi.v1.AuthSettings + 52, // 59: mpi.v1.CommandServer.tls:type_name -> mpi.v1.TLSSettings + 50, // 60: mpi.v1.AuxiliaryCommandServer.server:type_name -> mpi.v1.ServerSettings + 51, // 61: mpi.v1.AuxiliaryCommandServer.auth:type_name -> mpi.v1.AuthSettings + 52, // 62: mpi.v1.AuxiliaryCommandServer.tls:type_name -> mpi.v1.TLSSettings + 4, // 63: mpi.v1.CommandService.CreateConnection:input_type -> mpi.v1.CreateConnectionRequest + 10, // 64: mpi.v1.CommandService.UpdateDataPlaneStatus:input_type -> mpi.v1.UpdateDataPlaneStatusRequest + 14, // 65: mpi.v1.CommandService.UpdateDataPlaneHealth:input_type -> mpi.v1.UpdateDataPlaneHealthRequest + 16, // 66: mpi.v1.CommandService.Subscribe:input_type -> mpi.v1.DataPlaneResponse + 9, // 67: mpi.v1.CommandService.CreateConnection:output_type -> mpi.v1.CreateConnectionResponse + 11, // 68: mpi.v1.CommandService.UpdateDataPlaneStatus:output_type -> mpi.v1.UpdateDataPlaneStatusResponse + 15, // 69: mpi.v1.CommandService.UpdateDataPlaneHealth:output_type -> mpi.v1.UpdateDataPlaneHealthResponse + 17, // 70: mpi.v1.CommandService.Subscribe:output_type -> mpi.v1.ManagementPlaneRequest + 67, // [67:71] is the sub-list for method output_type + 63, // [63:67] is the sub-list for method input_type + 63, // [63:63] is the sub-list for extension type_name + 63, // [63:63] is the sub-list for extension extendee + 0, // [0:63] is the sub-list for field type_name } func init() { file_mpi_v1_command_proto_init() } @@ -3309,7 +3394,7 @@ func file_mpi_v1_command_proto_init() { File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_mpi_v1_command_proto_rawDesc), len(file_mpi_v1_command_proto_rawDesc)), - NumEnums: 3, + NumEnums: 4, NumMessages: 42, NumExtensions: 0, NumServices: 1, diff --git a/api/grpc/mpi/v1/command.pb.validate.go b/api/grpc/mpi/v1/command.pb.validate.go index ab7777024d..0cb38eae2b 100644 --- a/api/grpc/mpi/v1/command.pb.validate.go +++ b/api/grpc/mpi/v1/command.pb.validate.go @@ -1832,6 +1832,8 @@ func (m *DataPlaneResponse) validate(all bool) error { // no validation rules for InstanceId + // no validation rules for RequestType + if len(errors) > 0 { return DataPlaneResponseMultiError(errors) } diff --git a/api/grpc/mpi/v1/command.proto b/api/grpc/mpi/v1/command.proto index 0456757ade..d245fdb01b 100644 --- a/api/grpc/mpi/v1/command.proto +++ b/api/grpc/mpi/v1/command.proto @@ -152,12 +152,25 @@ message UpdateDataPlaneHealthResponse {} // Reports the status of an associated command. This may be in response to a ManagementPlaneRequest message DataPlaneResponse { + enum RequestType { + UNSPECIFIED_REQUEST = 0; + CONFIG_APPLY_REQUEST = 1; + CONFIG_UPLOAD_REQUEST = 2; + HEALTH_REQUEST = 3; + STATUS_REQUEST = 4; + API_ACTION_REQUEST = 5; + COMMAND_STATUS_REQUEST = 6; + UPDATE_AGENT_CONFIG_REQUEST = 7; + } + // Meta-information associated with a message mpi.v1.MessageMeta message_meta = 1; // The command response with the associated request mpi.v1.CommandResponse command_response = 2; // The instance identifier, if applicable, for this response string instance_id = 3; + // The management plane request type that is being responded to + RequestType request_type = 4; } // A Management Plane request for information, triggers an associated rpc on the Data Plane diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go index b59f47b5af..a4fc36217d 100644 --- a/api/grpc/mpi/v1/common.pb.go +++ b/api/grpc/mpi/v1/common.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc (unknown) // source: mpi/v1/common.proto diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go index 0223bae3a7..57fdff253a 100644 --- a/api/grpc/mpi/v1/files.pb.go +++ b/api/grpc/mpi/v1/files.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.36.10 +// protoc-gen-go v1.36.11 // protoc (unknown) // source: mpi/v1/files.proto diff --git a/docs/proto/protos.md b/docs/proto/protos.md index 18301abd50..8bf916886e 100644 --- a/docs/proto/protos.md +++ b/docs/proto/protos.md @@ -85,6 +85,7 @@ - [UpdateHTTPUpstreamServers](#mpi-v1-UpdateHTTPUpstreamServers) - [UpdateStreamServers](#mpi-v1-UpdateStreamServers) + - [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType) - [InstanceHealth.InstanceHealthStatus](#mpi-v1-InstanceHealth-InstanceHealthStatus) - [InstanceMeta.InstanceType](#mpi-v1-InstanceMeta-InstanceType) - [Log.LogLevel](#mpi-v1-Log-LogLevel) @@ -862,6 +863,7 @@ Reports the status of an associated command. This may be in response to a Manage | message_meta | [MessageMeta](#mpi-v1-MessageMeta) | | Meta-information associated with a message | | command_response | [CommandResponse](#mpi-v1-CommandResponse) | | The command response with the associated request | | instance_id | [string](#string) | | The instance identifier, if applicable, for this response | +| request_type | [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType) | | The management plane request type that is being responded to | @@ -1326,6 +1328,24 @@ Update Upstream Stream Servers for an instance + + +### DataPlaneResponse.RequestType + + +| Name | Number | Description | +| ---- | ------ | ----------- | +| UNSPECIFIED_REQUEST | 0 | | +| CONFIG_APPLY_REQUEST | 1 | | +| CONFIG_UPLOAD_REQUEST | 2 | | +| HEALTH_REQUEST | 3 | | +| STATUS_REQUEST | 4 | | +| API_ACTION_REQUEST | 5 | | +| COMMAND_STATUS_REQUEST | 6 | | +| UPDATE_AGENT_CONFIG_REQUEST | 7 | | + + + ### InstanceHealth.InstanceHealthStatus diff --git a/internal/bus/message_pipe.go b/internal/bus/message_pipe.go index b618c99075..38ee2fcab1 100644 --- a/internal/bus/message_pipe.go +++ b/internal/bus/message_pipe.go @@ -206,8 +206,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon // If the agent update was received from a create connection request no data plane response needs to be sent if topic == AgentConfigUpdateTopic { - response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Failed to update agent config", reconfigureError.Error()) + response := p.createDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST, + "Failed to update agent config", + reconfigureError.Error(), + ) p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response}) } @@ -222,8 +227,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon slog.InfoContext(ctx, "Finished reconfiguring plugins", "plugins", p.plugins) if topic == AgentConfigUpdateTopic { - response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - "Successfully updated agent config", "") + response := p.createDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_OK, + mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST, + "Successfully updated agent config", + "", + ) p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response}) } } @@ -339,7 +349,10 @@ func (p *MessagePipe) initPlugins(ctx context.Context) { } } -func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, +func (p *MessagePipe) createDataPlaneResponse( + correlationID string, + status mpi.CommandResponse_CommandStatus, + requestType mpi.DataPlaneResponse_RequestType, message, err string, ) *mpi.DataPlaneResponse { return &mpi.DataPlaneResponse{ @@ -353,6 +366,7 @@ func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.C Message: message, Error: err, }, + RequestType: requestType, } } diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index d2992f7f72..ba35f01483 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -143,7 +143,6 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro return errors.New("OTel collector already running") } - slog.InfoContext(ctx, "Starting OTel collector") bootErr := oc.bootup(runCtx) if bootErr != nil { slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr) @@ -163,30 +162,7 @@ func (oc *Collector) Info() *bus.Info { func (oc *Collector) Close(ctx context.Context) error { slog.InfoContext(ctx, "Closing OTel Collector plugin") - if !oc.stopped { - slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState()) - oc.service.Shutdown() - oc.cancel() - - settings := *oc.config.Client.Backoff - settings.MaxElapsedTime = maxTimeToWaitForShutdown - err := backoff.WaitUntil(ctx, &settings, func() error { - if oc.service.GetState() == otelcol.StateClosed { - return nil - } - - return errors.New("OTel Collector not in a closed state yet") - }) - - if err != nil { - slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState()) - } else { - slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState()) - oc.stopped = true - } - } - - return nil + return oc.shutdownCollector(ctx) } // Process an incoming Message Bus message in the plugin @@ -236,6 +212,33 @@ func (oc *Collector) Reconfigure(ctx context.Context, agentConfig *config.Config return nil } +func (oc *Collector) shutdownCollector(ctx context.Context) error { + if !oc.stopped { + slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState()) + oc.service.Shutdown() + oc.cancel() + + settings := *oc.config.Client.Backoff + settings.MaxElapsedTime = maxTimeToWaitForShutdown + err := backoff.WaitUntil(ctx, &settings, func() error { + if oc.service.GetState() == otelcol.StateClosed { + return nil + } + + return errors.New("OTel Collector not in a closed state yet") + }) + + if err != nil { + slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState()) + } else { + slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState()) + oc.stopped = true + } + } + + return nil +} + // Process receivers and log warning for sub-optimal configurations func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) { for _, receiver := range receivers { @@ -275,11 +278,12 @@ func (oc *Collector) bootup(ctx context.Context) error { oc.setProxyIfNeeded(ctx) } + slog.InfoContext(ctx, "Starting OTel collector") appErr := oc.service.Run(ctx) if appErr != nil { errChan <- appErr } - slog.InfoContext(ctx, "OTel collector run finished") + slog.InfoContext(ctx, "OTel collector has stopped running") }() for { @@ -453,7 +457,7 @@ func (oc *Collector) writeRunningConfig(ctx context.Context, settings otelcol.Co } func (oc *Collector) restartCollector(ctx context.Context) { - err := oc.Close(ctx) + err := oc.shutdownCollector(ctx) if err != nil { slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err) return diff --git a/internal/collector/settings.go b/internal/collector/settings.go index 3346a0df94..9445a871c4 100644 --- a/internal/collector/settings.go +++ b/internal/collector/settings.go @@ -96,20 +96,20 @@ func createFile(confPath string) error { // Generates an OTel Collector config to a file by injecting the Metrics Config to a Go template. func writeCollectorConfig(conf *config.Collector) error { + confPath := filepath.Clean(conf.ConfigPath) + + slog.Info("Writing OTel collector config", "config_path", confPath) + if conf.Processors.Resource["default"] != nil { addDefaultResourceProcessor(conf.Pipelines.Metrics) addDefaultResourceProcessor(conf.Pipelines.Logs) } - slog.Info("Writing OTel collector config") - otelcolTemplate, templateErr := template.New(otelTemplatePath).Parse(otelcolTemplate) if templateErr != nil { return templateErr } - confPath := filepath.Clean(conf.ConfigPath) - // Ensure file exists and has correct permissions if err := ensureFileExists(confPath); err != nil { return err diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go index feb1c3b6d2..a27104b128 100644 --- a/internal/command/command_plugin.go +++ b/internal/command/command_plugin.go @@ -217,14 +217,24 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me cp.processDataPlaneResponse(ctx, &bus.Message{ Topic: bus.DataPlaneResponseTopic, - Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Failed to send the health status update", err.Error()), + Data: cp.createDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_FAILURE, + mpi.DataPlaneResponse_HEALTH_REQUEST, + "Failed to send the health status update", + err.Error(), + ), }) } cp.processDataPlaneResponse(ctx, &bus.Message{ Topic: bus.DataPlaneResponseTopic, - Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - "Successfully sent health status update", ""), + Data: cp.createDataPlaneResponse( + correlationID, + mpi.CommandResponse_COMMAND_STATUS_OK, + mpi.DataPlaneResponse_HEALTH_REQUEST, + "Successfully sent health status update", + "", + ), }) } } @@ -242,8 +252,25 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.Message) { slog.DebugContext(ctx, "Command plugin received data plane response message") if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok { - slog.InfoContext(ctx, "Sending data plane response message", "message", - response.GetCommandResponse().GetMessage(), "status", response.GetCommandResponse().GetStatus()) + // To prevent this type of request from spamming the logs too much, we use debug level + if response.GetRequestType() != mpi.DataPlaneResponse_HEALTH_REQUEST { + slog.InfoContext( + ctx, + "Sending data plane response message", + "message", response.GetCommandResponse().GetMessage(), + "status", response.GetCommandResponse().GetStatus(), + "error", response.GetCommandResponse().GetError(), + ) + } else { + slog.DebugContext( + ctx, + "Sending data plane response message", + "message", response.GetCommandResponse().GetMessage(), + "status", response.GetCommandResponse().GetStatus(), + "error", response.GetCommandResponse().GetError(), + ) + } + err := cp.commandService.SendDataPlaneResponse(ctx, response) if err != nil { slog.ErrorContext(ctx, "Unable to send data plane response", "error", err) @@ -318,7 +345,8 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) { slog.InfoContext(ctx, "Received management plane config apply request") cp.handleConfigApplyRequest(newCtx, message) case *mpi.ManagementPlaneRequest_HealthRequest: - slog.InfoContext(ctx, "Received management plane health request") + // To prevent this type of request from spamming the logs too much, we use debug level + slog.DebugContext(ctx, "Received management plane health request") cp.handleHealthRequest(newCtx) case *mpi.ManagementPlaneRequest_ActionRequest: if cp.commandServerType != model.Command { @@ -445,7 +473,10 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context, } } -func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, +func (cp *CommandPlugin) createDataPlaneResponse( + correlationID string, + status mpi.CommandResponse_CommandStatus, + requestType mpi.DataPlaneResponse_RequestType, message, err string, ) *mpi.DataPlaneResponse { return &mpi.DataPlaneResponse{ @@ -459,5 +490,6 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp Message: message, Error: err, }, + RequestType: requestType, } } diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index ef8782614c..bc2291f5f8 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -434,6 +434,7 @@ func Test_createDataPlaneResponse(t *testing.T) { commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command) result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(), expected.GetCommandResponse().GetStatus(), + expected.GetRequestType(), expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError()) assert.Equal(t, expected.GetCommandResponse(), result.GetCommandResponse()) diff --git a/internal/command/command_service.go b/internal/command/command_service.go index 61daf603fa..f73a85c5c8 100644 --- a/internal/command/command_service.go +++ b/internal/command/command_service.go @@ -524,7 +524,7 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e return nil } - slog.ErrorContext(ctx, "Failed to"+errorMsg, "error", err) + slog.ErrorContext(ctx, "Failed to "+errorMsg, "error", err) return err } diff --git a/internal/datasource/proto/response.go b/internal/datasource/proto/response.go index f2c778a7f8..38276e2ef4 100644 --- a/internal/datasource/proto/response.go +++ b/internal/datasource/proto/response.go @@ -11,8 +11,11 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" ) -func CreateDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, - message, instanceID, err string, +func CreateDataPlaneResponse( + correlationID string, + commandResponse *mpi.CommandResponse, + requestType mpi.DataPlaneResponse_RequestType, + instanceID string, ) *mpi.DataPlaneResponse { return &mpi.DataPlaneResponse{ MessageMeta: &mpi.MessageMeta{ @@ -20,11 +23,8 @@ func CreateDataPlaneResponse(correlationID string, status mpi.CommandResponse_Co CorrelationId: correlationID, Timestamp: timestamppb.Now(), }, - CommandResponse: &mpi.CommandResponse{ - Status: status, - Message: message, - Error: err, - }, - InstanceId: instanceID, + CommandResponse: commandResponse, + InstanceId: instanceID, + RequestType: requestType, } } diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go index 08fc0f2d2c..92eb51240d 100644 --- a/internal/file/file_manager_service.go +++ b/internal/file/file_manager_service.go @@ -220,13 +220,14 @@ func (fms *FileManagerService) ClearCache() { //nolint:revive,cyclop // cognitive-complexity of 13 max is 12, loop is needed cant be broken up func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) error { - slog.InfoContext(ctx, "Rolling back config for instance", "instance_id", instanceID) + slog.InfoContext(ctx, "Rolling back config apply updates", "instance_id", instanceID) fms.filesMutex.Lock() defer fms.filesMutex.Unlock() for _, fileAction := range fms.fileActions { switch fileAction.Action { case model.Add: + slog.InfoContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName()) if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) { return fmt.Errorf("error deleting file: %s error: %w", fileAction.File.GetFileMeta().GetName(), err) } @@ -236,7 +237,7 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) continue case model.Delete, model.Update: - content, err := fms.restoreFiles(fileAction) + content, err := fms.restoreFiles(ctx, fileAction) if err != nil { return err } @@ -276,9 +277,15 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context, slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError) } - slog.InfoContext(ctx, "Updating overview after nginx config update") - err := fms.fileServiceOperator.UpdateOverview(ctx, nginxConfigContext.InstanceID, - nginxConfigContext.Files, nginxConfigContext.ConfigPath, 0) + slog.InfoContext(ctx, "Sending file overview update due to NGINX configuration updates") + + err := fms.fileServiceOperator.UpdateOverview( + ctx, + nginxConfigContext.InstanceID, + nginxConfigContext.Files, + nginxConfigContext.ConfigPath, + 0, + ) if err != nil { slog.ErrorContext( ctx, @@ -287,6 +294,7 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context, "error", err, ) } + slog.InfoContext(ctx, "Finished updating file overview") } @@ -545,7 +553,7 @@ func (fms *FileManagerService) backupFiles(ctx context.Context) error { return nil } -func (fms *FileManagerService) restoreFiles(fileAction *model.FileCache) ([]byte, error) { +func (fms *FileManagerService) restoreFiles(ctx context.Context, fileAction *model.FileCache) ([]byte, error) { fileMeta := fileAction.File.GetFileMeta() fileName := fileMeta.GetName() @@ -556,6 +564,8 @@ func (fms *FileManagerService) restoreFiles(fileAction *model.FileCache) ([]byte return nil, fmt.Errorf("failed to create directories for %s: %w", fileName, err) } + slog.InfoContext(ctx, "Restoring file from it's backup", "file", fileName, "backup_file", tempFilePath) + moveErr := os.Rename(tempFilePath, fileName) if moveErr != nil { return nil, fmt.Errorf("failed to rename file, %s to %s: %w", tempFilePath, fileName, moveErr) @@ -623,7 +633,7 @@ func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Co } if len(downloadFiles) == 0 { - slog.DebugContext(ctx, "No updated files to download") + slog.InfoContext(ctx, "No files require downloading") return nil } @@ -634,7 +644,7 @@ func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Co errGroup.Go(func() error { tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName()) - slog.DebugContext( + slog.InfoContext( errGroupCtx, "Downloading file to temp location", "file", tempFilePath, @@ -652,7 +662,7 @@ actionsLoop: for _, fileAction := range fms.fileActions { switch fileAction.Action { case model.Delete: - slog.DebugContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName()) + slog.InfoContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName()) if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) { actionError = fmt.Errorf("error deleting file: %s error: %w", fileAction.File.GetFileMeta().GetName(), err) diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go index 4aa05be346..dc464e0327 100644 --- a/internal/file/file_plugin.go +++ b/internal/file/file_plugin.go @@ -243,13 +243,21 @@ func (fp *FilePlugin) handleConfigApplyFailedRequest(ctx context.Context, msg *b err := fp.fileManagerService.Rollback(ctx, data.InstanceID) if err != nil { - rollbackResponse := fp.createDataPlaneResponse(data.CorrelationID, + rollbackResponse := fp.createDataPlaneResponse( + data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, - "Rollback failed", data.InstanceID, err.Error()) + "Rollback failed", + data.InstanceID, + err.Error(), + ) - applyResponse := fp.createDataPlaneResponse(data.CorrelationID, + applyResponse := fp.createDataPlaneResponse( + data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback failed", data.InstanceID, data.Error.Error()) + "Config apply failed, rollback failed", + data.InstanceID, + data.Error.Error(), + ) fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) @@ -346,7 +354,8 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes mpi.CommandResponse_COMMAND_STATUS_FAILURE, "Config apply failed, rollback failed", instanceID, - rollbackErr.Error()) + rollbackErr.Error(), + ) fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: rollbackResponse}) @@ -358,7 +367,8 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes mpi.CommandResponse_COMMAND_STATUS_FAILURE, "Config apply failed, rollback successful", instanceID, - err.Error()) + err.Error(), + ) fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response}) @@ -416,6 +426,8 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me Status: mpi.CommandResponse_COMMAND_STATUS_OK, Message: "Successfully updated all files", }, + InstanceId: configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(), + RequestType: mpi.DataPlaneResponse_CONFIG_UPLOAD_REQUEST, } if updatingFilesError != nil { @@ -442,7 +454,9 @@ func (fp *FilePlugin) handleAgentConfigUpdate(ctx context.Context, msg *bus.Mess fp.config = agentConfig } -func (fp *FilePlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus, +func (fp *FilePlugin) createDataPlaneResponse( + correlationID string, + status mpi.CommandResponse_CommandStatus, message, instanceID, err string, ) *mpi.DataPlaneResponse { return &mpi.DataPlaneResponse{ @@ -456,6 +470,7 @@ func (fp *FilePlugin) createDataPlaneResponse(correlationID string, status mpi.C Message: message, Error: err, }, - InstanceId: instanceID, + InstanceId: instanceID, + RequestType: mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, } } diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go index 6fce049b39..70c61129ea 100644 --- a/internal/file/file_service_operator.go +++ b/internal/file/file_service_operator.go @@ -265,9 +265,16 @@ func (fso *FileServiceOperator) UpdateFile( instanceID string, fileToUpdate *mpi.File, ) error { - slog.InfoContext(ctx, "Updating file", "file_name", fileToUpdate.GetFileMeta().GetName(), "instance_id", instanceID) + slog.InfoContext( + ctx, + "Sending file updates", + "file_name", fileToUpdate.GetFileMeta().GetName(), + "instance_id", instanceID, + ) - slog.DebugContext(ctx, "Checking file size", + slog.DebugContext( + ctx, + "Checking file size", "file_size", fileToUpdate.GetFileMeta().GetSize(), "max_file_size", int64(fso.agentConfig.Client.Grpc.MaxFileSize), ) @@ -283,7 +290,7 @@ func (fso *FileServiceOperator) UpdateFile( func (fso *FileServiceOperator) RenameFile( ctx context.Context, hash, source, desination string, ) error { - slog.DebugContext(ctx, fmt.Sprintf("Renaming file %s to %s", source, desination)) + slog.InfoContext(ctx, fmt.Sprintf("Renaming file %s to %s", source, desination)) // Create parent directories for the target file if they don't exist if err := os.MkdirAll(filepath.Dir(desination), dirPerm); err != nil { @@ -332,7 +339,7 @@ func (fso *FileServiceOperator) updateFiles( } iteration++ - slog.InfoContext(ctx, "Updating file overview after file updates", "attempt_number", iteration) + slog.InfoContext(ctx, "Sending file overview updates", "attempt_number", iteration) return fso.UpdateOverview(ctx, instanceID, diffFiles, configPath, iteration) } diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go index 4c044d2fb1..2f369ab956 100644 --- a/internal/grpc/grpc.go +++ b/internal/grpc/grpc.go @@ -83,7 +83,7 @@ func NewGrpcConnection(ctx context.Context, agentConfig *config.Config, serverAddr := serverAddress(ctx, commandConfig) - slog.InfoContext(ctx, "Dialing grpc server", "server_addr", serverAddr) + slog.InfoContext(ctx, "Creating connection to management plane server", "server_address", serverAddr) var err error info := host.NewInfo() @@ -217,7 +217,12 @@ func DialOptions(agentConfig *config.Config, commandConfig *config.Command, reso // Proxy support: If proxy config exists, use HTTP CONNECT dialer if commandConfig.Server.Proxy != nil && commandConfig.Server.Proxy.URL != "" { opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) { - slog.InfoContext(ctx, "Dialing grpc server via proxy") + slog.InfoContext( + ctx, + "Creating connection to management plane server via proxy", + "proxy_url", commandConfig.Server.Proxy.URL, + ) + return DialViaHTTPProxy(ctx, commandConfig.Server.Proxy, addr) })) } diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go index fde011d8ba..cb27070c00 100644 --- a/internal/plugin/plugin_manager.go +++ b/internal/plugin/plugin_manager.go @@ -10,6 +10,7 @@ import ( "log/slog" "sync" + "github.com/nginx/agent/v3/internal/logger" "github.com/nginx/agent/v3/internal/model" pkg "github.com/nginx/agent/v3/pkg/config" @@ -50,9 +51,14 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo manifestLock *sync.RWMutex, ) []bus.Plugin { if agentConfig.IsCommandGrpcClientConfigured() { - grpcConnection, err := grpc.NewGrpcConnection(ctx, agentConfig, agentConfig.Command) + newCtx := context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, model.Command), + ) + + grpcConnection, err := grpc.NewGrpcConnection(newCtx, agentConfig, agentConfig.Command) if err != nil { - slog.WarnContext(ctx, "Failed to create gRPC connection for command server", "error", err) + slog.WarnContext(newCtx, "Failed to create gRPC connection for command server", "error", err) } else { commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, model.Command) plugins = append(plugins, commandPlugin) @@ -71,9 +77,14 @@ func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin agentConfig *config.Config, manifestLock *sync.RWMutex, ) []bus.Plugin { if agentConfig.IsAuxiliaryCommandGrpcClientConfigured() { - auxGRPCConnection, err := grpc.NewGrpcConnection(ctx, agentConfig, agentConfig.AuxiliaryCommand) + newCtx := context.WithValue( + ctx, + logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, model.Auxiliary), + ) + + auxGRPCConnection, err := grpc.NewGrpcConnection(newCtx, agentConfig, agentConfig.AuxiliaryCommand) if err != nil { - slog.WarnContext(ctx, "Failed to create gRPC connection for auxiliary command server", "error", err) + slog.WarnContext(newCtx, "Failed to create gRPC connection for auxiliary command server", "error", err) } else { auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, model.Auxiliary) plugins = append(plugins, auxCommandPlugin) diff --git a/internal/resource/nginx_instance_operator.go b/internal/resource/nginx_instance_operator.go index 2fa4c226cc..b1d4106cff 100644 --- a/internal/resource/nginx_instance_operator.go +++ b/internal/resource/nginx_instance_operator.go @@ -43,7 +43,7 @@ func NewInstanceOperator(agentConfig *config.Config) *NginxInstanceOperator { } func (i *NginxInstanceOperator) Validate(ctx context.Context, instance *mpi.Instance) error { - slog.DebugContext(ctx, "Validating NGINX config") + slog.InfoContext(ctx, "Validating NGINX configuration") exePath := instance.GetInstanceRuntime().GetBinaryPath() out, err := i.executer.RunCmd(ctx, exePath, "-t") @@ -56,7 +56,7 @@ func (i *NginxInstanceOperator) Validate(ctx context.Context, instance *mpi.Inst return err } - slog.InfoContext(ctx, "NGINX config tested", "output", out) + slog.InfoContext(ctx, "NGINX configuration tested", "output", out) return nil } @@ -66,8 +66,7 @@ func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instan var errorsFound error pid := instance.GetInstanceRuntime().GetProcessId() - slog.InfoContext(ctx, "Reloading NGINX PID", "pid", - pid) + slog.InfoContext(ctx, "Reloading NGINX master process", "pid", pid) workers := i.nginxProcessOperator.NginxWorkerProcesses(ctx, pid) @@ -95,25 +94,25 @@ func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instan i.checkWorkers(ctx, instance.GetInstanceMeta().GetInstanceId(), createdTime, processes) } - slog.InfoContext(ctx, "NGINX reloaded", "process_id", pid) + slog.InfoContext(ctx, "Finished reloading NGINX master process", "process_id", pid) numberOfExpectedMessages := len(errorLogs) for range numberOfExpectedMessages { logErr := <-logErrorChannel - slog.InfoContext(ctx, "Message received in logErrorChannel", "error", logErr) if logErr != nil { errorsFound = errors.Join(errorsFound, logErr) - slog.InfoContext(ctx, "Errors Found", "", errorsFound) } } - slog.InfoContext(ctx, "Finished monitoring post reload") + slog.InfoContext(ctx, "Finished monitoring NGINX error logs after reload") if errorsFound != nil { return errorsFound } + slog.InfoContext(ctx, "No errors found in NGINX error logs after reload") + return nil } @@ -128,7 +127,7 @@ func (i *NginxInstanceOperator) checkWorkers(ctx context.Context, instanceID str Multiplier: i.agentConfig.DataPlaneConfig.Nginx.ReloadBackoff.Multiplier, } - slog.DebugContext(ctx, "Waiting for NGINX to finish reloading") + slog.InfoContext(ctx, "Waiting for NGINX worker processes to be reloaded") newPid, findErr := i.nginxProcessOperator.FindParentProcessID(ctx, instanceID, processes, i.executer) if findErr != nil { @@ -153,12 +152,14 @@ func (i *NginxInstanceOperator) checkWorkers(ctx context.Context, instanceID str } } - return fmt.Errorf("waiting for NGINX worker to be newer "+ - "than %v", createdTime) + return fmt.Errorf("waiting for NGINX worker to be newer than %v", createdTime) }) if err != nil { - slog.WarnContext(ctx, "Failed to check if NGINX worker processes have successfully reloaded, "+ - "timed out waiting", "error", err) + slog.WarnContext( + ctx, + "Failed to check if NGINX worker processes have successfully reloaded, timed out waiting", + "error", err, + ) return } @@ -197,6 +198,7 @@ func (i *NginxInstanceOperator) monitorLogs(ctx context.Context, errorLogs []str } for _, errorLog := range errorLogs { + slog.InfoContext(ctx, "Starting to monitor NGINX error log for errors", "log_file", errorLog) go i.logTailer.Tail(ctx, errorLog, errorChannel) } } diff --git a/internal/resource/nginx_plus_actions.go b/internal/resource/nginx_plus_actions.go index a0b949f04a..33db5f2edf 100644 --- a/internal/resource/nginx_plus_actions.go +++ b/internal/resource/nginx_plus_actions.go @@ -21,82 +21,138 @@ type APIAction struct { ResourceService resourceServiceInterface } -func (a *APIAction) HandleUpdateStreamServersRequest(ctx context.Context, action *mpi.NGINXPlusAction, +func (a *APIAction) HandleGetStreamUpstreamsRequest(ctx context.Context, instance *mpi.Instance, ) *mpi.DataPlaneResponse { - correlationID := logger.CorrelationID(ctx) - instanceID := instance.GetInstanceMeta().GetInstanceId() - - add, update, del, err := a.ResourceService.UpdateStreamServers(ctx, instance, - action.GetUpdateStreamServers().GetUpstreamStreamName(), action.GetUpdateStreamServers().GetServers()) - if err != nil { - slog.ErrorContext(ctx, "Unable to update stream servers of upstream", "request", - action.GetUpdateHttpUpstreamServers(), "error", err) - - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "", instanceID, err.Error()) - } - - slog.DebugContext(ctx, "Successfully updated stream upstream servers", "http_upstream_name", - action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update), - "delete", len(del)) + return a.handleUpstreamGetRequest( + ctx, + instance, + func(ctx context.Context, instance *mpi.Instance) (interface{}, error) { + return a.ResourceService.GetStreamUpstreams(ctx, instance) + }, + "Unable to get stream upstreams", + ) +} - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - "Successfully updated stream upstream servers", instanceID, "") +func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi.Instance) *mpi.DataPlaneResponse { + return a.handleUpstreamGetRequest( + ctx, + instance, + func(ctx context.Context, instance *mpi.Instance) (interface{}, error) { + return a.ResourceService.GetUpstreams(ctx, instance) + }, + "Unable to get upstreams", + ) } -func (a *APIAction) HandleGetStreamUpstreamsRequest(ctx context.Context, +func (a *APIAction) HandleGetHTTPUpstreamsServersRequest( + ctx context.Context, + action *mpi.NGINXPlusAction, instance *mpi.Instance, ) *mpi.DataPlaneResponse { correlationID := logger.CorrelationID(ctx) instanceID := instance.GetInstanceMeta().GetInstanceId() - streamUpstreamsResponse := emptyResponse + upstreamsResponse := emptyResponse - streamUpstreams, err := a.ResourceService.GetStreamUpstreams(ctx, instance) + upstreams, err := a.ResourceService.GetHTTPUpstreamServers( + ctx, + instance, + action.GetGetHttpUpstreamServers().GetHttpUpstreamName(), + ) if err != nil { - slog.ErrorContext(ctx, "Unable to get stream upstreams", "error", err) - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "", instanceID, err.Error()) + slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err) + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "", + Error: err.Error(), + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } - if streamUpstreams != nil { - streamUpstreamsJSON, jsonErr := json.Marshal(streamUpstreams) + if upstreams != nil { + upstreamsJSON, jsonErr := json.Marshal(upstreams) if jsonErr != nil { - slog.ErrorContext(ctx, "Unable to marshal stream upstreams", "err", err) + slog.ErrorContext(ctx, "Unable to marshal http upstreams", "error", jsonErr) } - streamUpstreamsResponse = string(streamUpstreamsJSON) + upstreamsResponse = string(upstreamsJSON) } - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - streamUpstreamsResponse, instanceID, "") + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: upstreamsResponse, + Error: "", + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } -func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi.Instance) *mpi.DataPlaneResponse { +//nolint:dupl // Having common code duplicated for clarity and ease of maintenance +func (a *APIAction) HandleUpdateStreamServersRequest( + ctx context.Context, + action *mpi.NGINXPlusAction, + instance *mpi.Instance, +) *mpi.DataPlaneResponse { correlationID := logger.CorrelationID(ctx) instanceID := instance.GetInstanceMeta().GetInstanceId() - upstreamsResponse := emptyResponse - upstreams, err := a.ResourceService.GetUpstreams(ctx, instance) + add, update, del, err := a.ResourceService.UpdateStreamServers( + ctx, + instance, + action.GetUpdateStreamServers().GetUpstreamStreamName(), + action.GetUpdateStreamServers().GetServers(), + ) if err != nil { - slog.InfoContext(ctx, "Unable to get upstreams", "error", err) - - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "", instanceID, err.Error()) + slog.ErrorContext( + ctx, + "Unable to update stream servers of upstream", + "request", action.GetUpdateStreamServers(), + "error", err, + ) + + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "", + Error: err.Error(), + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } - if upstreams != nil { - upstreamsJSON, jsonErr := json.Marshal(upstreams) - if jsonErr != nil { - slog.ErrorContext(ctx, "Unable to marshal upstreams", "err", err) - } - upstreamsResponse = string(upstreamsJSON) - } - - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - upstreamsResponse, instanceID, "") + slog.DebugContext( + ctx, + "Successfully updated stream upstream servers", + "http_upstream_name", action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), + "add", len(add), + "update", len(update), + "delete", len(del), + ) + + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Successfully updated stream upstream servers", + Error: "", + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } -func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(ctx context.Context, action *mpi.NGINXPlusAction, +//nolint:dupl // Having common code duplicated for clarity and ease of maintenance +func (a *APIAction) HandleUpdateHTTPUpstreamsRequest( + ctx context.Context, + action *mpi.NGINXPlusAction, instance *mpi.Instance, ) *mpi.DataPlaneResponse { correlationID := logger.CorrelationID(ctx) @@ -106,44 +162,88 @@ func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(ctx context.Context, action action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), action.GetUpdateHttpUpstreamServers().GetServers()) if err != nil { - slog.ErrorContext(ctx, "Unable to update HTTP servers of upstream", "request", - action.GetUpdateHttpUpstreamServers(), "error", err) - - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "", instanceID, err.Error()) + slog.ErrorContext( + ctx, + "Unable to update HTTP servers of upstream", + "request", action.GetUpdateHttpUpstreamServers(), + "error", err, + ) + + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "", + Error: err.Error(), + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } - slog.DebugContext(ctx, "Successfully updated http upstream servers", "http_upstream_name", - action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update), - "delete", len(del)) - - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - "Successfully updated HTTP Upstreams", instanceID, "") + slog.DebugContext( + ctx, + "Successfully updated http upstream servers", + "http_upstream_name", action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), + "add", len(add), + "update", len(update), + "delete", len(del), + ) + + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Successfully updated HTTP Upstreams", + Error: "", + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } -func (a *APIAction) HandleGetHTTPUpstreamsServersRequest(ctx context.Context, action *mpi.NGINXPlusAction, +// handleUpstreamGetRequest is a generic helper function to handle GET requests for API actions +func (a *APIAction) handleUpstreamGetRequest( + ctx context.Context, instance *mpi.Instance, + getData func(context.Context, *mpi.Instance) (interface{}, error), + errorMsg string, ) *mpi.DataPlaneResponse { correlationID := logger.CorrelationID(ctx) instanceID := instance.GetInstanceMeta().GetInstanceId() - upstreamsResponse := emptyResponse + jsonResponse := emptyResponse - upstreams, err := a.ResourceService.GetHTTPUpstreamServers(ctx, instance, - action.GetGetHttpUpstreamServers().GetHttpUpstreamName()) + data, err := getData(ctx, instance) if err != nil { - slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err) - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "", instanceID, err.Error()) + slog.ErrorContext(ctx, errorMsg, "error", err) + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "", + Error: err.Error(), + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } - if upstreams != nil { - upstreamsJSON, jsonErr := json.Marshal(upstreams) + if data != nil { + dataJSON, jsonErr := json.Marshal(data) if jsonErr != nil { - slog.ErrorContext(ctx, "Unable to marshal http upstreams", "err", err) + slog.ErrorContext(ctx, "Unable to marshal data", "error", jsonErr) } - upstreamsResponse = string(upstreamsJSON) + jsonResponse = string(dataJSON) } - return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK, - upstreamsResponse, instanceID, "") + return response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: jsonResponse, + Error: "", + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) } diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go index fcb0138f62..27c33cda7e 100644 --- a/internal/resource/resource_plugin.go +++ b/internal/resource/resource_plugin.go @@ -8,7 +8,6 @@ package resource import ( "context" "errors" - "fmt" "log/slog" "sync" @@ -189,9 +188,16 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi } if instance == nil { slog.ErrorContext(ctx, "Unable to find instance with ID", "id", instanceID) - resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "", instanceID, fmt.Sprintf("failed to preform API "+ - "action, could not find instance with ID: %s", instanceID)) + resp := response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "", + Error: "failed to preform API action, could not find instance with ID: " + instanceID, + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp}) @@ -200,8 +206,16 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi if instance.GetInstanceMeta().GetInstanceType() != mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS { slog.ErrorContext(ctx, "Failed to preform API action", "error", errors.New("instance is not NGINX Plus")) - resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "", instanceID, "failed to preform API action, instance is not NGINX Plus") + resp := response.CreateDataPlaneResponse( + correlationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "", + Error: "failed to preform API action, instance is not NGINX Plus", + }, + mpi.DataPlaneResponse_API_ACTION_REQUEST, + instanceID, + ) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp}) @@ -245,19 +259,40 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes configContext, err := r.resourceService.ApplyConfig(ctx, data.InstanceID) if err != nil { data.Error = err - slog.ErrorContext(ctx, "errors found during config apply, "+ - "sending error status, rolling back config", "err", err) - dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_ERROR, - "Config apply failed, rolling back config", data.InstanceID, err.Error()) - r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) + slog.ErrorContext( + ctx, + "Errors found during config apply, sending error status and rolling back configuration updates", + "error", err, + ) + + dpResponse := response.CreateDataPlaneResponse( + data.CorrelationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_ERROR, + Message: "Config apply failed, rolling back config", + Error: err.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + data.InstanceID, + ) + + r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse}) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data}) return } - dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_OK, - "Config apply successful", data.InstanceID, "") + dpResponse := response.CreateDataPlaneResponse( + data.CorrelationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_OK, + Message: "Config apply successful", + Error: "", + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + data.InstanceID, + ) successMessage := &model.ReloadSuccess{ ConfigContext: configContext, @@ -277,14 +312,29 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { } _, err := r.resourceService.ApplyConfig(ctx, data.InstanceID) if err != nil { - slog.ErrorContext(ctx, "errors found during rollback, sending failure status", "err", err) - - rollbackResponse := response.CreateDataPlaneResponse(data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_ERROR, "Rollback failed", data.InstanceID, err.Error()) - - applyResponse := response.CreateDataPlaneResponse(data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, "Config apply failed, rollback failed", - data.InstanceID, data.Error.Error()) + slog.ErrorContext(ctx, "Errors found during rollback, sending failure status", "error", err) + + rollbackResponse := response.CreateDataPlaneResponse( + data.CorrelationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_ERROR, + Message: "Rollback failed", + Error: err.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + data.InstanceID, + ) + + applyResponse := response.CreateDataPlaneResponse( + data.CorrelationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "Config apply failed, rollback failed", + Error: data.Error.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + data.InstanceID, + ) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse}) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) @@ -292,9 +342,16 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) { return } - applyResponse := response.CreateDataPlaneResponse(data.CorrelationID, - mpi.CommandResponse_COMMAND_STATUS_FAILURE, - "Config apply failed, rollback successful", data.InstanceID, data.Error.Error()) + applyResponse := response.CreateDataPlaneResponse( + data.CorrelationID, + &mpi.CommandResponse{ + Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE, + Message: "Config apply failed, rollback successful", + Error: data.Error.Error(), + }, + mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST, + data.InstanceID, + ) r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse}) } diff --git a/internal/resource/resource_service.go b/internal/resource/resource_service.go index 00bff24e53..2ae6ede88e 100644 --- a/internal/resource/resource_service.go +++ b/internal/resource/resource_service.go @@ -256,8 +256,9 @@ func (r *ResourceService) GetHTTPUpstreamServers(ctx context.Context, instance * } servers, getServersErr := plusClient.GetHTTPServers(ctx, upstream) - - slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetHTTPUpstreamServers", "err", getServersErr) + if getServersErr != nil { + slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetHTTPUpstreamServers", "error", getServersErr) + } return servers, createPlusAPIError(getServersErr) } @@ -272,7 +273,9 @@ func (r *ResourceService) GetUpstreams(ctx context.Context, instance *mpi.Instan servers, getUpstreamsErr := plusClient.GetUpstreams(ctx) - slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetUpstreams", "err", getUpstreamsErr) + if getUpstreamsErr != nil { + slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetUpstreams", "error", getUpstreamsErr) + } return servers, createPlusAPIError(getUpstreamsErr) } @@ -287,7 +290,9 @@ func (r *ResourceService) GetStreamUpstreams(ctx context.Context, instance *mpi. streamUpstreams, getServersErr := plusClient.GetStreamUpstreams(ctx) - slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetStreamUpstreams", "err", getServersErr) + if getServersErr != nil { + slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetStreamUpstreams", "error", getServersErr) + } return streamUpstreams, createPlusAPIError(getServersErr) } @@ -308,7 +313,9 @@ func (r *ResourceService) UpdateStreamServers(ctx context.Context, instance *mpi added, updated, deleted, updateError := plusClient.UpdateStreamServers(ctx, upstream, servers) - slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateStreamServers", "err", updateError) + if updateError != nil { + slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateStreamServers", "error", updateError) + } return added, updated, deleted, createPlusAPIError(updateError) } @@ -330,7 +337,7 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc added, updated, deleted, updateError := plusClient.UpdateHTTPServers(ctx, upstream, servers) if updateError != nil { - slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError) + slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "error", updateError) } return added, updated, deleted, createPlusAPIError(updateError) diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go index 96c450031e..9e41dad8b3 100644 --- a/internal/watcher/file/file_watcher_service.go +++ b/internal/watcher/file/file_watcher_service.go @@ -144,7 +144,7 @@ func (fws *FileWatcherService) Update(ctx context.Context, nginxConfigContext *m fws.directoriesToWatch = directoriesToWatch if fws.watcher != nil { - slog.InfoContext(ctx, "Updating file watcher", "allowed", fws.agentConfig.AllowedDirectories) + slog.DebugContext(ctx, "No watcher exists, creating new watcher") // Start watching new directories fws.addWatchers(ctx) diff --git a/internal/watcher/health/health_watcher_service.go b/internal/watcher/health/health_watcher_service.go index 9c40820369..87768f4b1a 100644 --- a/internal/watcher/health/health_watcher_service.go +++ b/internal/watcher/health/health_watcher_service.go @@ -51,7 +51,7 @@ func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService { } } -func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) { +func (hw *HealthWatcherService) AddHealthWatcher(ctx context.Context, instances []*mpi.Instance) { hw.healthWatcherMutex.Lock() defer hw.healthWatcherMutex.Unlock() @@ -66,8 +66,11 @@ func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) { mpi.InstanceMeta_INSTANCE_TYPE_NGINX_APP_PROTECT: fallthrough default: - slog.Warn("Health watcher not implemented", "instance_type", - instance.GetInstanceMeta().GetInstanceType()) + slog.DebugContext( + ctx, + "Health watcher not implemented", + "instance_type", instance.GetInstanceMeta().GetInstanceType(), + ) } hw.instances[instance.GetInstanceMeta().GetInstanceId()] = instance } diff --git a/internal/watcher/health/health_watcher_service_test.go b/internal/watcher/health/health_watcher_service_test.go index b2ed9b78c9..9dc875f01d 100644 --- a/internal/watcher/health/health_watcher_service_test.go +++ b/internal/watcher/health/health_watcher_service_test.go @@ -47,7 +47,7 @@ func TestHealthWatcherService_AddHealthWatcher(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { healthWatcher := NewHealthWatcherService(agentConfig) - healthWatcher.AddHealthWatcher(test.instances) + healthWatcher.AddHealthWatcher(t.Context(), test.instances) if test.numWatchers == 1 { assert.Len(t, healthWatcher.watchers, 1) @@ -66,7 +66,7 @@ func TestHealthWatcherService_DeleteHealthWatcher(t *testing.T) { instance := protos.NginxOssInstance([]string{}) instances := []*mpi.Instance{instance} - healthWatcher.AddHealthWatcher(instances) + healthWatcher.AddHealthWatcher(t.Context(), instances) assert.Len(t, healthWatcher.watchers, 1) healthWatcher.DeleteHealthWatcher(instances) @@ -82,7 +82,7 @@ func TestHealthWatcherService_UpdateHealthWatcher(t *testing.T) { updatedInstance.GetInstanceMeta().InstanceId = instance.GetInstanceMeta().GetInstanceId() instances := []*mpi.Instance{instance} - healthWatcher.AddHealthWatcher(instances) + healthWatcher.AddHealthWatcher(t.Context(), instances) assert.Equal(t, instance, healthWatcher.instances[instance.GetInstanceMeta().GetInstanceId()]) healthWatcher.UpdateHealthWatcher([]*mpi.Instance{updatedInstance}) diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go index 4d00ce70b4..0279375992 100644 --- a/internal/watcher/instance/instance_watcher_service.go +++ b/internal/watcher/instance/instance_watcher_service.go @@ -7,6 +7,7 @@ package instance import ( "context" + "encoding/json" "log/slog" "slices" "sync" @@ -43,13 +44,14 @@ type ( processOperator process.ProcessOperatorInterface nginxConfigParser parser.ConfigParser executer exec.ExecInterface + nginxParser processParser enabled *atomic.Bool agentConfig *config.Config instanceCache map[string]*mpi.Instance nginxConfigCache map[string]*model.NginxConfigContext instancesChannel chan<- InstanceUpdatesMessage nginxConfigContextChannel chan<- NginxConfigContextMessage - nginxParser processParser + processCache []*nginxprocess.Process cacheMutex sync.Mutex } @@ -84,6 +86,7 @@ func NewInstanceWatcherService(agentConfig *config.Config) *InstanceWatcherServi nginxConfigCache: make(map[string]*model.NginxConfigContext), executer: &exec.Exec{}, enabled: enabled, + processCache: []*nginxprocess.Process{}, } } @@ -270,6 +273,18 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) ( return instanceUpdates, err } + if !slices.EqualFunc(iw.processCache, nginxProcesses, func(a, b *nginxprocess.Process) bool { + return a.Equal(b) + }) { + processesJSON, marshalErr := json.Marshal(nginxProcesses) + if marshalErr != nil { + slog.DebugContext(ctx, "Unable to marshal NGINX processes", "error", marshalErr) + } else { + slog.DebugContext(ctx, "NGINX processes changed", "processes", processesJSON) + iw.processCache = nginxProcesses + } + } + // NGINX Agent is always the first instance in the list instancesFound := make(map[string]*mpi.Instance) agentInstance := iw.agentInstance(ctx) diff --git a/internal/watcher/instance/nginx_process_parser.go b/internal/watcher/instance/nginx_process_parser.go index 3da8786b85..85a7276624 100644 --- a/internal/watcher/instance/nginx_process_parser.go +++ b/internal/watcher/instance/nginx_process_parser.go @@ -48,24 +48,12 @@ func NewNginxProcessParser() *NginxProcessParser { // //nolint:revive,gocognit // cognitive complexity of 20 because of the if statements in the for loop func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*nginxprocess.Process) map[string]*mpi.Instance { - slog.DebugContext(ctx, "Parsing NGINX processes", "number_of_processes", len(processes)) - instanceMap := make(map[string]*mpi.Instance) // key is instanceID workers := make(map[int32][]*mpi.InstanceChild) // key is ppid of process processesByPID := convertToMap(processes) for _, proc := range processesByPID { - slog.DebugContext(ctx, "NGINX process details", - "ppid", proc.PPID, - "pid", proc.PID, - "name", proc.Name, - "created", proc.Created, - "status", proc.Status, - "cmd", proc.Cmd, - "exe", proc.Exe, - ) - if proc.IsWorker() { // Here we are determining if the worker process has a master if masterProcess, ok := processesByPID[proc.PPID]; ok { diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go index 8f19f92266..75cdf2ab06 100644 --- a/internal/watcher/watcher_plugin.go +++ b/internal/watcher/watcher_plugin.go @@ -306,7 +306,7 @@ func (w *Watcher) handleCredentialUpdate(ctx context.Context, message credential func (w *Watcher) handleInstanceUpdates(newCtx context.Context, message instance.InstanceUpdatesMessage) { if len(message.InstanceUpdates.NewInstances) > 0 { slog.DebugContext(newCtx, "New instances found", "instances", message.InstanceUpdates.NewInstances) - w.healthWatcherService.AddHealthWatcher(message.InstanceUpdates.NewInstances) + w.healthWatcherService.AddHealthWatcher(newCtx, message.InstanceUpdates.NewInstances) w.messagePipe.Process( newCtx, &bus.Message{Topic: bus.AddInstancesTopic, Data: message.InstanceUpdates.NewInstances}, diff --git a/pkg/nginxprocess/process.go b/pkg/nginxprocess/process.go index 2224916b08..4c18883fb3 100644 --- a/pkg/nginxprocess/process.go +++ b/pkg/nginxprocess/process.go @@ -18,13 +18,13 @@ import ( type Process struct { // Created is when this process was created, precision varies by platform and is at best to the millisecond. On // linux there can be significant skew compared to [time.Now], ± 1s. - Created time.Time - Name string - Cmd string - Exe string // path to the executable - Status string // process status, only present if this process was created using [WithStatus] - PID int32 - PPID int32 // parent PID + Created time.Time `json:"created"` + Name string `json:"name"` + Cmd string `json:"cmd"` + Exe string `json:"exe"` // path to the executable + Status string `json:"status"` // process status, only present if this process was created using [WithStatus] + PID int32 `json:"pid"` + PPID int32 `json:"ppid"` // parent PID } // IsWorker returns true if the process is a NGINX worker process. @@ -50,6 +50,11 @@ func (p *Process) IsHealthy() bool { return p.Status != "" && !strings.Contains(p.Status, process.Zombie) } +func (p *Process) Equal(b *Process) bool { + return p.PID == b.PID && p.Name == b.Name && p.PPID == b.PPID && p.Cmd == b.Cmd && + p.Exe == b.Exe && p.Created.Equal(b.Created) && p.Status == b.Status +} + type options struct { loadStatus bool }