diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go
index dddaa6b8b5..348f7874a6 100644
--- a/api/grpc/mpi/v1/command.pb.go
+++ b/api/grpc/mpi/v1/command.pb.go
@@ -8,7 +8,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.36.10
+// protoc-gen-go v1.36.11
// protoc (unknown)
// source: mpi/v1/command.proto
@@ -88,6 +88,70 @@ func (InstanceHealth_InstanceHealthStatus) EnumDescriptor() ([]byte, []int) {
return file_mpi_v1_command_proto_rawDescGZIP(), []int{9, 0}
}
+type DataPlaneResponse_RequestType int32
+
+const (
+ DataPlaneResponse_UNSPECIFIED_REQUEST DataPlaneResponse_RequestType = 0
+ DataPlaneResponse_CONFIG_APPLY_REQUEST DataPlaneResponse_RequestType = 1
+ DataPlaneResponse_CONFIG_UPLOAD_REQUEST DataPlaneResponse_RequestType = 2
+ DataPlaneResponse_HEALTH_REQUEST DataPlaneResponse_RequestType = 3
+ DataPlaneResponse_STATUS_REQUEST DataPlaneResponse_RequestType = 4
+ DataPlaneResponse_API_ACTION_REQUEST DataPlaneResponse_RequestType = 5
+ DataPlaneResponse_COMMAND_STATUS_REQUEST DataPlaneResponse_RequestType = 6
+ DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST DataPlaneResponse_RequestType = 7
+)
+
+// Enum value maps for DataPlaneResponse_RequestType.
+var (
+ DataPlaneResponse_RequestType_name = map[int32]string{
+ 0: "UNSPECIFIED_REQUEST",
+ 1: "CONFIG_APPLY_REQUEST",
+ 2: "CONFIG_UPLOAD_REQUEST",
+ 3: "HEALTH_REQUEST",
+ 4: "STATUS_REQUEST",
+ 5: "API_ACTION_REQUEST",
+ 6: "COMMAND_STATUS_REQUEST",
+ 7: "UPDATE_AGENT_CONFIG_REQUEST",
+ }
+ DataPlaneResponse_RequestType_value = map[string]int32{
+ "UNSPECIFIED_REQUEST": 0,
+ "CONFIG_APPLY_REQUEST": 1,
+ "CONFIG_UPLOAD_REQUEST": 2,
+ "HEALTH_REQUEST": 3,
+ "STATUS_REQUEST": 4,
+ "API_ACTION_REQUEST": 5,
+ "COMMAND_STATUS_REQUEST": 6,
+ "UPDATE_AGENT_CONFIG_REQUEST": 7,
+ }
+)
+
+func (x DataPlaneResponse_RequestType) Enum() *DataPlaneResponse_RequestType {
+ p := new(DataPlaneResponse_RequestType)
+ *p = x
+ return p
+}
+
+func (x DataPlaneResponse_RequestType) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (DataPlaneResponse_RequestType) Descriptor() protoreflect.EnumDescriptor {
+ return file_mpi_v1_command_proto_enumTypes[1].Descriptor()
+}
+
+func (DataPlaneResponse_RequestType) Type() protoreflect.EnumType {
+ return &file_mpi_v1_command_proto_enumTypes[1]
+}
+
+func (x DataPlaneResponse_RequestType) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use DataPlaneResponse_RequestType.Descriptor instead.
+func (DataPlaneResponse_RequestType) EnumDescriptor() ([]byte, []int) {
+ return file_mpi_v1_command_proto_rawDescGZIP(), []int{12, 0}
+}
+
// the types of instances possible
type InstanceMeta_InstanceType int32
@@ -137,11 +201,11 @@ func (x InstanceMeta_InstanceType) String() string {
}
func (InstanceMeta_InstanceType) Descriptor() protoreflect.EnumDescriptor {
- return file_mpi_v1_command_proto_enumTypes[1].Descriptor()
+ return file_mpi_v1_command_proto_enumTypes[2].Descriptor()
}
func (InstanceMeta_InstanceType) Type() protoreflect.EnumType {
- return &file_mpi_v1_command_proto_enumTypes[1]
+ return &file_mpi_v1_command_proto_enumTypes[2]
}
func (x InstanceMeta_InstanceType) Number() protoreflect.EnumNumber {
@@ -193,11 +257,11 @@ func (x Log_LogLevel) String() string {
}
func (Log_LogLevel) Descriptor() protoreflect.EnumDescriptor {
- return file_mpi_v1_command_proto_enumTypes[2].Descriptor()
+ return file_mpi_v1_command_proto_enumTypes[3].Descriptor()
}
func (Log_LogLevel) Type() protoreflect.EnumType {
- return &file_mpi_v1_command_proto_enumTypes[2]
+ return &file_mpi_v1_command_proto_enumTypes[3]
}
func (x Log_LogLevel) Number() protoreflect.EnumNumber {
@@ -942,7 +1006,9 @@ type DataPlaneResponse struct {
// The command response with the associated request
CommandResponse *CommandResponse `protobuf:"bytes,2,opt,name=command_response,json=commandResponse,proto3" json:"command_response,omitempty"`
// The instance identifier, if applicable, for this response
- InstanceId string `protobuf:"bytes,3,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"`
+ InstanceId string `protobuf:"bytes,3,opt,name=instance_id,json=instanceId,proto3" json:"instance_id,omitempty"`
+ // The management plane request type that is being responded to
+ RequestType DataPlaneResponse_RequestType `protobuf:"varint,4,opt,name=request_type,json=requestType,proto3,enum=mpi.v1.DataPlaneResponse_RequestType" json:"request_type,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -998,6 +1064,13 @@ func (x *DataPlaneResponse) GetInstanceId() string {
return ""
}
+func (x *DataPlaneResponse) GetRequestType() DataPlaneResponse_RequestType {
+ if x != nil {
+ return x.RequestType
+ }
+ return DataPlaneResponse_UNSPECIFIED_REQUEST
+}
+
// A Management Plane request for information, triggers an associated rpc on the Data Plane
type ManagementPlaneRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
@@ -2974,12 +3047,22 @@ const file_mpi_v1_command_proto_rawDesc = "" +
"\x1cUpdateDataPlaneHealthRequest\x126\n" +
"\fmessage_meta\x18\x01 \x01(\v2\x13.mpi.v1.MessageMetaR\vmessageMeta\x12A\n" +
"\x10instance_healths\x18\x02 \x03(\v2\x16.mpi.v1.InstanceHealthR\x0finstanceHealths\"\x1f\n" +
- "\x1dUpdateDataPlaneHealthResponse\"\xb0\x01\n" +
+ "\x1dUpdateDataPlaneHealthResponse\"\xd5\x03\n" +
"\x11DataPlaneResponse\x126\n" +
"\fmessage_meta\x18\x01 \x01(\v2\x13.mpi.v1.MessageMetaR\vmessageMeta\x12B\n" +
"\x10command_response\x18\x02 \x01(\v2\x17.mpi.v1.CommandResponseR\x0fcommandResponse\x12\x1f\n" +
"\vinstance_id\x18\x03 \x01(\tR\n" +
- "instanceId\"\xfa\x04\n" +
+ "instanceId\x12H\n" +
+ "\frequest_type\x18\x04 \x01(\x0e2%.mpi.v1.DataPlaneResponse.RequestTypeR\vrequestType\"\xd8\x01\n" +
+ "\vRequestType\x12\x17\n" +
+ "\x13UNSPECIFIED_REQUEST\x10\x00\x12\x18\n" +
+ "\x14CONFIG_APPLY_REQUEST\x10\x01\x12\x19\n" +
+ "\x15CONFIG_UPLOAD_REQUEST\x10\x02\x12\x12\n" +
+ "\x0eHEALTH_REQUEST\x10\x03\x12\x12\n" +
+ "\x0eSTATUS_REQUEST\x10\x04\x12\x16\n" +
+ "\x12API_ACTION_REQUEST\x10\x05\x12\x1a\n" +
+ "\x16COMMAND_STATUS_REQUEST\x10\x06\x12\x1f\n" +
+ "\x1bUPDATE_AGENT_CONFIG_REQUEST\x10\a\"\xfa\x04\n" +
"\x16ManagementPlaneRequest\x126\n" +
"\fmessage_meta\x18\x01 \x01(\v2\x13.mpi.v1.MessageMetaR\vmessageMeta\x12>\n" +
"\x0estatus_request\x18\x02 \x01(\v2\x15.mpi.v1.StatusRequestH\x00R\rstatusRequest\x12>\n" +
@@ -3132,138 +3215,140 @@ func file_mpi_v1_command_proto_rawDescGZIP() []byte {
return file_mpi_v1_command_proto_rawDescData
}
-var file_mpi_v1_command_proto_enumTypes = make([]protoimpl.EnumInfo, 3)
+var file_mpi_v1_command_proto_enumTypes = make([]protoimpl.EnumInfo, 4)
var file_mpi_v1_command_proto_msgTypes = make([]protoimpl.MessageInfo, 42)
var file_mpi_v1_command_proto_goTypes = []any{
(InstanceHealth_InstanceHealthStatus)(0), // 0: mpi.v1.InstanceHealth.InstanceHealthStatus
- (InstanceMeta_InstanceType)(0), // 1: mpi.v1.InstanceMeta.InstanceType
- (Log_LogLevel)(0), // 2: mpi.v1.Log.LogLevel
- (*CreateConnectionRequest)(nil), // 3: mpi.v1.CreateConnectionRequest
- (*Resource)(nil), // 4: mpi.v1.Resource
- (*HostInfo)(nil), // 5: mpi.v1.HostInfo
- (*ReleaseInfo)(nil), // 6: mpi.v1.ReleaseInfo
- (*ContainerInfo)(nil), // 7: mpi.v1.ContainerInfo
- (*CreateConnectionResponse)(nil), // 8: mpi.v1.CreateConnectionResponse
- (*UpdateDataPlaneStatusRequest)(nil), // 9: mpi.v1.UpdateDataPlaneStatusRequest
- (*UpdateDataPlaneStatusResponse)(nil), // 10: mpi.v1.UpdateDataPlaneStatusResponse
- (*UpdateAgentConfigRequest)(nil), // 11: mpi.v1.UpdateAgentConfigRequest
- (*InstanceHealth)(nil), // 12: mpi.v1.InstanceHealth
- (*UpdateDataPlaneHealthRequest)(nil), // 13: mpi.v1.UpdateDataPlaneHealthRequest
- (*UpdateDataPlaneHealthResponse)(nil), // 14: mpi.v1.UpdateDataPlaneHealthResponse
- (*DataPlaneResponse)(nil), // 15: mpi.v1.DataPlaneResponse
- (*ManagementPlaneRequest)(nil), // 16: mpi.v1.ManagementPlaneRequest
- (*StatusRequest)(nil), // 17: mpi.v1.StatusRequest
- (*HealthRequest)(nil), // 18: mpi.v1.HealthRequest
- (*ConfigApplyRequest)(nil), // 19: mpi.v1.ConfigApplyRequest
- (*ConfigUploadRequest)(nil), // 20: mpi.v1.ConfigUploadRequest
- (*APIActionRequest)(nil), // 21: mpi.v1.APIActionRequest
- (*NGINXPlusAction)(nil), // 22: mpi.v1.NGINXPlusAction
- (*UpdateHTTPUpstreamServers)(nil), // 23: mpi.v1.UpdateHTTPUpstreamServers
- (*GetHTTPUpstreamServers)(nil), // 24: mpi.v1.GetHTTPUpstreamServers
- (*UpdateStreamServers)(nil), // 25: mpi.v1.UpdateStreamServers
- (*GetUpstreams)(nil), // 26: mpi.v1.GetUpstreams
- (*GetStreamUpstreams)(nil), // 27: mpi.v1.GetStreamUpstreams
- (*CommandStatusRequest)(nil), // 28: mpi.v1.CommandStatusRequest
- (*Instance)(nil), // 29: mpi.v1.Instance
- (*InstanceMeta)(nil), // 30: mpi.v1.InstanceMeta
- (*InstanceConfig)(nil), // 31: mpi.v1.InstanceConfig
- (*InstanceRuntime)(nil), // 32: mpi.v1.InstanceRuntime
- (*InstanceChild)(nil), // 33: mpi.v1.InstanceChild
- (*NGINXRuntimeInfo)(nil), // 34: mpi.v1.NGINXRuntimeInfo
- (*NGINXPlusRuntimeInfo)(nil), // 35: mpi.v1.NGINXPlusRuntimeInfo
- (*APIDetails)(nil), // 36: mpi.v1.APIDetails
- (*NGINXAppProtectRuntimeInfo)(nil), // 37: mpi.v1.NGINXAppProtectRuntimeInfo
- (*InstanceAction)(nil), // 38: mpi.v1.InstanceAction
- (*AgentConfig)(nil), // 39: mpi.v1.AgentConfig
- (*Log)(nil), // 40: mpi.v1.Log
- (*CommandServer)(nil), // 41: mpi.v1.CommandServer
- (*AuxiliaryCommandServer)(nil), // 42: mpi.v1.AuxiliaryCommandServer
- (*MetricsServer)(nil), // 43: mpi.v1.MetricsServer
- (*FileServer)(nil), // 44: mpi.v1.FileServer
- (*MessageMeta)(nil), // 45: mpi.v1.MessageMeta
- (*CommandResponse)(nil), // 46: mpi.v1.CommandResponse
- (*FileOverview)(nil), // 47: mpi.v1.FileOverview
- (*structpb.Struct)(nil), // 48: google.protobuf.Struct
- (*ServerSettings)(nil), // 49: mpi.v1.ServerSettings
- (*AuthSettings)(nil), // 50: mpi.v1.AuthSettings
- (*TLSSettings)(nil), // 51: mpi.v1.TLSSettings
+ (DataPlaneResponse_RequestType)(0), // 1: mpi.v1.DataPlaneResponse.RequestType
+ (InstanceMeta_InstanceType)(0), // 2: mpi.v1.InstanceMeta.InstanceType
+ (Log_LogLevel)(0), // 3: mpi.v1.Log.LogLevel
+ (*CreateConnectionRequest)(nil), // 4: mpi.v1.CreateConnectionRequest
+ (*Resource)(nil), // 5: mpi.v1.Resource
+ (*HostInfo)(nil), // 6: mpi.v1.HostInfo
+ (*ReleaseInfo)(nil), // 7: mpi.v1.ReleaseInfo
+ (*ContainerInfo)(nil), // 8: mpi.v1.ContainerInfo
+ (*CreateConnectionResponse)(nil), // 9: mpi.v1.CreateConnectionResponse
+ (*UpdateDataPlaneStatusRequest)(nil), // 10: mpi.v1.UpdateDataPlaneStatusRequest
+ (*UpdateDataPlaneStatusResponse)(nil), // 11: mpi.v1.UpdateDataPlaneStatusResponse
+ (*UpdateAgentConfigRequest)(nil), // 12: mpi.v1.UpdateAgentConfigRequest
+ (*InstanceHealth)(nil), // 13: mpi.v1.InstanceHealth
+ (*UpdateDataPlaneHealthRequest)(nil), // 14: mpi.v1.UpdateDataPlaneHealthRequest
+ (*UpdateDataPlaneHealthResponse)(nil), // 15: mpi.v1.UpdateDataPlaneHealthResponse
+ (*DataPlaneResponse)(nil), // 16: mpi.v1.DataPlaneResponse
+ (*ManagementPlaneRequest)(nil), // 17: mpi.v1.ManagementPlaneRequest
+ (*StatusRequest)(nil), // 18: mpi.v1.StatusRequest
+ (*HealthRequest)(nil), // 19: mpi.v1.HealthRequest
+ (*ConfigApplyRequest)(nil), // 20: mpi.v1.ConfigApplyRequest
+ (*ConfigUploadRequest)(nil), // 21: mpi.v1.ConfigUploadRequest
+ (*APIActionRequest)(nil), // 22: mpi.v1.APIActionRequest
+ (*NGINXPlusAction)(nil), // 23: mpi.v1.NGINXPlusAction
+ (*UpdateHTTPUpstreamServers)(nil), // 24: mpi.v1.UpdateHTTPUpstreamServers
+ (*GetHTTPUpstreamServers)(nil), // 25: mpi.v1.GetHTTPUpstreamServers
+ (*UpdateStreamServers)(nil), // 26: mpi.v1.UpdateStreamServers
+ (*GetUpstreams)(nil), // 27: mpi.v1.GetUpstreams
+ (*GetStreamUpstreams)(nil), // 28: mpi.v1.GetStreamUpstreams
+ (*CommandStatusRequest)(nil), // 29: mpi.v1.CommandStatusRequest
+ (*Instance)(nil), // 30: mpi.v1.Instance
+ (*InstanceMeta)(nil), // 31: mpi.v1.InstanceMeta
+ (*InstanceConfig)(nil), // 32: mpi.v1.InstanceConfig
+ (*InstanceRuntime)(nil), // 33: mpi.v1.InstanceRuntime
+ (*InstanceChild)(nil), // 34: mpi.v1.InstanceChild
+ (*NGINXRuntimeInfo)(nil), // 35: mpi.v1.NGINXRuntimeInfo
+ (*NGINXPlusRuntimeInfo)(nil), // 36: mpi.v1.NGINXPlusRuntimeInfo
+ (*APIDetails)(nil), // 37: mpi.v1.APIDetails
+ (*NGINXAppProtectRuntimeInfo)(nil), // 38: mpi.v1.NGINXAppProtectRuntimeInfo
+ (*InstanceAction)(nil), // 39: mpi.v1.InstanceAction
+ (*AgentConfig)(nil), // 40: mpi.v1.AgentConfig
+ (*Log)(nil), // 41: mpi.v1.Log
+ (*CommandServer)(nil), // 42: mpi.v1.CommandServer
+ (*AuxiliaryCommandServer)(nil), // 43: mpi.v1.AuxiliaryCommandServer
+ (*MetricsServer)(nil), // 44: mpi.v1.MetricsServer
+ (*FileServer)(nil), // 45: mpi.v1.FileServer
+ (*MessageMeta)(nil), // 46: mpi.v1.MessageMeta
+ (*CommandResponse)(nil), // 47: mpi.v1.CommandResponse
+ (*FileOverview)(nil), // 48: mpi.v1.FileOverview
+ (*structpb.Struct)(nil), // 49: google.protobuf.Struct
+ (*ServerSettings)(nil), // 50: mpi.v1.ServerSettings
+ (*AuthSettings)(nil), // 51: mpi.v1.AuthSettings
+ (*TLSSettings)(nil), // 52: mpi.v1.TLSSettings
}
var file_mpi_v1_command_proto_depIdxs = []int32{
- 45, // 0: mpi.v1.CreateConnectionRequest.message_meta:type_name -> mpi.v1.MessageMeta
- 4, // 1: mpi.v1.CreateConnectionRequest.resource:type_name -> mpi.v1.Resource
- 29, // 2: mpi.v1.Resource.instances:type_name -> mpi.v1.Instance
- 5, // 3: mpi.v1.Resource.host_info:type_name -> mpi.v1.HostInfo
- 7, // 4: mpi.v1.Resource.container_info:type_name -> mpi.v1.ContainerInfo
- 6, // 5: mpi.v1.HostInfo.release_info:type_name -> mpi.v1.ReleaseInfo
- 6, // 6: mpi.v1.ContainerInfo.release_info:type_name -> mpi.v1.ReleaseInfo
- 46, // 7: mpi.v1.CreateConnectionResponse.response:type_name -> mpi.v1.CommandResponse
- 39, // 8: mpi.v1.CreateConnectionResponse.agent_config:type_name -> mpi.v1.AgentConfig
- 45, // 9: mpi.v1.UpdateDataPlaneStatusRequest.message_meta:type_name -> mpi.v1.MessageMeta
- 4, // 10: mpi.v1.UpdateDataPlaneStatusRequest.resource:type_name -> mpi.v1.Resource
- 45, // 11: mpi.v1.UpdateAgentConfigRequest.message_meta:type_name -> mpi.v1.MessageMeta
- 39, // 12: mpi.v1.UpdateAgentConfigRequest.agent_config:type_name -> mpi.v1.AgentConfig
+ 46, // 0: mpi.v1.CreateConnectionRequest.message_meta:type_name -> mpi.v1.MessageMeta
+ 5, // 1: mpi.v1.CreateConnectionRequest.resource:type_name -> mpi.v1.Resource
+ 30, // 2: mpi.v1.Resource.instances:type_name -> mpi.v1.Instance
+ 6, // 3: mpi.v1.Resource.host_info:type_name -> mpi.v1.HostInfo
+ 8, // 4: mpi.v1.Resource.container_info:type_name -> mpi.v1.ContainerInfo
+ 7, // 5: mpi.v1.HostInfo.release_info:type_name -> mpi.v1.ReleaseInfo
+ 7, // 6: mpi.v1.ContainerInfo.release_info:type_name -> mpi.v1.ReleaseInfo
+ 47, // 7: mpi.v1.CreateConnectionResponse.response:type_name -> mpi.v1.CommandResponse
+ 40, // 8: mpi.v1.CreateConnectionResponse.agent_config:type_name -> mpi.v1.AgentConfig
+ 46, // 9: mpi.v1.UpdateDataPlaneStatusRequest.message_meta:type_name -> mpi.v1.MessageMeta
+ 5, // 10: mpi.v1.UpdateDataPlaneStatusRequest.resource:type_name -> mpi.v1.Resource
+ 46, // 11: mpi.v1.UpdateAgentConfigRequest.message_meta:type_name -> mpi.v1.MessageMeta
+ 40, // 12: mpi.v1.UpdateAgentConfigRequest.agent_config:type_name -> mpi.v1.AgentConfig
0, // 13: mpi.v1.InstanceHealth.instance_health_status:type_name -> mpi.v1.InstanceHealth.InstanceHealthStatus
- 45, // 14: mpi.v1.UpdateDataPlaneHealthRequest.message_meta:type_name -> mpi.v1.MessageMeta
- 12, // 15: mpi.v1.UpdateDataPlaneHealthRequest.instance_healths:type_name -> mpi.v1.InstanceHealth
- 45, // 16: mpi.v1.DataPlaneResponse.message_meta:type_name -> mpi.v1.MessageMeta
- 46, // 17: mpi.v1.DataPlaneResponse.command_response:type_name -> mpi.v1.CommandResponse
- 45, // 18: mpi.v1.ManagementPlaneRequest.message_meta:type_name -> mpi.v1.MessageMeta
- 17, // 19: mpi.v1.ManagementPlaneRequest.status_request:type_name -> mpi.v1.StatusRequest
- 18, // 20: mpi.v1.ManagementPlaneRequest.health_request:type_name -> mpi.v1.HealthRequest
- 19, // 21: mpi.v1.ManagementPlaneRequest.config_apply_request:type_name -> mpi.v1.ConfigApplyRequest
- 20, // 22: mpi.v1.ManagementPlaneRequest.config_upload_request:type_name -> mpi.v1.ConfigUploadRequest
- 21, // 23: mpi.v1.ManagementPlaneRequest.action_request:type_name -> mpi.v1.APIActionRequest
- 28, // 24: mpi.v1.ManagementPlaneRequest.command_status_request:type_name -> mpi.v1.CommandStatusRequest
- 11, // 25: mpi.v1.ManagementPlaneRequest.update_agent_config_request:type_name -> mpi.v1.UpdateAgentConfigRequest
- 47, // 26: mpi.v1.ConfigApplyRequest.overview:type_name -> mpi.v1.FileOverview
- 47, // 27: mpi.v1.ConfigUploadRequest.overview:type_name -> mpi.v1.FileOverview
- 22, // 28: mpi.v1.APIActionRequest.nginx_plus_action:type_name -> mpi.v1.NGINXPlusAction
- 23, // 29: mpi.v1.NGINXPlusAction.update_http_upstream_servers:type_name -> mpi.v1.UpdateHTTPUpstreamServers
- 24, // 30: mpi.v1.NGINXPlusAction.get_http_upstream_servers:type_name -> mpi.v1.GetHTTPUpstreamServers
- 25, // 31: mpi.v1.NGINXPlusAction.update_stream_servers:type_name -> mpi.v1.UpdateStreamServers
- 26, // 32: mpi.v1.NGINXPlusAction.get_upstreams:type_name -> mpi.v1.GetUpstreams
- 27, // 33: mpi.v1.NGINXPlusAction.get_stream_upstreams:type_name -> mpi.v1.GetStreamUpstreams
- 48, // 34: mpi.v1.UpdateHTTPUpstreamServers.servers:type_name -> google.protobuf.Struct
- 48, // 35: mpi.v1.UpdateStreamServers.servers:type_name -> google.protobuf.Struct
- 30, // 36: mpi.v1.Instance.instance_meta:type_name -> mpi.v1.InstanceMeta
- 31, // 37: mpi.v1.Instance.instance_config:type_name -> mpi.v1.InstanceConfig
- 32, // 38: mpi.v1.Instance.instance_runtime:type_name -> mpi.v1.InstanceRuntime
- 1, // 39: mpi.v1.InstanceMeta.instance_type:type_name -> mpi.v1.InstanceMeta.InstanceType
- 38, // 40: mpi.v1.InstanceConfig.actions:type_name -> mpi.v1.InstanceAction
- 39, // 41: mpi.v1.InstanceConfig.agent_config:type_name -> mpi.v1.AgentConfig
- 34, // 42: mpi.v1.InstanceRuntime.nginx_runtime_info:type_name -> mpi.v1.NGINXRuntimeInfo
- 35, // 43: mpi.v1.InstanceRuntime.nginx_plus_runtime_info:type_name -> mpi.v1.NGINXPlusRuntimeInfo
- 37, // 44: mpi.v1.InstanceRuntime.nginx_app_protect_runtime_info:type_name -> mpi.v1.NGINXAppProtectRuntimeInfo
- 33, // 45: mpi.v1.InstanceRuntime.instance_children:type_name -> mpi.v1.InstanceChild
- 36, // 46: mpi.v1.NGINXRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails
- 36, // 47: mpi.v1.NGINXPlusRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails
- 36, // 48: mpi.v1.NGINXPlusRuntimeInfo.plus_api:type_name -> mpi.v1.APIDetails
- 41, // 49: mpi.v1.AgentConfig.command:type_name -> mpi.v1.CommandServer
- 43, // 50: mpi.v1.AgentConfig.metrics:type_name -> mpi.v1.MetricsServer
- 44, // 51: mpi.v1.AgentConfig.file:type_name -> mpi.v1.FileServer
- 48, // 52: mpi.v1.AgentConfig.labels:type_name -> google.protobuf.Struct
- 42, // 53: mpi.v1.AgentConfig.auxiliary_command:type_name -> mpi.v1.AuxiliaryCommandServer
- 40, // 54: mpi.v1.AgentConfig.log:type_name -> mpi.v1.Log
- 2, // 55: mpi.v1.Log.log_level:type_name -> mpi.v1.Log.LogLevel
- 49, // 56: mpi.v1.CommandServer.server:type_name -> mpi.v1.ServerSettings
- 50, // 57: mpi.v1.CommandServer.auth:type_name -> mpi.v1.AuthSettings
- 51, // 58: mpi.v1.CommandServer.tls:type_name -> mpi.v1.TLSSettings
- 49, // 59: mpi.v1.AuxiliaryCommandServer.server:type_name -> mpi.v1.ServerSettings
- 50, // 60: mpi.v1.AuxiliaryCommandServer.auth:type_name -> mpi.v1.AuthSettings
- 51, // 61: mpi.v1.AuxiliaryCommandServer.tls:type_name -> mpi.v1.TLSSettings
- 3, // 62: mpi.v1.CommandService.CreateConnection:input_type -> mpi.v1.CreateConnectionRequest
- 9, // 63: mpi.v1.CommandService.UpdateDataPlaneStatus:input_type -> mpi.v1.UpdateDataPlaneStatusRequest
- 13, // 64: mpi.v1.CommandService.UpdateDataPlaneHealth:input_type -> mpi.v1.UpdateDataPlaneHealthRequest
- 15, // 65: mpi.v1.CommandService.Subscribe:input_type -> mpi.v1.DataPlaneResponse
- 8, // 66: mpi.v1.CommandService.CreateConnection:output_type -> mpi.v1.CreateConnectionResponse
- 10, // 67: mpi.v1.CommandService.UpdateDataPlaneStatus:output_type -> mpi.v1.UpdateDataPlaneStatusResponse
- 14, // 68: mpi.v1.CommandService.UpdateDataPlaneHealth:output_type -> mpi.v1.UpdateDataPlaneHealthResponse
- 16, // 69: mpi.v1.CommandService.Subscribe:output_type -> mpi.v1.ManagementPlaneRequest
- 66, // [66:70] is the sub-list for method output_type
- 62, // [62:66] is the sub-list for method input_type
- 62, // [62:62] is the sub-list for extension type_name
- 62, // [62:62] is the sub-list for extension extendee
- 0, // [0:62] is the sub-list for field type_name
+ 46, // 14: mpi.v1.UpdateDataPlaneHealthRequest.message_meta:type_name -> mpi.v1.MessageMeta
+ 13, // 15: mpi.v1.UpdateDataPlaneHealthRequest.instance_healths:type_name -> mpi.v1.InstanceHealth
+ 46, // 16: mpi.v1.DataPlaneResponse.message_meta:type_name -> mpi.v1.MessageMeta
+ 47, // 17: mpi.v1.DataPlaneResponse.command_response:type_name -> mpi.v1.CommandResponse
+ 1, // 18: mpi.v1.DataPlaneResponse.request_type:type_name -> mpi.v1.DataPlaneResponse.RequestType
+ 46, // 19: mpi.v1.ManagementPlaneRequest.message_meta:type_name -> mpi.v1.MessageMeta
+ 18, // 20: mpi.v1.ManagementPlaneRequest.status_request:type_name -> mpi.v1.StatusRequest
+ 19, // 21: mpi.v1.ManagementPlaneRequest.health_request:type_name -> mpi.v1.HealthRequest
+ 20, // 22: mpi.v1.ManagementPlaneRequest.config_apply_request:type_name -> mpi.v1.ConfigApplyRequest
+ 21, // 23: mpi.v1.ManagementPlaneRequest.config_upload_request:type_name -> mpi.v1.ConfigUploadRequest
+ 22, // 24: mpi.v1.ManagementPlaneRequest.action_request:type_name -> mpi.v1.APIActionRequest
+ 29, // 25: mpi.v1.ManagementPlaneRequest.command_status_request:type_name -> mpi.v1.CommandStatusRequest
+ 12, // 26: mpi.v1.ManagementPlaneRequest.update_agent_config_request:type_name -> mpi.v1.UpdateAgentConfigRequest
+ 48, // 27: mpi.v1.ConfigApplyRequest.overview:type_name -> mpi.v1.FileOverview
+ 48, // 28: mpi.v1.ConfigUploadRequest.overview:type_name -> mpi.v1.FileOverview
+ 23, // 29: mpi.v1.APIActionRequest.nginx_plus_action:type_name -> mpi.v1.NGINXPlusAction
+ 24, // 30: mpi.v1.NGINXPlusAction.update_http_upstream_servers:type_name -> mpi.v1.UpdateHTTPUpstreamServers
+ 25, // 31: mpi.v1.NGINXPlusAction.get_http_upstream_servers:type_name -> mpi.v1.GetHTTPUpstreamServers
+ 26, // 32: mpi.v1.NGINXPlusAction.update_stream_servers:type_name -> mpi.v1.UpdateStreamServers
+ 27, // 33: mpi.v1.NGINXPlusAction.get_upstreams:type_name -> mpi.v1.GetUpstreams
+ 28, // 34: mpi.v1.NGINXPlusAction.get_stream_upstreams:type_name -> mpi.v1.GetStreamUpstreams
+ 49, // 35: mpi.v1.UpdateHTTPUpstreamServers.servers:type_name -> google.protobuf.Struct
+ 49, // 36: mpi.v1.UpdateStreamServers.servers:type_name -> google.protobuf.Struct
+ 31, // 37: mpi.v1.Instance.instance_meta:type_name -> mpi.v1.InstanceMeta
+ 32, // 38: mpi.v1.Instance.instance_config:type_name -> mpi.v1.InstanceConfig
+ 33, // 39: mpi.v1.Instance.instance_runtime:type_name -> mpi.v1.InstanceRuntime
+ 2, // 40: mpi.v1.InstanceMeta.instance_type:type_name -> mpi.v1.InstanceMeta.InstanceType
+ 39, // 41: mpi.v1.InstanceConfig.actions:type_name -> mpi.v1.InstanceAction
+ 40, // 42: mpi.v1.InstanceConfig.agent_config:type_name -> mpi.v1.AgentConfig
+ 35, // 43: mpi.v1.InstanceRuntime.nginx_runtime_info:type_name -> mpi.v1.NGINXRuntimeInfo
+ 36, // 44: mpi.v1.InstanceRuntime.nginx_plus_runtime_info:type_name -> mpi.v1.NGINXPlusRuntimeInfo
+ 38, // 45: mpi.v1.InstanceRuntime.nginx_app_protect_runtime_info:type_name -> mpi.v1.NGINXAppProtectRuntimeInfo
+ 34, // 46: mpi.v1.InstanceRuntime.instance_children:type_name -> mpi.v1.InstanceChild
+ 37, // 47: mpi.v1.NGINXRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails
+ 37, // 48: mpi.v1.NGINXPlusRuntimeInfo.stub_status:type_name -> mpi.v1.APIDetails
+ 37, // 49: mpi.v1.NGINXPlusRuntimeInfo.plus_api:type_name -> mpi.v1.APIDetails
+ 42, // 50: mpi.v1.AgentConfig.command:type_name -> mpi.v1.CommandServer
+ 44, // 51: mpi.v1.AgentConfig.metrics:type_name -> mpi.v1.MetricsServer
+ 45, // 52: mpi.v1.AgentConfig.file:type_name -> mpi.v1.FileServer
+ 49, // 53: mpi.v1.AgentConfig.labels:type_name -> google.protobuf.Struct
+ 43, // 54: mpi.v1.AgentConfig.auxiliary_command:type_name -> mpi.v1.AuxiliaryCommandServer
+ 41, // 55: mpi.v1.AgentConfig.log:type_name -> mpi.v1.Log
+ 3, // 56: mpi.v1.Log.log_level:type_name -> mpi.v1.Log.LogLevel
+ 50, // 57: mpi.v1.CommandServer.server:type_name -> mpi.v1.ServerSettings
+ 51, // 58: mpi.v1.CommandServer.auth:type_name -> mpi.v1.AuthSettings
+ 52, // 59: mpi.v1.CommandServer.tls:type_name -> mpi.v1.TLSSettings
+ 50, // 60: mpi.v1.AuxiliaryCommandServer.server:type_name -> mpi.v1.ServerSettings
+ 51, // 61: mpi.v1.AuxiliaryCommandServer.auth:type_name -> mpi.v1.AuthSettings
+ 52, // 62: mpi.v1.AuxiliaryCommandServer.tls:type_name -> mpi.v1.TLSSettings
+ 4, // 63: mpi.v1.CommandService.CreateConnection:input_type -> mpi.v1.CreateConnectionRequest
+ 10, // 64: mpi.v1.CommandService.UpdateDataPlaneStatus:input_type -> mpi.v1.UpdateDataPlaneStatusRequest
+ 14, // 65: mpi.v1.CommandService.UpdateDataPlaneHealth:input_type -> mpi.v1.UpdateDataPlaneHealthRequest
+ 16, // 66: mpi.v1.CommandService.Subscribe:input_type -> mpi.v1.DataPlaneResponse
+ 9, // 67: mpi.v1.CommandService.CreateConnection:output_type -> mpi.v1.CreateConnectionResponse
+ 11, // 68: mpi.v1.CommandService.UpdateDataPlaneStatus:output_type -> mpi.v1.UpdateDataPlaneStatusResponse
+ 15, // 69: mpi.v1.CommandService.UpdateDataPlaneHealth:output_type -> mpi.v1.UpdateDataPlaneHealthResponse
+ 17, // 70: mpi.v1.CommandService.Subscribe:output_type -> mpi.v1.ManagementPlaneRequest
+ 67, // [67:71] is the sub-list for method output_type
+ 63, // [63:67] is the sub-list for method input_type
+ 63, // [63:63] is the sub-list for extension type_name
+ 63, // [63:63] is the sub-list for extension extendee
+ 0, // [0:63] is the sub-list for field type_name
}
func init() { file_mpi_v1_command_proto_init() }
@@ -3309,7 +3394,7 @@ func file_mpi_v1_command_proto_init() {
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_mpi_v1_command_proto_rawDesc), len(file_mpi_v1_command_proto_rawDesc)),
- NumEnums: 3,
+ NumEnums: 4,
NumMessages: 42,
NumExtensions: 0,
NumServices: 1,
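
For consumers of the regenerated bindings, a minimal sketch of setting and reading the new field follows. It assumes the generated import path github.com/nginx/agent/v3/api/grpc/mpi/v1; only RequestType, its constants, and GetRequestType are new in this change.

package main

import (
	"fmt"

	mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	// Populate the new field 4 alongside the existing response fields.
	resp := &mpi.DataPlaneResponse{
		MessageMeta: &mpi.MessageMeta{
			MessageId:     "msg-1",
			CorrelationId: "corr-1",
			Timestamp:     timestamppb.Now(),
		},
		CommandResponse: &mpi.CommandResponse{
			Status:  mpi.CommandResponse_COMMAND_STATUS_OK,
			Message: "Successfully applied config",
		},
		RequestType: mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST,
	}

	// The generated getter is nil-safe and falls back to UNSPECIFIED_REQUEST.
	fmt.Println(resp.GetRequestType()) // CONFIG_APPLY_REQUEST
}
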
diff --git a/api/grpc/mpi/v1/command.pb.validate.go b/api/grpc/mpi/v1/command.pb.validate.go
index ab7777024d..0cb38eae2b 100644
--- a/api/grpc/mpi/v1/command.pb.validate.go
+++ b/api/grpc/mpi/v1/command.pb.validate.go
@@ -1832,6 +1832,8 @@ func (m *DataPlaneResponse) validate(all bool) error {
// no validation rules for InstanceId
+ // no validation rules for RequestType
+
if len(errors) > 0 {
return DataPlaneResponseMultiError(errors)
}
diff --git a/api/grpc/mpi/v1/command.proto b/api/grpc/mpi/v1/command.proto
index 0456757ade..d245fdb01b 100644
--- a/api/grpc/mpi/v1/command.proto
+++ b/api/grpc/mpi/v1/command.proto
@@ -152,12 +152,25 @@ message UpdateDataPlaneHealthResponse {}
// Reports the status of an associated command. This may be in response to a ManagementPlaneRequest
message DataPlaneResponse {
+ enum RequestType {
+ UNSPECIFIED_REQUEST = 0;
+ CONFIG_APPLY_REQUEST = 1;
+ CONFIG_UPLOAD_REQUEST = 2;
+ HEALTH_REQUEST = 3;
+ STATUS_REQUEST = 4;
+ API_ACTION_REQUEST = 5;
+ COMMAND_STATUS_REQUEST = 6;
+ UPDATE_AGENT_CONFIG_REQUEST = 7;
+ }
+
// Meta-information associated with a message
mpi.v1.MessageMeta message_meta = 1;
// The command response with the associated request
mpi.v1.CommandResponse command_response = 2;
// The instance identifier, if applicable, for this response
string instance_id = 3;
+ // The management plane request type that is being responded to
+ RequestType request_type = 4;
}
// A Management Plane request for information, triggers an associated rpc on the Data Plane
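
Because request_type is a proto3 enum, responses from agents that predate field 4 decode to the zero value. A hedged sketch of receiver-side routing; the string labels are illustrative, not part of this change.

// Assumes import: mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1".
func routeResponse(resp *mpi.DataPlaneResponse) string {
	switch resp.GetRequestType() {
	case mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST:
		return "config-apply"
	case mpi.DataPlaneResponse_CONFIG_UPLOAD_REQUEST:
		return "config-upload"
	case mpi.DataPlaneResponse_HEALTH_REQUEST:
		return "health"
	case mpi.DataPlaneResponse_UNSPECIFIED_REQUEST:
		// Older agents never set field 4, so their responses land here.
		return "legacy-or-unknown"
	default:
		return resp.GetRequestType().String()
	}
}
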
diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go
index b59f47b5af..a4fc36217d 100644
--- a/api/grpc/mpi/v1/common.pb.go
+++ b/api/grpc/mpi/v1/common.pb.go
@@ -5,7 +5,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.36.10
+// protoc-gen-go v1.36.11
// protoc (unknown)
// source: mpi/v1/common.proto
diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go
index 0223bae3a7..57fdff253a 100644
--- a/api/grpc/mpi/v1/files.pb.go
+++ b/api/grpc/mpi/v1/files.pb.go
@@ -5,7 +5,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.36.10
+// protoc-gen-go v1.36.11
// protoc (unknown)
// source: mpi/v1/files.proto
diff --git a/docs/proto/protos.md b/docs/proto/protos.md
index 18301abd50..8bf916886e 100644
--- a/docs/proto/protos.md
+++ b/docs/proto/protos.md
@@ -85,6 +85,7 @@
- [UpdateHTTPUpstreamServers](#mpi-v1-UpdateHTTPUpstreamServers)
- [UpdateStreamServers](#mpi-v1-UpdateStreamServers)
+ - [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType)
- [InstanceHealth.InstanceHealthStatus](#mpi-v1-InstanceHealth-InstanceHealthStatus)
- [InstanceMeta.InstanceType](#mpi-v1-InstanceMeta-InstanceType)
- [Log.LogLevel](#mpi-v1-Log-LogLevel)
@@ -862,6 +863,7 @@ Reports the status of an associated command. This may be in response to a Manage
| message_meta | [MessageMeta](#mpi-v1-MessageMeta) | | Meta-information associated with a message |
| command_response | [CommandResponse](#mpi-v1-CommandResponse) | | The command response with the associated request |
| instance_id | [string](#string) | | The instance identifier, if applicable, for this response |
+| request_type | [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType) | | The management plane request type that is being responded to |
@@ -1326,6 +1328,24 @@ Update Upstream Stream Servers for an instance
+
+
+### DataPlaneResponse.RequestType
+
+
+| Name | Number | Description |
+| ---- | ------ | ----------- |
+| UNSPECIFIED_REQUEST | 0 | |
+| CONFIG_APPLY_REQUEST | 1 | |
+| CONFIG_UPLOAD_REQUEST | 2 | |
+| HEALTH_REQUEST | 3 | |
+| STATUS_REQUEST | 4 | |
+| API_ACTION_REQUEST | 5 | |
+| COMMAND_STATUS_REQUEST | 6 | |
+| UPDATE_AGENT_CONFIG_REQUEST | 7 | |
+
+
+
### InstanceHealth.InstanceHealthStatus
diff --git a/internal/bus/message_pipe.go b/internal/bus/message_pipe.go
index b618c99075..38ee2fcab1 100644
--- a/internal/bus/message_pipe.go
+++ b/internal/bus/message_pipe.go
@@ -206,8 +206,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon
// If the agent update was received from a create connection request no data plane response needs to be sent
if topic == AgentConfigUpdateTopic {
- response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "Failed to update agent config", reconfigureError.Error())
+ response := p.createDataPlaneResponse(
+ correlationID,
+ mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST,
+ "Failed to update agent config",
+ reconfigureError.Error(),
+ )
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}
@@ -222,8 +227,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon
slog.InfoContext(ctx, "Finished reconfiguring plugins", "plugins", p.plugins)
if topic == AgentConfigUpdateTopic {
- response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- "Successfully updated agent config", "")
+ response := p.createDataPlaneResponse(
+ correlationID,
+ mpi.CommandResponse_COMMAND_STATUS_OK,
+ mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST,
+ "Successfully updated agent config",
+ "",
+ )
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}
}
@@ -339,7 +349,10 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
}
}
-func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
+func (p *MessagePipe) createDataPlaneResponse(
+ correlationID string,
+ status mpi.CommandResponse_CommandStatus,
+ requestType mpi.DataPlaneResponse_RequestType,
message, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
@@ -353,6 +366,7 @@ func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.C
Message: message,
Error: err,
},
+ RequestType: requestType,
}
}
diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go
index d2992f7f72..ba35f01483 100644
--- a/internal/collector/otel_collector_plugin.go
+++ b/internal/collector/otel_collector_plugin.go
@@ -143,7 +143,6 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
return errors.New("OTel collector already running")
}
- slog.InfoContext(ctx, "Starting OTel collector")
bootErr := oc.bootup(runCtx)
if bootErr != nil {
slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr)
@@ -163,30 +162,7 @@ func (oc *Collector) Info() *bus.Info {
func (oc *Collector) Close(ctx context.Context) error {
slog.InfoContext(ctx, "Closing OTel Collector plugin")
- if !oc.stopped {
- slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
- oc.service.Shutdown()
- oc.cancel()
-
- settings := *oc.config.Client.Backoff
- settings.MaxElapsedTime = maxTimeToWaitForShutdown
- err := backoff.WaitUntil(ctx, &settings, func() error {
- if oc.service.GetState() == otelcol.StateClosed {
- return nil
- }
-
- return errors.New("OTel Collector not in a closed state yet")
- })
-
- if err != nil {
- slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
- } else {
- slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
- oc.stopped = true
- }
- }
-
- return nil
+ return oc.shutdownCollector(ctx)
}
// Process an incoming Message Bus message in the plugin
@@ -236,6 +212,33 @@ func (oc *Collector) Reconfigure(ctx context.Context, agentConfig *config.Config
return nil
}
+func (oc *Collector) shutdownCollector(ctx context.Context) error {
+ if !oc.stopped {
+ slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
+ oc.service.Shutdown()
+ oc.cancel()
+
+ settings := *oc.config.Client.Backoff
+ settings.MaxElapsedTime = maxTimeToWaitForShutdown
+ err := backoff.WaitUntil(ctx, &settings, func() error {
+ if oc.service.GetState() == otelcol.StateClosed {
+ return nil
+ }
+
+ return errors.New("OTel Collector not in a closed state yet")
+ })
+
+ if err != nil {
+ slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
+ } else {
+ slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
+ oc.stopped = true
+ }
+ }
+
+ return nil
+}
+
// Process receivers and log warning for sub-optimal configurations
func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) {
for _, receiver := range receivers {
@@ -275,11 +278,12 @@ func (oc *Collector) bootup(ctx context.Context) error {
oc.setProxyIfNeeded(ctx)
}
+ slog.InfoContext(ctx, "Starting OTel collector")
appErr := oc.service.Run(ctx)
if appErr != nil {
errChan <- appErr
}
- slog.InfoContext(ctx, "OTel collector run finished")
+ slog.InfoContext(ctx, "OTel collector has stopped running")
}()
for {
@@ -453,7 +457,7 @@ func (oc *Collector) writeRunningConfig(ctx context.Context, settings otelcol.Co
}
func (oc *Collector) restartCollector(ctx context.Context) {
- err := oc.Close(ctx)
+ err := oc.shutdownCollector(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
return
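
The extracted shutdownCollector lets restartCollector stop the service without re-running Close's plugin-level teardown. As a standalone illustration of the wait loop it wraps (the agent uses its internal backoff helper; a plain ticker keeps this sketch self-contained):

// Assumes imports: context, errors, time, go.opentelemetry.io/collector/otelcol.
// waitUntilClosed polls the collector state until it reports StateClosed or
// the deadline passes, mirroring the loop inside shutdownCollector.
func waitUntilClosed(ctx context.Context, state func() otelcol.State, maxWait time.Duration) error {
	deadline := time.NewTimer(maxWait)
	defer deadline.Stop()
	tick := time.NewTicker(100 * time.Millisecond)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return errors.New("OTel Collector not in a closed state yet")
		case <-tick.C:
			if state() == otelcol.StateClosed {
				return nil
			}
		}
	}
}
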
diff --git a/internal/collector/settings.go b/internal/collector/settings.go
index 3346a0df94..9445a871c4 100644
--- a/internal/collector/settings.go
+++ b/internal/collector/settings.go
@@ -96,20 +96,20 @@ func createFile(confPath string) error {
// Generates an OTel Collector config to a file by injecting the Metrics Config to a Go template.
func writeCollectorConfig(conf *config.Collector) error {
+ confPath := filepath.Clean(conf.ConfigPath)
+
+ slog.Info("Writing OTel collector config", "config_path", confPath)
+
if conf.Processors.Resource["default"] != nil {
addDefaultResourceProcessor(conf.Pipelines.Metrics)
addDefaultResourceProcessor(conf.Pipelines.Logs)
}
- slog.Info("Writing OTel collector config")
-
otelcolTemplate, templateErr := template.New(otelTemplatePath).Parse(otelcolTemplate)
if templateErr != nil {
return templateErr
}
- confPath := filepath.Clean(conf.ConfigPath)
-
// Ensure file exists and has correct permissions
if err := ensureFileExists(confPath); err != nil {
return err
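
The reorder means the log line now reports the cleaned path that is actually written. A minimal sketch of that write order, with illustrative names (the agent's template text and ensureFileExists helper are assumed, not shown):

// Assumes imports: log/slog, os, path/filepath, text/template.
func writeTemplatedConfig(rawPath, templateText string, data any) error {
	confPath := filepath.Clean(rawPath)
	slog.Info("Writing OTel collector config", "config_path", confPath)

	tmpl, err := template.New("otelcol").Parse(templateText)
	if err != nil {
		return err
	}

	// O_TRUNC replaces any previous rendering of the config.
	f, err := os.OpenFile(confPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		return err
	}
	defer f.Close()

	return tmpl.Execute(f, data)
}
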
diff --git a/internal/command/command_plugin.go b/internal/command/command_plugin.go
index feb1c3b6d2..a27104b128 100644
--- a/internal/command/command_plugin.go
+++ b/internal/command/command_plugin.go
@@ -217,14 +217,24 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me
cp.processDataPlaneResponse(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
- Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "Failed to send the health status update", err.Error()),
+ Data: cp.createDataPlaneResponse(
+ correlationID,
+ mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ mpi.DataPlaneResponse_HEALTH_REQUEST,
+ "Failed to send the health status update",
+ err.Error(),
+ ),
})
}
cp.processDataPlaneResponse(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
- Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- "Successfully sent health status update", ""),
+ Data: cp.createDataPlaneResponse(
+ correlationID,
+ mpi.CommandResponse_COMMAND_STATUS_OK,
+ mpi.DataPlaneResponse_HEALTH_REQUEST,
+ "Successfully sent health status update",
+ "",
+ ),
})
}
}
@@ -242,8 +252,25 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received data plane response message")
if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok {
- slog.InfoContext(ctx, "Sending data plane response message", "message",
- response.GetCommandResponse().GetMessage(), "status", response.GetCommandResponse().GetStatus())
+ // Health request responses are frequent, so they are logged at debug level to avoid spamming the logs
+ if response.GetRequestType() != mpi.DataPlaneResponse_HEALTH_REQUEST {
+ slog.InfoContext(
+ ctx,
+ "Sending data plane response message",
+ "message", response.GetCommandResponse().GetMessage(),
+ "status", response.GetCommandResponse().GetStatus(),
+ "error", response.GetCommandResponse().GetError(),
+ )
+ } else {
+ slog.DebugContext(
+ ctx,
+ "Sending data plane response message",
+ "message", response.GetCommandResponse().GetMessage(),
+ "status", response.GetCommandResponse().GetStatus(),
+ "error", response.GetCommandResponse().GetError(),
+ )
+ }
+
err := cp.commandService.SendDataPlaneResponse(ctx, response)
if err != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
@@ -318,7 +345,8 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
slog.InfoContext(ctx, "Received management plane config apply request")
cp.handleConfigApplyRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_HealthRequest:
- slog.InfoContext(ctx, "Received management plane health request")
+ // Health requests arrive frequently, so log them at debug level to avoid spamming the logs
+ slog.DebugContext(ctx, "Received management plane health request")
cp.handleHealthRequest(newCtx)
case *mpi.ManagementPlaneRequest_ActionRequest:
if cp.commandServerType != model.Command {
@@ -445,7 +473,10 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
}
}
-func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
+func (cp *CommandPlugin) createDataPlaneResponse(
+ correlationID string,
+ status mpi.CommandResponse_CommandStatus,
+ requestType mpi.DataPlaneResponse_RequestType,
message, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
@@ -459,5 +490,6 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp
Message: message,
Error: err,
},
+ RequestType: requestType,
}
}
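
One way to collapse the duplicated info/debug branches in processDataPlaneResponse would be to pick the level first and log once. This is an alternative sketch, not what the PR does:

// Drop-in fragment for the body of processDataPlaneResponse; assumes log/slog.
level := slog.LevelInfo
if response.GetRequestType() == mpi.DataPlaneResponse_HEALTH_REQUEST {
	// Health responses are frequent, so keep them at debug.
	level = slog.LevelDebug
}
slog.Default().Log(
	ctx,
	level,
	"Sending data plane response message",
	"message", response.GetCommandResponse().GetMessage(),
	"status", response.GetCommandResponse().GetStatus(),
	"error", response.GetCommandResponse().GetError(),
)
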
diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go
index ef8782614c..bc2291f5f8 100644
--- a/internal/command/command_plugin_test.go
+++ b/internal/command/command_plugin_test.go
@@ -434,6 +434,7 @@ func Test_createDataPlaneResponse(t *testing.T) {
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(),
expected.GetCommandResponse().GetStatus(),
+ expected.GetRequestType(),
expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError())
assert.Equal(t, expected.GetCommandResponse(), result.GetCommandResponse())
diff --git a/internal/command/command_service.go b/internal/command/command_service.go
index 61daf603fa..f73a85c5c8 100644
--- a/internal/command/command_service.go
+++ b/internal/command/command_service.go
@@ -524,7 +524,7 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e
return nil
}
- slog.ErrorContext(ctx, "Failed to"+errorMsg, "error", err)
+ slog.ErrorContext(ctx, "Failed to "+errorMsg, "error", err)
return err
}
diff --git a/internal/datasource/proto/response.go b/internal/datasource/proto/response.go
index f2c778a7f8..38276e2ef4 100644
--- a/internal/datasource/proto/response.go
+++ b/internal/datasource/proto/response.go
@@ -11,8 +11,11 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)
-func CreateDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
- message, instanceID, err string,
+func CreateDataPlaneResponse(
+ correlationID string,
+ commandResponse *mpi.CommandResponse,
+ requestType mpi.DataPlaneResponse_RequestType,
+ instanceID string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
@@ -20,11 +23,8 @@ func CreateDataPlaneResponse(correlationID string, status mpi.CommandResponse_Co
CorrelationId: correlationID,
Timestamp: timestamppb.Now(),
},
- CommandResponse: &mpi.CommandResponse{
- Status: status,
- Message: message,
- Error: err,
- },
- InstanceId: instanceID,
+ CommandResponse: commandResponse,
+ InstanceId: instanceID,
+ RequestType: requestType,
}
}
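
A sketch of a call site after this signature change: the caller now assembles the CommandResponse itself and passes the request type through. The values shown are illustrative, and "proto" is assumed to be the package name of internal/datasource/proto.

resp := proto.CreateDataPlaneResponse(
	correlationID,
	&mpi.CommandResponse{
		Status:  mpi.CommandResponse_COMMAND_STATUS_OK,
		Message: "Successfully updated stream upstream servers",
	},
	mpi.DataPlaneResponse_API_ACTION_REQUEST,
	instanceID,
)
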
diff --git a/internal/file/file_manager_service.go b/internal/file/file_manager_service.go
index 08fc0f2d2c..92eb51240d 100644
--- a/internal/file/file_manager_service.go
+++ b/internal/file/file_manager_service.go
@@ -220,13 +220,14 @@ func (fms *FileManagerService) ClearCache() {
//nolint:revive,cyclop // cognitive-complexity of 13 max is 12, loop is needed cant be broken up
func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string) error {
- slog.InfoContext(ctx, "Rolling back config for instance", "instance_id", instanceID)
+ slog.InfoContext(ctx, "Rolling back config apply updates", "instance_id", instanceID)
fms.filesMutex.Lock()
defer fms.filesMutex.Unlock()
for _, fileAction := range fms.fileActions {
switch fileAction.Action {
case model.Add:
+ slog.InfoContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName())
if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("error deleting file: %s error: %w", fileAction.File.GetFileMeta().GetName(), err)
}
@@ -236,7 +237,7 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string)
continue
case model.Delete, model.Update:
- content, err := fms.restoreFiles(fileAction)
+ content, err := fms.restoreFiles(ctx, fileAction)
if err != nil {
return err
}
@@ -276,9 +277,15 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context,
slog.ErrorContext(ctx, "Unable to update current files on disk", "error", updateError)
}
- slog.InfoContext(ctx, "Updating overview after nginx config update")
- err := fms.fileServiceOperator.UpdateOverview(ctx, nginxConfigContext.InstanceID,
- nginxConfigContext.Files, nginxConfigContext.ConfigPath, 0)
+ slog.InfoContext(ctx, "Sending file overview update due to NGINX configuration updates")
+
+ err := fms.fileServiceOperator.UpdateOverview(
+ ctx,
+ nginxConfigContext.InstanceID,
+ nginxConfigContext.Files,
+ nginxConfigContext.ConfigPath,
+ 0,
+ )
if err != nil {
slog.ErrorContext(
ctx,
@@ -287,6 +294,7 @@ func (fms *FileManagerService) ConfigUpdate(ctx context.Context,
"error", err,
)
}
+
slog.InfoContext(ctx, "Finished updating file overview")
}
@@ -545,7 +553,7 @@ func (fms *FileManagerService) backupFiles(ctx context.Context) error {
return nil
}
-func (fms *FileManagerService) restoreFiles(fileAction *model.FileCache) ([]byte, error) {
+func (fms *FileManagerService) restoreFiles(ctx context.Context, fileAction *model.FileCache) ([]byte, error) {
fileMeta := fileAction.File.GetFileMeta()
fileName := fileMeta.GetName()
@@ -556,6 +564,8 @@ func (fms *FileManagerService) restoreFiles(fileAction *model.FileCache) ([]byte
return nil, fmt.Errorf("failed to create directories for %s: %w", fileName, err)
}
+ slog.InfoContext(ctx, "Restoring file from it's backup", "file", fileName, "backup_file", tempFilePath)
+
moveErr := os.Rename(tempFilePath, fileName)
if moveErr != nil {
return nil, fmt.Errorf("failed to rename file, %s to %s: %w", tempFilePath, fileName, moveErr)
@@ -623,7 +633,7 @@ func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Co
}
if len(downloadFiles) == 0 {
- slog.DebugContext(ctx, "No updated files to download")
+ slog.InfoContext(ctx, "No files require downloading")
return nil
}
@@ -634,7 +644,7 @@ func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(ctx context.Co
errGroup.Go(func() error {
tempFilePath := tempFilePath(fileAction.File.GetFileMeta().GetName())
- slog.DebugContext(
+ slog.InfoContext(
errGroupCtx,
"Downloading file to temp location",
"file", tempFilePath,
@@ -652,7 +662,7 @@ actionsLoop:
for _, fileAction := range fms.fileActions {
switch fileAction.Action {
case model.Delete:
- slog.DebugContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName())
+ slog.InfoContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName())
if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) {
actionError = fmt.Errorf("error deleting file: %s error: %w",
fileAction.File.GetFileMeta().GetName(), err)
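
For reference, a standalone sketch of the restore step now logged by restoreFiles: recreate the parent directories, then move the backup over the live file. The backup path mirrors the agent's tempFilePath helper.

// Assumes imports: fmt, os, path/filepath.
func restoreFromBackup(fileName, backupPath string) error {
	if err := os.MkdirAll(filepath.Dir(fileName), 0o755); err != nil {
		return fmt.Errorf("failed to create directories for %s: %w", fileName, err)
	}
	// On POSIX systems os.Rename atomically replaces fileName when both
	// paths are on the same filesystem.
	if err := os.Rename(backupPath, fileName); err != nil {
		return fmt.Errorf("failed to rename file, %s to %s: %w", backupPath, fileName, err)
	}
	return nil
}
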
diff --git a/internal/file/file_plugin.go b/internal/file/file_plugin.go
index 4aa05be346..dc464e0327 100644
--- a/internal/file/file_plugin.go
+++ b/internal/file/file_plugin.go
@@ -243,13 +243,21 @@ func (fp *FilePlugin) handleConfigApplyFailedRequest(ctx context.Context, msg *b
err := fp.fileManagerService.Rollback(ctx, data.InstanceID)
if err != nil {
- rollbackResponse := fp.createDataPlaneResponse(data.CorrelationID,
+ rollbackResponse := fp.createDataPlaneResponse(
+ data.CorrelationID,
mpi.CommandResponse_COMMAND_STATUS_ERROR,
- "Rollback failed", data.InstanceID, err.Error())
+ "Rollback failed",
+ data.InstanceID,
+ err.Error(),
+ )
- applyResponse := fp.createDataPlaneResponse(data.CorrelationID,
+ applyResponse := fp.createDataPlaneResponse(
+ data.CorrelationID,
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "Config apply failed, rollback failed", data.InstanceID, data.Error.Error())
+ "Config apply failed, rollback failed",
+ data.InstanceID,
+ data.Error.Error(),
+ )
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse})
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse})
@@ -346,7 +354,8 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Config apply failed, rollback failed",
instanceID,
- rollbackErr.Error())
+ rollbackErr.Error(),
+ )
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: rollbackResponse})
@@ -358,7 +367,8 @@ func (fp *FilePlugin) handleConfigApplyRequest(ctx context.Context, msg *bus.Mes
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Config apply failed, rollback successful",
instanceID,
- err.Error())
+ err.Error(),
+ )
fp.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: response})
@@ -416,6 +426,8 @@ func (fp *FilePlugin) handleConfigUploadRequest(ctx context.Context, msg *bus.Me
Status: mpi.CommandResponse_COMMAND_STATUS_OK,
Message: "Successfully updated all files",
},
+ InstanceId: configUploadRequest.GetOverview().GetConfigVersion().GetInstanceId(),
+ RequestType: mpi.DataPlaneResponse_CONFIG_UPLOAD_REQUEST,
}
if updatingFilesError != nil {
@@ -442,7 +454,9 @@ func (fp *FilePlugin) handleAgentConfigUpdate(ctx context.Context, msg *bus.Mess
fp.config = agentConfig
}
-func (fp *FilePlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
+func (fp *FilePlugin) createDataPlaneResponse(
+ correlationID string,
+ status mpi.CommandResponse_CommandStatus,
message, instanceID, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
@@ -456,6 +470,7 @@ func (fp *FilePlugin) createDataPlaneResponse(correlationID string, status mpi.C
Message: message,
Error: err,
},
- InstanceId: instanceID,
+ InstanceId: instanceID,
+ RequestType: mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST,
}
}
diff --git a/internal/file/file_service_operator.go b/internal/file/file_service_operator.go
index 6fce049b39..70c61129ea 100644
--- a/internal/file/file_service_operator.go
+++ b/internal/file/file_service_operator.go
@@ -265,9 +265,16 @@ func (fso *FileServiceOperator) UpdateFile(
instanceID string,
fileToUpdate *mpi.File,
) error {
- slog.InfoContext(ctx, "Updating file", "file_name", fileToUpdate.GetFileMeta().GetName(), "instance_id", instanceID)
+ slog.InfoContext(
+ ctx,
+ "Sending file updates",
+ "file_name", fileToUpdate.GetFileMeta().GetName(),
+ "instance_id", instanceID,
+ )
- slog.DebugContext(ctx, "Checking file size",
+ slog.DebugContext(
+ ctx,
+ "Checking file size",
"file_size", fileToUpdate.GetFileMeta().GetSize(),
"max_file_size", int64(fso.agentConfig.Client.Grpc.MaxFileSize),
)
@@ -283,7 +290,7 @@ func (fso *FileServiceOperator) UpdateFile(
func (fso *FileServiceOperator) RenameFile(
ctx context.Context, hash, source, desination string,
) error {
- slog.DebugContext(ctx, fmt.Sprintf("Renaming file %s to %s", source, desination))
+ slog.InfoContext(ctx, fmt.Sprintf("Renaming file %s to %s", source, desination))
// Create parent directories for the target file if they don't exist
if err := os.MkdirAll(filepath.Dir(desination), dirPerm); err != nil {
@@ -332,7 +339,7 @@ func (fso *FileServiceOperator) updateFiles(
}
iteration++
- slog.InfoContext(ctx, "Updating file overview after file updates", "attempt_number", iteration)
+ slog.InfoContext(ctx, "Sending file overview updates", "attempt_number", iteration)
return fso.UpdateOverview(ctx, instanceID, diffFiles, configPath, iteration)
}
diff --git a/internal/grpc/grpc.go b/internal/grpc/grpc.go
index 4c044d2fb1..2f369ab956 100644
--- a/internal/grpc/grpc.go
+++ b/internal/grpc/grpc.go
@@ -83,7 +83,7 @@ func NewGrpcConnection(ctx context.Context, agentConfig *config.Config,
serverAddr := serverAddress(ctx, commandConfig)
- slog.InfoContext(ctx, "Dialing grpc server", "server_addr", serverAddr)
+ slog.InfoContext(ctx, "Creating connection to management plane server", "server_address", serverAddr)
var err error
info := host.NewInfo()
@@ -217,7 +217,12 @@ func DialOptions(agentConfig *config.Config, commandConfig *config.Command, reso
// Proxy support: If proxy config exists, use HTTP CONNECT dialer
if commandConfig.Server.Proxy != nil && commandConfig.Server.Proxy.URL != "" {
opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
- slog.InfoContext(ctx, "Dialing grpc server via proxy")
+ slog.InfoContext(
+ ctx,
+ "Creating connection to management plane server via proxy",
+ "proxy_url", commandConfig.Server.Proxy.URL,
+ )
+
return DialViaHTTPProxy(ctx, commandConfig.Server.Proxy, addr)
}))
}
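
DialViaHTTPProxy is the agent's helper; for context, a bare-bones sketch of what an HTTP CONNECT dialer of that shape typically does. Real implementations also handle TLS, proxy auth headers, and timeouts.

// Assumes imports: bufio, context, fmt, net, net/http, net/url.
func dialViaConnectProxy(ctx context.Context, proxyAddr, target string) (net.Conn, error) {
	var d net.Dialer
	conn, err := d.DialContext(ctx, "tcp", proxyAddr)
	if err != nil {
		return nil, err
	}

	// CONNECT target HTTP/1.1 — ask the proxy to open a raw tunnel.
	req := &http.Request{
		Method: http.MethodConnect,
		URL:    &url.URL{Opaque: target},
		Host:   target,
		Header: make(http.Header),
	}
	if err := req.Write(conn); err != nil {
		conn.Close()
		return nil, err
	}

	resp, err := http.ReadResponse(bufio.NewReader(conn), req)
	if err != nil {
		conn.Close()
		return nil, err
	}
	resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		conn.Close()
		return nil, fmt.Errorf("proxy CONNECT failed: %s", resp.Status)
	}

	return conn, nil
}
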
diff --git a/internal/plugin/plugin_manager.go b/internal/plugin/plugin_manager.go
index fde011d8ba..cb27070c00 100644
--- a/internal/plugin/plugin_manager.go
+++ b/internal/plugin/plugin_manager.go
@@ -10,6 +10,7 @@ import (
"log/slog"
"sync"
+ "github.com/nginx/agent/v3/internal/logger"
"github.com/nginx/agent/v3/internal/model"
pkg "github.com/nginx/agent/v3/pkg/config"
@@ -50,9 +51,14 @@ func addCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin, agentCo
manifestLock *sync.RWMutex,
) []bus.Plugin {
if agentConfig.IsCommandGrpcClientConfigured() {
- grpcConnection, err := grpc.NewGrpcConnection(ctx, agentConfig, agentConfig.Command)
+ newCtx := context.WithValue(
+ ctx,
+ logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, model.Command),
+ )
+
+ grpcConnection, err := grpc.NewGrpcConnection(newCtx, agentConfig, agentConfig.Command)
if err != nil {
- slog.WarnContext(ctx, "Failed to create gRPC connection for command server", "error", err)
+ slog.WarnContext(newCtx, "Failed to create gRPC connection for command server", "error", err)
} else {
commandPlugin := command.NewCommandPlugin(agentConfig, grpcConnection, model.Command)
plugins = append(plugins, commandPlugin)
@@ -71,9 +77,14 @@ func addAuxiliaryCommandAndFilePlugins(ctx context.Context, plugins []bus.Plugin
agentConfig *config.Config, manifestLock *sync.RWMutex,
) []bus.Plugin {
if agentConfig.IsAuxiliaryCommandGrpcClientConfigured() {
- auxGRPCConnection, err := grpc.NewGrpcConnection(ctx, agentConfig, agentConfig.AuxiliaryCommand)
+ newCtx := context.WithValue(
+ ctx,
+ logger.ServerTypeContextKey, slog.Any(logger.ServerTypeKey, model.Auxiliary),
+ )
+
+ auxGRPCConnection, err := grpc.NewGrpcConnection(newCtx, agentConfig, agentConfig.AuxiliaryCommand)
if err != nil {
- slog.WarnContext(ctx, "Failed to create gRPC connection for auxiliary command server", "error", err)
+ slog.WarnContext(newCtx, "Failed to create gRPC connection for auxiliary command server", "error", err)
} else {
auxCommandPlugin := command.NewCommandPlugin(agentConfig, auxGRPCConnection, model.Auxiliary)
plugins = append(plugins, auxCommandPlugin)
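
The pattern here stores a ready-made slog attribute in the context so a context-aware handler can stamp every subsequent log line with the server type. A generic sketch; the key names mirror the agent's logger package but are assumptions.

// Assumes imports: context, log/slog.
type ctxKey string

const serverTypeContextKey ctxKey = "server_type"

// withServerType returns a child context carrying the slog attribute.
func withServerType(ctx context.Context, serverType string) context.Context {
	return context.WithValue(ctx, serverTypeContextKey, slog.Any("server_type", serverType))
}

// A custom slog.Handler would then read it back in Handle, e.g.:
//   if attr, ok := ctx.Value(serverTypeContextKey).(slog.Attr); ok { record.AddAttrs(attr) }
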
diff --git a/internal/resource/nginx_instance_operator.go b/internal/resource/nginx_instance_operator.go
index 2fa4c226cc..b1d4106cff 100644
--- a/internal/resource/nginx_instance_operator.go
+++ b/internal/resource/nginx_instance_operator.go
@@ -43,7 +43,7 @@ func NewInstanceOperator(agentConfig *config.Config) *NginxInstanceOperator {
}
func (i *NginxInstanceOperator) Validate(ctx context.Context, instance *mpi.Instance) error {
- slog.DebugContext(ctx, "Validating NGINX config")
+ slog.InfoContext(ctx, "Validating NGINX configuration")
exePath := instance.GetInstanceRuntime().GetBinaryPath()
out, err := i.executer.RunCmd(ctx, exePath, "-t")
@@ -56,7 +56,7 @@ func (i *NginxInstanceOperator) Validate(ctx context.Context, instance *mpi.Inst
return err
}
- slog.InfoContext(ctx, "NGINX config tested", "output", out)
+ slog.InfoContext(ctx, "NGINX configuration tested", "output", out)
return nil
}
@@ -66,8 +66,7 @@ func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instan
var errorsFound error
pid := instance.GetInstanceRuntime().GetProcessId()
- slog.InfoContext(ctx, "Reloading NGINX PID", "pid",
- pid)
+ slog.InfoContext(ctx, "Reloading NGINX master process", "pid", pid)
workers := i.nginxProcessOperator.NginxWorkerProcesses(ctx, pid)
@@ -95,25 +94,25 @@ func (i *NginxInstanceOperator) Reload(ctx context.Context, instance *mpi.Instan
i.checkWorkers(ctx, instance.GetInstanceMeta().GetInstanceId(), createdTime, processes)
}
- slog.InfoContext(ctx, "NGINX reloaded", "process_id", pid)
+ slog.InfoContext(ctx, "Finished reloading NGINX master process", "process_id", pid)
numberOfExpectedMessages := len(errorLogs)
for range numberOfExpectedMessages {
logErr := <-logErrorChannel
- slog.InfoContext(ctx, "Message received in logErrorChannel", "error", logErr)
if logErr != nil {
errorsFound = errors.Join(errorsFound, logErr)
- slog.InfoContext(ctx, "Errors Found", "", errorsFound)
}
}
- slog.InfoContext(ctx, "Finished monitoring post reload")
+ slog.InfoContext(ctx, "Finished monitoring NGINX error logs after reload")
if errorsFound != nil {
return errorsFound
}
+ slog.InfoContext(ctx, "No errors found in NGINX error logs after reload")
+
return nil
}
@@ -128,7 +127,7 @@ func (i *NginxInstanceOperator) checkWorkers(ctx context.Context, instanceID str
Multiplier: i.agentConfig.DataPlaneConfig.Nginx.ReloadBackoff.Multiplier,
}
- slog.DebugContext(ctx, "Waiting for NGINX to finish reloading")
+ slog.InfoContext(ctx, "Waiting for NGINX worker processes to be reloaded")
newPid, findErr := i.nginxProcessOperator.FindParentProcessID(ctx, instanceID, processes, i.executer)
if findErr != nil {
@@ -153,12 +152,14 @@ func (i *NginxInstanceOperator) checkWorkers(ctx context.Context, instanceID str
}
}
- return fmt.Errorf("waiting for NGINX worker to be newer "+
- "than %v", createdTime)
+ return fmt.Errorf("waiting for NGINX worker to be newer than %v", createdTime)
})
if err != nil {
- slog.WarnContext(ctx, "Failed to check if NGINX worker processes have successfully reloaded, "+
- "timed out waiting", "error", err)
+ slog.WarnContext(
+ ctx,
+ "Failed to check if NGINX worker processes have successfully reloaded, timed out waiting",
+ "error", err,
+ )
return
}
@@ -197,6 +198,7 @@ func (i *NginxInstanceOperator) monitorLogs(ctx context.Context, errorLogs []str
}
for _, errorLog := range errorLogs {
+ slog.InfoContext(ctx, "Starting to monitor NGINX error log for errors", "log_file", errorLog)
go i.logTailer.Tail(ctx, errorLog, errorChannel)
}
}
diff --git a/internal/resource/nginx_plus_actions.go b/internal/resource/nginx_plus_actions.go
index a0b949f04a..33db5f2edf 100644
--- a/internal/resource/nginx_plus_actions.go
+++ b/internal/resource/nginx_plus_actions.go
@@ -21,82 +21,138 @@ type APIAction struct {
ResourceService resourceServiceInterface
}
-func (a *APIAction) HandleUpdateStreamServersRequest(ctx context.Context, action *mpi.NGINXPlusAction,
+func (a *APIAction) HandleGetStreamUpstreamsRequest(ctx context.Context,
instance *mpi.Instance,
) *mpi.DataPlaneResponse {
- correlationID := logger.CorrelationID(ctx)
- instanceID := instance.GetInstanceMeta().GetInstanceId()
-
- add, update, del, err := a.ResourceService.UpdateStreamServers(ctx, instance,
- action.GetUpdateStreamServers().GetUpstreamStreamName(), action.GetUpdateStreamServers().GetServers())
- if err != nil {
- slog.ErrorContext(ctx, "Unable to update stream servers of upstream", "request",
- action.GetUpdateHttpUpstreamServers(), "error", err)
-
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "", instanceID, err.Error())
- }
-
- slog.DebugContext(ctx, "Successfully updated stream upstream servers", "http_upstream_name",
- action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update),
- "delete", len(del))
+ return a.handleUpstreamGetRequest(
+ ctx,
+ instance,
+ func(ctx context.Context, instance *mpi.Instance) (interface{}, error) {
+ return a.ResourceService.GetStreamUpstreams(ctx, instance)
+ },
+ "Unable to get stream upstreams",
+ )
+}
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- "Successfully updated stream upstream servers", instanceID, "")
+func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi.Instance) *mpi.DataPlaneResponse {
+ return a.handleUpstreamGetRequest(
+ ctx,
+ instance,
+ func(ctx context.Context, instance *mpi.Instance) (interface{}, error) {
+ return a.ResourceService.GetUpstreams(ctx, instance)
+ },
+ "Unable to get upstreams",
+ )
}
-func (a *APIAction) HandleGetStreamUpstreamsRequest(ctx context.Context,
+func (a *APIAction) HandleGetHTTPUpstreamsServersRequest(
+ ctx context.Context,
+ action *mpi.NGINXPlusAction,
instance *mpi.Instance,
) *mpi.DataPlaneResponse {
correlationID := logger.CorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()
- streamUpstreamsResponse := emptyResponse
+ upstreamsResponse := emptyResponse
- streamUpstreams, err := a.ResourceService.GetStreamUpstreams(ctx, instance)
+ upstreams, err := a.ResourceService.GetHTTPUpstreamServers(
+ ctx,
+ instance,
+ action.GetGetHttpUpstreamServers().GetHttpUpstreamName(),
+ )
if err != nil {
- slog.ErrorContext(ctx, "Unable to get stream upstreams", "error", err)
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "", instanceID, err.Error())
+ slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err)
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "",
+ Error: err.Error(),
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
- if streamUpstreams != nil {
- streamUpstreamsJSON, jsonErr := json.Marshal(streamUpstreams)
+ if upstreams != nil {
+ upstreamsJSON, jsonErr := json.Marshal(upstreams)
if jsonErr != nil {
- slog.ErrorContext(ctx, "Unable to marshal stream upstreams", "err", err)
+ slog.ErrorContext(ctx, "Unable to marshal http upstreams", "error", jsonErr)
}
- streamUpstreamsResponse = string(streamUpstreamsJSON)
+ upstreamsResponse = string(upstreamsJSON)
}
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- streamUpstreamsResponse, instanceID, "")
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_OK,
+ Message: upstreamsResponse,
+ Error: "",
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
-func (a *APIAction) HandleGetUpstreamsRequest(ctx context.Context, instance *mpi.Instance) *mpi.DataPlaneResponse {
+//nolint:dupl // Common code is intentionally duplicated for clarity and ease of maintenance
+func (a *APIAction) HandleUpdateStreamServersRequest(
+ ctx context.Context,
+ action *mpi.NGINXPlusAction,
+ instance *mpi.Instance,
+) *mpi.DataPlaneResponse {
correlationID := logger.CorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()
- upstreamsResponse := emptyResponse
- upstreams, err := a.ResourceService.GetUpstreams(ctx, instance)
+ add, update, del, err := a.ResourceService.UpdateStreamServers(
+ ctx,
+ instance,
+ action.GetUpdateStreamServers().GetUpstreamStreamName(),
+ action.GetUpdateStreamServers().GetServers(),
+ )
if err != nil {
- slog.InfoContext(ctx, "Unable to get upstreams", "error", err)
-
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "", instanceID, err.Error())
+ slog.ErrorContext(
+ ctx,
+ "Unable to update stream servers of upstream",
+ "request", action.GetUpdateStreamServers(),
+ "error", err,
+ )
+
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "",
+ Error: err.Error(),
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
- if upstreams != nil {
- upstreamsJSON, jsonErr := json.Marshal(upstreams)
- if jsonErr != nil {
- slog.ErrorContext(ctx, "Unable to marshal upstreams", "err", err)
- }
- upstreamsResponse = string(upstreamsJSON)
- }
-
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- upstreamsResponse, instanceID, "")
+ slog.DebugContext(
+ ctx,
+ "Successfully updated stream upstream servers",
+ "http_upstream_name", action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(),
+ "add", len(add),
+ "update", len(update),
+ "delete", len(del),
+ )
+
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_OK,
+ Message: "Successfully updated stream upstream servers",
+ Error: "",
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
-func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(ctx context.Context, action *mpi.NGINXPlusAction,
+//nolint:dupl // Common code is intentionally duplicated for clarity and ease of maintenance
+func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(
+ ctx context.Context,
+ action *mpi.NGINXPlusAction,
instance *mpi.Instance,
) *mpi.DataPlaneResponse {
correlationID := logger.CorrelationID(ctx)
@@ -106,44 +162,88 @@ func (a *APIAction) HandleUpdateHTTPUpstreamsRequest(ctx context.Context, action
action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(),
action.GetUpdateHttpUpstreamServers().GetServers())
if err != nil {
- slog.ErrorContext(ctx, "Unable to update HTTP servers of upstream", "request",
- action.GetUpdateHttpUpstreamServers(), "error", err)
-
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "", instanceID, err.Error())
+ slog.ErrorContext(
+ ctx,
+ "Unable to update HTTP servers of upstream",
+ "request", action.GetUpdateHttpUpstreamServers(),
+ "error", err,
+ )
+
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "",
+ Error: err.Error(),
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
- slog.DebugContext(ctx, "Successfully updated http upstream servers", "http_upstream_name",
- action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(), "add", len(add), "update", len(update),
- "delete", len(del))
-
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- "Successfully updated HTTP Upstreams", instanceID, "")
+ slog.DebugContext(
+ ctx,
+ "Successfully updated http upstream servers",
+ "http_upstream_name", action.GetUpdateHttpUpstreamServers().GetHttpUpstreamName(),
+ "add", len(add),
+ "update", len(update),
+ "delete", len(del),
+ )
+
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_OK,
+ Message: "Successfully updated HTTP Upstreams",
+ Error: "",
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
-func (a *APIAction) HandleGetHTTPUpstreamsServersRequest(ctx context.Context, action *mpi.NGINXPlusAction,
+// handleUpstreamGetRequest is a generic helper for API action GET requests: it fetches data via
+// getData, JSON-encodes the result and wraps it in a DataPlaneResponse, logging errorMsg on failure.
+func (a *APIAction) handleUpstreamGetRequest(
+ ctx context.Context,
instance *mpi.Instance,
+ getData func(context.Context, *mpi.Instance) (interface{}, error),
+ errorMsg string,
) *mpi.DataPlaneResponse {
correlationID := logger.CorrelationID(ctx)
instanceID := instance.GetInstanceMeta().GetInstanceId()
- upstreamsResponse := emptyResponse
+ jsonResponse := emptyResponse
- upstreams, err := a.ResourceService.GetHTTPUpstreamServers(ctx, instance,
- action.GetGetHttpUpstreamServers().GetHttpUpstreamName())
+ data, err := getData(ctx, instance)
if err != nil {
- slog.ErrorContext(ctx, "Unable to get HTTP servers of upstream", "error", err)
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "", instanceID, err.Error())
+ slog.ErrorContext(ctx, errorMsg, "error", err)
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "",
+ Error: err.Error(),
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
- if upstreams != nil {
- upstreamsJSON, jsonErr := json.Marshal(upstreams)
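+ // Marshal the fetched data; a failure here is logged but not fatal, the response still reports COMMAND_STATUS_OK.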
+ if data != nil {
+ dataJSON, jsonErr := json.Marshal(data)
if jsonErr != nil {
- slog.ErrorContext(ctx, "Unable to marshal http upstreams", "err", err)
+ slog.ErrorContext(ctx, "Unable to marshal data", "error", jsonErr)
}
- upstreamsResponse = string(upstreamsJSON)
+ jsonResponse = string(dataJSON)
}
- return response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- upstreamsResponse, instanceID, "")
+ return response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_OK,
+ Message: jsonResponse,
+ Error: "",
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
}
diff --git a/internal/resource/resource_plugin.go b/internal/resource/resource_plugin.go
index fcb0138f62..27c33cda7e 100644
--- a/internal/resource/resource_plugin.go
+++ b/internal/resource/resource_plugin.go
@@ -8,7 +8,6 @@ package resource
import (
"context"
"errors"
- "fmt"
"log/slog"
"sync"
@@ -189,9 +188,16 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi
}
if instance == nil {
slog.ErrorContext(ctx, "Unable to find instance with ID", "id", instanceID)
- resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "", instanceID, fmt.Sprintf("failed to preform API "+
- "action, could not find instance with ID: %s", instanceID))
+ resp := response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "",
+ Error: "failed to preform API action, could not find instance with ID: " + instanceID,
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
@@ -200,8 +206,16 @@ func (r *Resource) handleNginxPlusActionRequest(ctx context.Context, action *mpi
if instance.GetInstanceMeta().GetInstanceType() != mpi.InstanceMeta_INSTANCE_TYPE_NGINX_PLUS {
slog.ErrorContext(ctx, "Failed to preform API action", "error", errors.New("instance is not NGINX Plus"))
- resp := response.CreateDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "", instanceID, "failed to preform API action, instance is not NGINX Plus")
+ resp := response.CreateDataPlaneResponse(
+ correlationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "",
+ Error: "failed to preform API action, instance is not NGINX Plus",
+ },
+ mpi.DataPlaneResponse_API_ACTION_REQUEST,
+ instanceID,
+ )
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: resp})
@@ -245,19 +259,40 @@ func (r *Resource) handleWriteConfigSuccessful(ctx context.Context, msg *bus.Mes
configContext, err := r.resourceService.ApplyConfig(ctx, data.InstanceID)
if err != nil {
data.Error = err
- slog.ErrorContext(ctx, "errors found during config apply, "+
- "sending error status, rolling back config", "err", err)
- dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_ERROR,
- "Config apply failed, rolling back config", data.InstanceID, err.Error())
- r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse})
+ slog.ErrorContext(
+ ctx,
+ "Errors found during config apply, sending error status and rolling back configuration updates",
+ "error", err,
+ )
+
+ dpResponse := response.CreateDataPlaneResponse(
+ data.CorrelationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_ERROR,
+ Message: "Config apply failed, rolling back config",
+ Error: err.Error(),
+ },
+ mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST,
+ data.InstanceID,
+ )
+
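+ // Report the error status to the management plane, then kick off the rollback flow.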
+ r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: dpResponse})
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyFailedTopic, Data: data})
return
}
- dpResponse := response.CreateDataPlaneResponse(data.CorrelationID, mpi.CommandResponse_COMMAND_STATUS_OK,
- "Config apply successful", data.InstanceID, "")
+ dpResponse := response.CreateDataPlaneResponse(
+ data.CorrelationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_OK,
+ Message: "Config apply successful",
+ Error: "",
+ },
+ mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST,
+ data.InstanceID,
+ )
successMessage := &model.ReloadSuccess{
ConfigContext: configContext,
@@ -277,14 +312,29 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) {
}
_, err := r.resourceService.ApplyConfig(ctx, data.InstanceID)
if err != nil {
- slog.ErrorContext(ctx, "errors found during rollback, sending failure status", "err", err)
-
- rollbackResponse := response.CreateDataPlaneResponse(data.CorrelationID,
- mpi.CommandResponse_COMMAND_STATUS_ERROR, "Rollback failed", data.InstanceID, err.Error())
-
- applyResponse := response.CreateDataPlaneResponse(data.CorrelationID,
- mpi.CommandResponse_COMMAND_STATUS_FAILURE, "Config apply failed, rollback failed",
- data.InstanceID, data.Error.Error())
+ slog.ErrorContext(ctx, "Errors found during rollback, sending failure status", "error", err)
+
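+ // Two responses are sent: one for the failed rollback itself and one for the original failed config apply.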
+ rollbackResponse := response.CreateDataPlaneResponse(
+ data.CorrelationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_ERROR,
+ Message: "Rollback failed",
+ Error: err.Error(),
+ },
+ mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST,
+ data.InstanceID,
+ )
+
+ applyResponse := response.CreateDataPlaneResponse(
+ data.CorrelationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "Config apply failed, rollback failed",
+ Error: data.Error.Error(),
+ },
+ mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST,
+ data.InstanceID,
+ )
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.DataPlaneResponseTopic, Data: rollbackResponse})
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse})
@@ -292,9 +342,16 @@ func (r *Resource) handleRollbackWrite(ctx context.Context, msg *bus.Message) {
return
}
- applyResponse := response.CreateDataPlaneResponse(data.CorrelationID,
- mpi.CommandResponse_COMMAND_STATUS_FAILURE,
- "Config apply failed, rollback successful", data.InstanceID, data.Error.Error())
+ applyResponse := response.CreateDataPlaneResponse(
+ data.CorrelationID,
+ &mpi.CommandResponse{
+ Status: mpi.CommandResponse_COMMAND_STATUS_FAILURE,
+ Message: "Config apply failed, rollback successful",
+ Error: data.Error.Error(),
+ },
+ mpi.DataPlaneResponse_CONFIG_APPLY_REQUEST,
+ data.InstanceID,
+ )
r.messagePipe.Process(ctx, &bus.Message{Topic: bus.ConfigApplyCompleteTopic, Data: applyResponse})
}
diff --git a/internal/resource/resource_service.go b/internal/resource/resource_service.go
index 00bff24e53..2ae6ede88e 100644
--- a/internal/resource/resource_service.go
+++ b/internal/resource/resource_service.go
@@ -256,8 +256,9 @@ func (r *ResourceService) GetHTTPUpstreamServers(ctx context.Context, instance *
}
servers, getServersErr := plusClient.GetHTTPServers(ctx, upstream)
-
- slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetHTTPUpstreamServers", "err", getServersErr)
+ if getServersErr != nil {
+ slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetHTTPUpstreamServers", "error", getServersErr)
+ }
return servers, createPlusAPIError(getServersErr)
}
@@ -272,7 +273,9 @@ func (r *ResourceService) GetUpstreams(ctx context.Context, instance *mpi.Instan
servers, getUpstreamsErr := plusClient.GetUpstreams(ctx)
- slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetUpstreams", "err", getUpstreamsErr)
+ if getUpstreamsErr != nil {
+ slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetUpstreams", "error", getUpstreamsErr)
+ }
return servers, createPlusAPIError(getUpstreamsErr)
}
@@ -287,7 +290,9 @@ func (r *ResourceService) GetStreamUpstreams(ctx context.Context, instance *mpi.
streamUpstreams, getServersErr := plusClient.GetStreamUpstreams(ctx)
- slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetStreamUpstreams", "err", getServersErr)
+ if getServersErr != nil {
+ slog.WarnContext(ctx, "Error returned from NGINX Plus client, GetStreamUpstreams", "error", getServersErr)
+ }
return streamUpstreams, createPlusAPIError(getServersErr)
}
@@ -308,7 +313,9 @@ func (r *ResourceService) UpdateStreamServers(ctx context.Context, instance *mpi
added, updated, deleted, updateError := plusClient.UpdateStreamServers(ctx, upstream, servers)
- slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateStreamServers", "err", updateError)
+ if updateError != nil {
+ slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateStreamServers", "error", updateError)
+ }
return added, updated, deleted, createPlusAPIError(updateError)
}
@@ -330,7 +337,7 @@ func (r *ResourceService) UpdateHTTPUpstreamServers(ctx context.Context, instanc
added, updated, deleted, updateError := plusClient.UpdateHTTPServers(ctx, upstream, servers)
if updateError != nil {
- slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "err", updateError)
+ slog.WarnContext(ctx, "Error returned from NGINX Plus client, UpdateHTTPUpstreamServers", "error", updateError)
}
return added, updated, deleted, createPlusAPIError(updateError)
diff --git a/internal/watcher/file/file_watcher_service.go b/internal/watcher/file/file_watcher_service.go
index 96c450031e..9e41dad8b3 100644
--- a/internal/watcher/file/file_watcher_service.go
+++ b/internal/watcher/file/file_watcher_service.go
@@ -144,7 +144,7 @@ func (fws *FileWatcherService) Update(ctx context.Context, nginxConfigContext *m
fws.directoriesToWatch = directoriesToWatch
if fws.watcher != nil {
- slog.InfoContext(ctx, "Updating file watcher", "allowed", fws.agentConfig.AllowedDirectories)
+ slog.DebugContext(ctx, "No watcher exists, creating new watcher")
// Start watching new directories
fws.addWatchers(ctx)
diff --git a/internal/watcher/health/health_watcher_service.go b/internal/watcher/health/health_watcher_service.go
index 9c40820369..87768f4b1a 100644
--- a/internal/watcher/health/health_watcher_service.go
+++ b/internal/watcher/health/health_watcher_service.go
@@ -51,7 +51,7 @@ func NewHealthWatcherService(agentConfig *config.Config) *HealthWatcherService {
}
}
-func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) {
+func (hw *HealthWatcherService) AddHealthWatcher(ctx context.Context, instances []*mpi.Instance) {
hw.healthWatcherMutex.Lock()
defer hw.healthWatcherMutex.Unlock()
@@ -66,8 +66,11 @@ func (hw *HealthWatcherService) AddHealthWatcher(instances []*mpi.Instance) {
mpi.InstanceMeta_INSTANCE_TYPE_NGINX_APP_PROTECT:
fallthrough
default:
- slog.Warn("Health watcher not implemented", "instance_type",
- instance.GetInstanceMeta().GetInstanceType())
+ slog.DebugContext(
+ ctx,
+ "Health watcher not implemented",
+ "instance_type", instance.GetInstanceMeta().GetInstanceType(),
+ )
}
hw.instances[instance.GetInstanceMeta().GetInstanceId()] = instance
}
diff --git a/internal/watcher/health/health_watcher_service_test.go b/internal/watcher/health/health_watcher_service_test.go
index b2ed9b78c9..9dc875f01d 100644
--- a/internal/watcher/health/health_watcher_service_test.go
+++ b/internal/watcher/health/health_watcher_service_test.go
@@ -47,7 +47,7 @@ func TestHealthWatcherService_AddHealthWatcher(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
healthWatcher := NewHealthWatcherService(agentConfig)
- healthWatcher.AddHealthWatcher(test.instances)
+ healthWatcher.AddHealthWatcher(t.Context(), test.instances)
if test.numWatchers == 1 {
assert.Len(t, healthWatcher.watchers, 1)
@@ -66,7 +66,7 @@ func TestHealthWatcherService_DeleteHealthWatcher(t *testing.T) {
instance := protos.NginxOssInstance([]string{})
instances := []*mpi.Instance{instance}
- healthWatcher.AddHealthWatcher(instances)
+ healthWatcher.AddHealthWatcher(t.Context(), instances)
assert.Len(t, healthWatcher.watchers, 1)
healthWatcher.DeleteHealthWatcher(instances)
@@ -82,7 +82,7 @@ func TestHealthWatcherService_UpdateHealthWatcher(t *testing.T) {
updatedInstance.GetInstanceMeta().InstanceId = instance.GetInstanceMeta().GetInstanceId()
instances := []*mpi.Instance{instance}
- healthWatcher.AddHealthWatcher(instances)
+ healthWatcher.AddHealthWatcher(t.Context(), instances)
assert.Equal(t, instance, healthWatcher.instances[instance.GetInstanceMeta().GetInstanceId()])
healthWatcher.UpdateHealthWatcher([]*mpi.Instance{updatedInstance})
diff --git a/internal/watcher/instance/instance_watcher_service.go b/internal/watcher/instance/instance_watcher_service.go
index 4d00ce70b4..0279375992 100644
--- a/internal/watcher/instance/instance_watcher_service.go
+++ b/internal/watcher/instance/instance_watcher_service.go
@@ -7,6 +7,7 @@ package instance
import (
"context"
+ "encoding/json"
"log/slog"
"slices"
"sync"
@@ -43,13 +44,14 @@ type (
processOperator process.ProcessOperatorInterface
nginxConfigParser parser.ConfigParser
executer exec.ExecInterface
+ nginxParser processParser
enabled *atomic.Bool
agentConfig *config.Config
instanceCache map[string]*mpi.Instance
nginxConfigCache map[string]*model.NginxConfigContext
instancesChannel chan<- InstanceUpdatesMessage
nginxConfigContextChannel chan<- NginxConfigContextMessage
- nginxParser processParser
+ processCache []*nginxprocess.Process // last observed NGINX processes, used to detect changes
cacheMutex sync.Mutex
}
@@ -84,6 +86,7 @@ func NewInstanceWatcherService(agentConfig *config.Config) *InstanceWatcherServi
nginxConfigCache: make(map[string]*model.NginxConfigContext),
executer: &exec.Exec{},
enabled: enabled,
+ processCache: []*nginxprocess.Process{},
}
}
@@ -270,6 +273,18 @@ func (iw *InstanceWatcherService) instanceUpdates(ctx context.Context) (
return instanceUpdates, err
}
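+ // Compare against the cached snapshot so the full process list is only logged when it changes.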
+ if !slices.EqualFunc(iw.processCache, nginxProcesses, func(a, b *nginxprocess.Process) bool {
+ return a.Equal(b)
+ }) {
+ processesJSON, marshalErr := json.Marshal(nginxProcesses)
+ if marshalErr != nil {
+ slog.DebugContext(ctx, "Unable to marshal NGINX processes", "error", marshalErr)
+ } else {
+ slog.DebugContext(ctx, "NGINX processes changed", "processes", processesJSON)
+ iw.processCache = nginxProcesses
+ }
+ }
+
// NGINX Agent is always the first instance in the list
instancesFound := make(map[string]*mpi.Instance)
agentInstance := iw.agentInstance(ctx)
diff --git a/internal/watcher/instance/nginx_process_parser.go b/internal/watcher/instance/nginx_process_parser.go
index 3da8786b85..85a7276624 100644
--- a/internal/watcher/instance/nginx_process_parser.go
+++ b/internal/watcher/instance/nginx_process_parser.go
@@ -48,24 +48,12 @@ func NewNginxProcessParser() *NginxProcessParser {
//
//nolint:revive,gocognit // cognitive complexity of 20 because of the if statements in the for loop
func (npp *NginxProcessParser) Parse(ctx context.Context, processes []*nginxprocess.Process) map[string]*mpi.Instance {
- slog.DebugContext(ctx, "Parsing NGINX processes", "number_of_processes", len(processes))
-
instanceMap := make(map[string]*mpi.Instance) // key is instanceID
workers := make(map[int32][]*mpi.InstanceChild) // key is ppid of process
processesByPID := convertToMap(processes)
for _, proc := range processesByPID {
- slog.DebugContext(ctx, "NGINX process details",
- "ppid", proc.PPID,
- "pid", proc.PID,
- "name", proc.Name,
- "created", proc.Created,
- "status", proc.Status,
- "cmd", proc.Cmd,
- "exe", proc.Exe,
- )
-
if proc.IsWorker() {
// Here we are determining if the worker process has a master
if masterProcess, ok := processesByPID[proc.PPID]; ok {
diff --git a/internal/watcher/watcher_plugin.go b/internal/watcher/watcher_plugin.go
index 8f19f92266..75cdf2ab06 100644
--- a/internal/watcher/watcher_plugin.go
+++ b/internal/watcher/watcher_plugin.go
@@ -306,7 +306,7 @@ func (w *Watcher) handleCredentialUpdate(ctx context.Context, message credential
func (w *Watcher) handleInstanceUpdates(newCtx context.Context, message instance.InstanceUpdatesMessage) {
if len(message.InstanceUpdates.NewInstances) > 0 {
slog.DebugContext(newCtx, "New instances found", "instances", message.InstanceUpdates.NewInstances)
- w.healthWatcherService.AddHealthWatcher(message.InstanceUpdates.NewInstances)
+ w.healthWatcherService.AddHealthWatcher(newCtx, message.InstanceUpdates.NewInstances)
w.messagePipe.Process(
newCtx,
&bus.Message{Topic: bus.AddInstancesTopic, Data: message.InstanceUpdates.NewInstances},
diff --git a/pkg/nginxprocess/process.go b/pkg/nginxprocess/process.go
index 2224916b08..4c18883fb3 100644
--- a/pkg/nginxprocess/process.go
+++ b/pkg/nginxprocess/process.go
@@ -18,13 +18,13 @@ import (
type Process struct {
// Created is when this process was created, precision varies by platform and is at best to the millisecond. On
// linux there can be significant skew compared to [time.Now], ± 1s.
- Created time.Time
- Name string
- Cmd string
- Exe string // path to the executable
- Status string // process status, only present if this process was created using [WithStatus]
- PID int32
- PPID int32 // parent PID
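+ // JSON tags let process snapshots be marshalled for debug logging when the process list changes.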
+ Created time.Time `json:"created"`
+ Name string `json:"name"`
+ Cmd string `json:"cmd"`
+ Exe string `json:"exe"` // path to the executable
+ Status string `json:"status"` // process status, only present if this process was created using [WithStatus]
+ PID int32 `json:"pid"`
+ PPID int32 `json:"ppid"` // parent PID
}
// IsWorker returns true if the process is a NGINX worker process.
@@ -50,6 +50,11 @@ func (p *Process) IsHealthy() bool {
return p.Status != "" && !strings.Contains(p.Status, process.Zombie)
}
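+// Equal reports whether p and b describe the same process. Created is compared with
+// time.Time.Equal so monotonic clock readings do not affect the result.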
+func (p *Process) Equal(b *Process) bool {
+ return p.PID == b.PID && p.Name == b.Name && p.PPID == b.PPID && p.Cmd == b.Cmd &&
+ p.Exe == b.Exe && p.Created.Equal(b.Created) && p.Status == b.Status
+}
+
type options struct {
loadStatus bool
}