Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add k8sattributesprocessor to otlp pipeline with workload type detection #1524

Open
wants to merge 12 commits into
base: feature-custom-metrics-entity
Choose a base branch
from
1 change: 1 addition & 0 deletions cmd/config-translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func initFlags() {
mode := translatorUtil.DetectAgentMode(*inputMode)
ctx.SetMode(mode)
ctx.SetKubernetesMode(translatorUtil.DetectKubernetesMode(mode))
ctx.SetWorkloadType(translatorUtil.DetectWorkloadType())
}

/**
Expand Down
6 changes: 6 additions & 0 deletions translator/config/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,9 @@ const (
ShortModeK8sEC2 = "K8E"
ShortModeK8sOnPrem = "K8OP"
)

const (
DaemonSet = "DaemonSet"
Deployment = "Deployment"
StatefulSet = "StatefulSet"
)
18 changes: 18 additions & 0 deletions translator/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Context struct {
outputTomlFilePath string
mode string
kubernetesMode string
workloadType string
shortMode string
credentials map[string]string
proxy map[string]string
Expand Down Expand Up @@ -99,6 +100,10 @@ func (ctx *Context) KubernetesMode() string {
return ctx.kubernetesMode
}

func (ctx *Context) WorkloadType() string {
return ctx.workloadType
}

func (ctx *Context) ShortMode() string {
return ctx.shortMode
}
Expand Down Expand Up @@ -150,6 +155,19 @@ func (ctx *Context) SetKubernetesMode(mode string) {
}
}

func (ctx *Context) SetWorkloadType(workloadType string) {
switch workloadType {
case config.DaemonSet:
ctx.workloadType = config.DaemonSet
case config.Deployment:
ctx.workloadType = config.Deployment
case config.StatefulSet:
ctx.workloadType = config.StatefulSet
default:
ctx.workloadType = ""
}
}

func (ctx *Context) SetCredentials(creds map[string]string) {
ctx.credentials = creds
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,34 @@ processors:
match_type: ""
initial_value: 2
max_staleness: 0s
k8sattributes/hostOtlpMetrics/cloudwatchlogs:
auth_type: serviceAccount
context: ""
exclude:
pods:
- name: jaeger-agent
- name: jaeger-collector
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
filter:
namespace: ""
node: ""
node_from_env_var: K8S_NODE_NAME
kube_config_path: ""
passthrough: false
pod_association:
- sources:
- from: connection
name: ""
receivers:
otlp/metrics:
protocols:
Expand Down Expand Up @@ -121,8 +149,9 @@ service:
exporters:
- awsemf
processors:
- awsentity/service/otlp
- cumulativetodelta/hostOtlpMetrics/cloudwatchlogs
- k8sattributes/hostOtlpMetrics/cloudwatchlogs
- awsentity/service/otlp
- batch/hostOtlpMetrics/cloudwatchlogs
receivers:
- otlp/metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,34 @@ processors:
imds_retries: 1
middleware: agenthealth/statuscode
refresh_interval_seconds: 0s
k8sattributes/hostOtlpMetrics:
auth_type: serviceAccount
context: ""
exclude:
pods:
- name: jaeger-agent
- name: jaeger-collector
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
filter:
namespace: ""
node: ""
node_from_env_var: K8S_NODE_NAME
kube_config_path: ""
passthrough: false
pod_association:
- sources:
- from: connection
name: ""
receivers:
otlp/metrics:
protocols:
Expand Down Expand Up @@ -106,9 +134,10 @@ service:
exporters:
- awscloudwatch
processors:
- awsentity/service/otlp
- cumulativetodelta/hostOtlpMetrics
- ec2tagger
- k8sattributes/hostOtlpMetrics
- awsentity/service/otlp
receivers:
- otlp/metrics
telemetry:
Expand Down
2 changes: 2 additions & 0 deletions translator/tocwconfig/tocwconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func TestOtlpMetricsConfigKubernetes(t *testing.T) {
resetContext(t)
context.CurrentContext().SetMode(config.ModeEC2)
context.CurrentContext().SetKubernetesMode(config.ModeK8sEC2)
context.CurrentContext().SetWorkloadType(config.DaemonSet)
context.CurrentContext().SetRunInContainer(true)
checkTranslation(t, "otlp_metrics_eks_config", "linux", nil, "")
checkTranslation(t, "otlp_metrics_eks_config", "darwin", nil, "")
Expand All @@ -315,6 +316,7 @@ func TestOtlpMetricsEmfConfigKubernetes(t *testing.T) {
resetContext(t)
context.CurrentContext().SetMode(config.ModeEC2)
context.CurrentContext().SetKubernetesMode(config.ModeK8sEC2)
context.CurrentContext().SetWorkloadType(config.DaemonSet)
context.CurrentContext().SetRunInContainer(true)
t.Setenv(config.HOST_NAME, "host_name_from_env")
checkTranslation(t, "otlp_metrics_cloudwatchlogs_eks_config", "linux", nil, "")
Expand Down
2 changes: 2 additions & 0 deletions translator/translate/otel/pipeline/host/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/batchprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/cumulativetodeltaprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/ec2taggerprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/k8sattributesprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/metricsdecorator"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/processor/rollupprocessor"
"github.com/aws/amazon-cloudwatch-agent/translator/util/ecsutil"
Expand Down Expand Up @@ -106,6 +107,7 @@ func (t translator) Translate(conf *confmap.Conf) (*common.ComponentTranslators,
case common.PipelineNameHostOtlpMetrics:
// TODO: For OTLP, the entity processor is only on K8S for now. Eventually this should be added to EC2
if currentContext.KubernetesMode() != "" {
translators.Processors.Set(k8sattributesprocessor.NewTranslatorWithName(t.name))
entityProcessor = awsentity.NewTranslatorWithEntityType(awsentity.Service, common.OtlpKey, false)
}
case common.PipelineNameHostCustomMetrics:
Expand Down
4 changes: 2 additions & 2 deletions translator/translate/otel/pipeline/host/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/hostOtlpMetrics",
receivers: []string{"nop", "other"},
processors: []string{"cumulativetodelta/hostOtlpMetrics", "awsentity/service/otlp"},
processors: []string{"cumulativetodelta/hostOtlpMetrics", "k8sattributes/hostOtlpMetrics", "awsentity/service/otlp"},
exporters: []string{"awscloudwatch"},
extensions: []string{"agenthealth/metrics", "agenthealth/statuscode"},
},
Expand Down Expand Up @@ -190,7 +190,7 @@ func TestTranslator(t *testing.T) {
want: &want{
pipelineID: "metrics/hostOtlpMetrics/cloudwatchlogs",
receivers: []string{"nop", "other"},
processors: []string{"cumulativetodelta/hostOtlpMetrics/cloudwatchlogs", "awsentity/service/otlp", "batch/hostOtlpMetrics/cloudwatchlogs"},
processors: []string{"cumulativetodelta/hostOtlpMetrics/cloudwatchlogs", "k8sattributes/hostOtlpMetrics/cloudwatchlogs", "awsentity/service/otlp", "batch/hostOtlpMetrics/cloudwatchlogs"},
exporters: []string{"awsemf"},
extensions: []string{"agenthealth/logs", "agenthealth/statuscode"},
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pod_association:
- sources:
- from: connection
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
filter:
node_from_env_var: K8S_NODE_NAME
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pod_association:
- sources:
- from: connection
extract:
metadata:
- k8s.namespace.name
- k8s.pod.name
- k8s.replicaset.name
- k8s.deployment.name
- k8s.daemonset.name
- k8s.statefulset.name
- k8s.cronjob.name
- k8s.job.name
- k8s.node.name
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT

package k8sattributesprocessor

import (
_ "embed"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/processor"

"github.com/aws/amazon-cloudwatch-agent/translator/config"
"github.com/aws/amazon-cloudwatch-agent/translator/context"
"github.com/aws/amazon-cloudwatch-agent/translator/translate/otel/common"
)

//go:embed k8sattributes_agent.yaml
var k8sAttributesAgentConfig string

//go:embed k8sattributes_gateway.yaml
var k8sAttributesGatewayConfig string

type translator struct {
name string
factory processor.Factory
}

var _ common.Translator[component.Config] = (*translator)(nil)

func NewTranslatorWithName(name string) common.Translator[component.Config] {
return &translator{name, k8sattributesprocessor.NewFactory()}
}

func (t *translator) ID() component.ID {
return component.NewIDWithName(t.factory.Type(), t.name)
}

func (t *translator) Translate(_ *confmap.Conf) (component.Config, error) {
cfg := t.factory.CreateDefaultConfig().(*k8sattributesprocessor.Config)
currentContext := context.CurrentContext()

if currentContext.KubernetesMode() == "" {
return nil, fmt.Errorf("k8sattributesprocessor is not supported in this context")
}

switch workloadType := currentContext.WorkloadType(); workloadType {
case config.DaemonSet:
return common.GetYamlFileToYamlConfig(cfg, k8sAttributesAgentConfig)
case config.Deployment, config.StatefulSet:
return common.GetYamlFileToYamlConfig(cfg, k8sAttributesGatewayConfig)
default:
return nil, fmt.Errorf("k8sattributesprocessor is not supported for workload type: %s", workloadType)
}
}
Loading