Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
355 changes: 220 additions & 135 deletions api/grpc/mpi/v1/command.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions api/grpc/mpi/v1/command.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/grpc/mpi/v1/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,25 @@ message UpdateDataPlaneHealthResponse {}

// Reports the status of an associated command. This may be in response to a ManagementPlaneRequest
message DataPlaneResponse {
enum RequestType {
UNSPECIFIED_REQUEST = 0;
CONFIG_APPLY_REQUEST = 1;
CONFIG_UPLOAD_REQUEST = 2;
HEALTH_REQUEST = 3;
STATUS_REQUEST = 4;
API_ACTION_REQUEST = 5;
COMMAND_STATUS_REQUEST = 6;
UPDATE_AGENT_CONFIG_REQUEST = 7;
}

// Meta-information associated with a message
mpi.v1.MessageMeta message_meta = 1;
// The command response with the associated request
mpi.v1.CommandResponse command_response = 2;
// The instance identifier, if applicable, for this response
string instance_id = 3;
// The management plane request type that is being responded to
RequestType request_type = 4;
}

// A Management Plane request for information, triggers an associated rpc on the Data Plane
Expand Down
2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/common.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/files.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions docs/proto/protos.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
- [UpdateHTTPUpstreamServers](#mpi-v1-UpdateHTTPUpstreamServers)
- [UpdateStreamServers](#mpi-v1-UpdateStreamServers)

- [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType)
- [InstanceHealth.InstanceHealthStatus](#mpi-v1-InstanceHealth-InstanceHealthStatus)
- [InstanceMeta.InstanceType](#mpi-v1-InstanceMeta-InstanceType)
- [Log.LogLevel](#mpi-v1-Log-LogLevel)
Expand Down Expand Up @@ -862,6 +863,7 @@ Reports the status of an associated command. This may be in response to a Manage
| message_meta | [MessageMeta](#mpi-v1-MessageMeta) | | Meta-information associated with a message |
| command_response | [CommandResponse](#mpi-v1-CommandResponse) | | The command response with the associated request |
| instance_id | [string](#string) | | The instance identifier, if applicable, for this response |
| request_type | [DataPlaneResponse.RequestType](#mpi-v1-DataPlaneResponse-RequestType) | | The management plane request type that is being responded to |



Expand Down Expand Up @@ -1326,6 +1328,24 @@ Update Upstream Stream Servers for an instance



<a name="mpi-v1-DataPlaneResponse-RequestType"></a>

### DataPlaneResponse.RequestType


| Name | Number | Description |
| ---- | ------ | ----------- |
| UNSPECIFIED_REQUEST | 0 | |
| CONFIG_APPLY_REQUEST | 1 | |
| CONFIG_UPLOAD_REQUEST | 2 | |
| HEALTH_REQUEST | 3 | |
| STATUS_REQUEST | 4 | |
| API_ACTION_REQUEST | 5 | |
| COMMAND_STATUS_REQUEST | 6 | |
| UPDATE_AGENT_CONFIG_REQUEST | 7 | |



<a name="mpi-v1-InstanceHealth-InstanceHealthStatus"></a>

### InstanceHealth.InstanceHealthStatus
Expand Down
24 changes: 19 additions & 5 deletions internal/bus/message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon

// If the agent update was received from a create connection request no data plane response needs to be sent
if topic == AgentConfigUpdateTopic {
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Failed to update agent config", reconfigureError.Error())
response := p.createDataPlaneResponse(
correlationID,
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST,
"Failed to update agent config",
reconfigureError.Error(),
)
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}

Expand All @@ -222,8 +227,13 @@ func (p *MessagePipe) Reconfigure(ctx context.Context, agentConfig *mpi.AgentCon

slog.InfoContext(ctx, "Finished reconfiguring plugins", "plugins", p.plugins)
if topic == AgentConfigUpdateTopic {
response := p.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully updated agent config", "")
response := p.createDataPlaneResponse(
correlationID,
mpi.CommandResponse_COMMAND_STATUS_OK,
mpi.DataPlaneResponse_UPDATE_AGENT_CONFIG_REQUEST,
"Successfully updated agent config",
"",
)
p.bus.Publish(DataPlaneResponseTopic, ctx, &Message{Topic: DataPlaneResponseTopic, Data: response})
}
}
Expand Down Expand Up @@ -339,7 +349,10 @@ func (p *MessagePipe) initPlugins(ctx context.Context) {
}
}

func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
func (p *MessagePipe) createDataPlaneResponse(
correlationID string,
status mpi.CommandResponse_CommandStatus,
requestType mpi.DataPlaneResponse_RequestType,
message, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
Expand All @@ -353,6 +366,7 @@ func (p *MessagePipe) createDataPlaneResponse(correlationID string, status mpi.C
Message: message,
Error: err,
},
RequestType: requestType,
}
}

Expand Down
58 changes: 31 additions & 27 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
return errors.New("OTel collector already running")
}

slog.InfoContext(ctx, "Starting OTel collector")
bootErr := oc.bootup(runCtx)
if bootErr != nil {
slog.ErrorContext(runCtx, "Unable to start OTel Collector", "error", bootErr)
Expand All @@ -163,30 +162,7 @@ func (oc *Collector) Info() *bus.Info {
func (oc *Collector) Close(ctx context.Context) error {
slog.InfoContext(ctx, "Closing OTel Collector plugin")

if !oc.stopped {
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
oc.service.Shutdown()
oc.cancel()

settings := *oc.config.Client.Backoff
settings.MaxElapsedTime = maxTimeToWaitForShutdown
err := backoff.WaitUntil(ctx, &settings, func() error {
if oc.service.GetState() == otelcol.StateClosed {
return nil
}

return errors.New("OTel Collector not in a closed state yet")
})

if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
} else {
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
oc.stopped = true
}
}

return nil
return oc.shutdownCollector(ctx)
}

// Process an incoming Message Bus message in the plugin
Expand Down Expand Up @@ -236,6 +212,33 @@ func (oc *Collector) Reconfigure(ctx context.Context, agentConfig *config.Config
return nil
}

func (oc *Collector) shutdownCollector(ctx context.Context) error {
if !oc.stopped {
slog.InfoContext(ctx, "Shutting down OTel Collector", "state", oc.service.GetState())
oc.service.Shutdown()
oc.cancel()

settings := *oc.config.Client.Backoff
settings.MaxElapsedTime = maxTimeToWaitForShutdown
err := backoff.WaitUntil(ctx, &settings, func() error {
if oc.service.GetState() == otelcol.StateClosed {
return nil
}

return errors.New("OTel Collector not in a closed state yet")
})

if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err, "state", oc.service.GetState())
} else {
slog.InfoContext(ctx, "OTel Collector shutdown", "state", oc.service.GetState())
oc.stopped = true
}
}

return nil
}

// Process receivers and log warning for sub-optimal configurations
func (oc *Collector) processReceivers(ctx context.Context, receivers map[string]*config.OtlpReceiver) {
for _, receiver := range receivers {
Expand Down Expand Up @@ -275,11 +278,12 @@ func (oc *Collector) bootup(ctx context.Context) error {
oc.setProxyIfNeeded(ctx)
}

slog.InfoContext(ctx, "Starting OTel collector")
appErr := oc.service.Run(ctx)
if appErr != nil {
errChan <- appErr
}
slog.InfoContext(ctx, "OTel collector run finished")
slog.InfoContext(ctx, "OTel collector has stopped running")
}()

for {
Expand Down Expand Up @@ -453,7 +457,7 @@ func (oc *Collector) writeRunningConfig(ctx context.Context, settings otelcol.Co
}

func (oc *Collector) restartCollector(ctx context.Context) {
err := oc.Close(ctx)
err := oc.shutdownCollector(ctx)
if err != nil {
slog.ErrorContext(ctx, "Failed to shutdown OTel Collector", "error", err)
return
Expand Down
8 changes: 4 additions & 4 deletions internal/collector/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,20 @@ func createFile(confPath string) error {

// Generates an OTel Collector config to a file by injecting the Metrics Config to a Go template.
func writeCollectorConfig(conf *config.Collector) error {
confPath := filepath.Clean(conf.ConfigPath)

slog.Info("Writing OTel collector config", "config_path", confPath)

if conf.Processors.Resource["default"] != nil {
addDefaultResourceProcessor(conf.Pipelines.Metrics)
addDefaultResourceProcessor(conf.Pipelines.Logs)
}

slog.Info("Writing OTel collector config")

otelcolTemplate, templateErr := template.New(otelTemplatePath).Parse(otelcolTemplate)
if templateErr != nil {
return templateErr
}

confPath := filepath.Clean(conf.ConfigPath)

// Ensure file exists and has correct permissions
if err := ensureFileExists(confPath); err != nil {
return err
Expand Down
48 changes: 40 additions & 8 deletions internal/command/command_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,24 @@ func (cp *CommandPlugin) processDataPlaneHealth(ctx context.Context, msg *bus.Me

cp.processDataPlaneResponse(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_FAILURE,
"Failed to send the health status update", err.Error()),
Data: cp.createDataPlaneResponse(
correlationID,
mpi.CommandResponse_COMMAND_STATUS_FAILURE,
mpi.DataPlaneResponse_HEALTH_REQUEST,
"Failed to send the health status update",
err.Error(),
),
})
}
cp.processDataPlaneResponse(ctx, &bus.Message{
Topic: bus.DataPlaneResponseTopic,
Data: cp.createDataPlaneResponse(correlationID, mpi.CommandResponse_COMMAND_STATUS_OK,
"Successfully sent health status update", ""),
Data: cp.createDataPlaneResponse(
correlationID,
mpi.CommandResponse_COMMAND_STATUS_OK,
mpi.DataPlaneResponse_HEALTH_REQUEST,
"Successfully sent health status update",
"",
),
})
}
}
Expand All @@ -242,8 +252,25 @@ func (cp *CommandPlugin) processInstanceHealth(ctx context.Context, msg *bus.Mes
func (cp *CommandPlugin) processDataPlaneResponse(ctx context.Context, msg *bus.Message) {
slog.DebugContext(ctx, "Command plugin received data plane response message")
if response, ok := msg.Data.(*mpi.DataPlaneResponse); ok {
slog.InfoContext(ctx, "Sending data plane response message", "message",
response.GetCommandResponse().GetMessage(), "status", response.GetCommandResponse().GetStatus())
// To prevent this type of request from spamming the logs too much, we use debug level
if response.GetRequestType() != mpi.DataPlaneResponse_HEALTH_REQUEST {
slog.InfoContext(
ctx,
"Sending data plane response message",
"message", response.GetCommandResponse().GetMessage(),
"status", response.GetCommandResponse().GetStatus(),
"error", response.GetCommandResponse().GetError(),
)
} else {
slog.DebugContext(
ctx,
"Sending data plane response message",
"message", response.GetCommandResponse().GetMessage(),
"status", response.GetCommandResponse().GetStatus(),
"error", response.GetCommandResponse().GetError(),
)
}

err := cp.commandService.SendDataPlaneResponse(ctx, response)
if err != nil {
slog.ErrorContext(ctx, "Unable to send data plane response", "error", err)
Expand Down Expand Up @@ -318,7 +345,8 @@ func (cp *CommandPlugin) monitorSubscribeChannel(ctx context.Context) {
slog.InfoContext(ctx, "Received management plane config apply request")
cp.handleConfigApplyRequest(newCtx, message)
case *mpi.ManagementPlaneRequest_HealthRequest:
slog.InfoContext(ctx, "Received management plane health request")
// To prevent this type of request from spamming the logs too much, we use debug level
slog.DebugContext(ctx, "Received management plane health request")
cp.handleHealthRequest(newCtx)
case *mpi.ManagementPlaneRequest_ActionRequest:
if cp.commandServerType != model.Command {
Expand Down Expand Up @@ -445,7 +473,10 @@ func (cp *CommandPlugin) handleInvalidRequest(ctx context.Context,
}
}

func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
func (cp *CommandPlugin) createDataPlaneResponse(
correlationID string,
status mpi.CommandResponse_CommandStatus,
requestType mpi.DataPlaneResponse_RequestType,
message, err string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
Expand All @@ -459,5 +490,6 @@ func (cp *CommandPlugin) createDataPlaneResponse(correlationID string, status mp
Message: message,
Error: err,
},
RequestType: requestType,
}
}
1 change: 1 addition & 0 deletions internal/command/command_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ func Test_createDataPlaneResponse(t *testing.T) {
commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}, model.Command)
result := commandPlugin.createDataPlaneResponse(expected.GetMessageMeta().GetCorrelationId(),
expected.GetCommandResponse().GetStatus(),
expected.GetRequestType(),
expected.GetCommandResponse().GetMessage(), expected.GetCommandResponse().GetError())

assert.Equal(t, expected.GetCommandResponse(), result.GetCommandResponse())
Expand Down
2 changes: 1 addition & 1 deletion internal/command/command_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (cs *CommandService) handleSubscribeError(ctx context.Context, err error, e
return nil
}

slog.ErrorContext(ctx, "Failed to"+errorMsg, "error", err)
slog.ErrorContext(ctx, "Failed to "+errorMsg, "error", err)

return err
}
Expand Down
16 changes: 8 additions & 8 deletions internal/datasource/proto/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,20 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
)

func CreateDataPlaneResponse(correlationID string, status mpi.CommandResponse_CommandStatus,
message, instanceID, err string,
func CreateDataPlaneResponse(
correlationID string,
commandResponse *mpi.CommandResponse,
requestType mpi.DataPlaneResponse_RequestType,
instanceID string,
) *mpi.DataPlaneResponse {
return &mpi.DataPlaneResponse{
MessageMeta: &mpi.MessageMeta{
MessageId: agentid.GenerateMessageID(),
CorrelationId: correlationID,
Timestamp: timestamppb.Now(),
},
CommandResponse: &mpi.CommandResponse{
Status: status,
Message: message,
Error: err,
},
InstanceId: instanceID,
CommandResponse: commandResponse,
InstanceId: instanceID,
RequestType: requestType,
}
}
Loading
Loading