Skip to content

Commit

Permalink
Network: Provide a default set of allowed_attributes (#682)
Browse files Browse the repository at this point in the history
* Network: Provide a default set of allowed_attributes

* Fix integration tests

* fix integration tests

* fix again integration tests
  • Loading branch information
mariomac authored Mar 15, 2024
1 parent 28cb938 commit f205789
Show file tree
Hide file tree
Showing 16 changed files with 154 additions and 95 deletions.
2 changes: 2 additions & 0 deletions docs/sources/network/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ By default, only the following attributes are reported: `k8s.src.owner.name`, `k
| `k8s.dst.namespace` | Kubernetes namespace of the destination of the flow |
| `k8s.src.name` | Name of the source Pod, Service, or Node |
| `k8s.dst.name` | Name of the destination Pod, Service, or Node |
| `k8s.src.type` | Type of the source: `Pod`, `Node`, or `Service` |
| `k8s.src.type` | Type of the destination: `Pod`, `Node`, or `Service` |
| `k8s.src.owner.name` | Name of the owner of the source Pod. If there is no owner, the Pod name is used |
| `k8s.dst.owner.name` | Name of the owner of the destination Pod. If there is no owner, the Pod name is used |
| `k8s.src.owner.type` | Type of the owner of the source Pod: `Deployment`, `DaemonSet`, `ReplicaSet`, `StatefulSet`, or `Pod` if there is no owner |
Expand Down
5 changes: 5 additions & 0 deletions pkg/beyla/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ func (c *Config) Validate() error {
return ConfigError("you need to define at least one exporter: print_traces," +
" grafana, otel_metrics_export, otel_traces_export or prometheus_export")
}

if c.Enabled(FeatureNetO11y) {
return c.NetworkFlows.Validate(c.Attributes.Kubernetes.Enabled())
}

return nil
}

Expand Down
46 changes: 46 additions & 0 deletions pkg/beyla/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,52 @@ discovery:
}
}

func TestConfigValidate_Network_Kube(t *testing.T) {
userConfig := bytes.NewBufferString(`
otel_metrics_export:
endpoint: http://otelcol:4318
attributes:
kubernetes:
enable: true
network:
enable: true
allowed_attributes:
- k8s.src.name
- k8s.dst.name
`)
cfg, err := LoadConfig(userConfig)
require.NoError(t, err)
require.NoError(t, cfg.Validate())
}

func TestConfigValidate_Network_Empty_Attrs(t *testing.T) {
userConfig := bytes.NewBufferString(`
otel_metrics_export:
endpoint: http://otelcol:4318
network:
enable: true
allowed_attributes: []
`)
cfg, err := LoadConfig(userConfig)
require.NoError(t, err)
require.Error(t, cfg.Validate())
}

func TestConfigValidate_Network_NotKube(t *testing.T) {
userConfig := bytes.NewBufferString(`
otel_metrics_export:
endpoint: http://otelcol:4318
network:
enable: true
allowed_attributes:
- k8s.src.name
- k8s.dst.name
`)
cfg, err := LoadConfig(userConfig)
require.NoError(t, err)
require.Error(t, cfg.Validate())
}

func loadConfig(t *testing.T, env map[string]string) *Config {
for k, v := range env {
require.NoError(t, os.Setenv(k, v))
Expand Down
35 changes: 35 additions & 0 deletions pkg/beyla/network_cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package beyla

import (
"errors"
"log/slog"
"strings"
"time"

"github.com/grafana/beyla/pkg/internal/netolly/flow"
Expand Down Expand Up @@ -122,9 +125,41 @@ var defaultNetworkConfig = NetworkConfig{
Direction: "both",
ListenInterfaces: "watch",
ListenPollPeriod: 10 * time.Second,
AllowedAttributes: []string{
"k8s.src.owner.name",
"k8s.src.namespace",
"k8s.dst.owner.name",
"k8s.dst.namespace",
"k8s.cluster.name",
},
ReverseDNS: flow.ReverseDNS{
Type: flow.ReverseDNSNone,
CacheLen: 256,
CacheTTL: time.Hour,
},
}

func (nc *NetworkConfig) Validate(isKubeEnabled bool) error {
if len(nc.AllowedAttributes) == 0 {
return errors.New("you must define some attributes in the allowed_attributes section. Please ceck documentation")
}
if isKubeEnabled {
return nil
}

actualAllowed := 0
for _, attr := range nc.AllowedAttributes {
if !strings.HasPrefix(attr, "k8s.") {
actualAllowed++
}
}
if actualAllowed == 0 {
return errors.New("allowed_attributes section (or its default) is only allowing Kubernetes metric attributes. " +
" You must define non-Kubernetes attributes there, or set BEYLA_KUBE_METADATA_ENABLE to true. Please check documentation")
}
if actualAllowed < len(nc.AllowedAttributes) {
slog.Warn("Network configuration allowed_attributes section is defining some Kubernetes attributes but " +
" Kubernetes metadata is disabled. Maybe you forgot to set BEYLA_KUBE_METADATA_ENABLE to true?")
}
return nil
}
4 changes: 0 additions & 4 deletions pkg/internal/netolly/ebpf/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ type RecordAttrs struct {
// - IP
SrcName string
DstName string
// SrcNamespace and DstNamespace might be empty, but they are required by
// asserts. TODO: let user override them
SrcNamespace string
DstNamespace string

Interface string
// BeylaIP provides information about the source of the flow (the Agent that traced it)
Expand Down
62 changes: 16 additions & 46 deletions pkg/internal/netolly/export/attributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,70 +5,40 @@ import "go.opentelemetry.io/otel/attribute"
// AttributesFilter controls which attributes are added
// to a metric
type AttributesFilter struct {
newSet func() Attributes
allowed map[string]struct{}
}

// Attributes set. Each metric must create its own instance
// Attributes filtered set. Each metric instance must create its own instance
// by means of AttributesFilter.New()
type Attributes interface {
PutString(key, value string)
Slice() []attribute.KeyValue
type Attributes struct {
allowed map[string]struct{}
list []attribute.KeyValue
}

// NewAttributesFilter creates an AttributesFilter that would filter
// the attributes not contained in the allowed list.
// If the allowed list is empty, it won't filter any attribute.
func NewAttributesFilter(allowed []string) AttributesFilter {
if len(allowed) == 0 {
return AttributesFilter{newSet: newUnfilteredSet}
}
return AttributesFilter{newSet: newFilteredSet(allowed)}
}

func (a *AttributesFilter) New() Attributes {
return a.newSet()
}

func newFilteredSet(allowed []string) func() Attributes {
allowedSet := make(map[string]struct{}, len(allowed))
for _, n := range allowed {
allowedSet[n] = struct{}{}
}
return func() Attributes {
return &filteredSet{
allowed: allowedSet,
list: make([]attribute.KeyValue, 0, len(allowed)),
}
}
}

type filteredSet struct {
allowed map[string]struct{}
list []attribute.KeyValue
return AttributesFilter{allowed: allowedSet}
}

func (f *filteredSet) PutString(key, value string) {
if _, ok := f.allowed[key]; ok {
f.list = append(f.list, attribute.String(key, value))
func (af *AttributesFilter) New() Attributes {
return Attributes{
allowed: af.allowed,
list: make([]attribute.KeyValue, 0, len(af.allowed)),
}
}

func (f *filteredSet) Slice() []attribute.KeyValue {
return f.list
}

func newUnfilteredSet() Attributes {
return &unfilteredSet{}
}

type unfilteredSet struct {
list []attribute.KeyValue
}

func (u *unfilteredSet) PutString(key, value string) {
u.list = append(u.list, attribute.String(key, value))
func (a *Attributes) PutString(key, value string) {
if _, ok := a.allowed[key]; ok {
a.list = append(a.list, attribute.String(key, value))
}
}

func (u *unfilteredSet) Slice() []attribute.KeyValue {
return u.list
func (a *Attributes) Slice() []attribute.KeyValue {
return a.list
}
6 changes: 1 addition & 5 deletions pkg/internal/netolly/export/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ func (me *metricsExporter) attributes(m *ebpf.Record) []attribute.KeyValue {
attrs.PutString("src.address", m.Id.SrcIP().IP().String())
attrs.PutString("dst.address", m.Id.DstIP().IP().String())
attrs.PutString("src.name", m.Attrs.SrcName)
attrs.PutString("src.namespace", m.Attrs.SrcNamespace)
attrs.PutString("dst.name", m.Attrs.DstName)
attrs.PutString("dst.namespace", m.Attrs.DstNamespace)

// direction and interface will be only set if the user disabled
// the flow deduplication node
Expand Down Expand Up @@ -130,9 +128,7 @@ func MetricsExporterProvider(cfg MetricsConfig) (node.TerminalFunc[[]*ebpf.Recor
log.Error("", "error", err)
return nil, err
}
if len(cfg.AllowedAttributes) > 0 {
log.Debug("restricting attributes not in this list", "attributes", cfg.AllowedAttributes)
}
log.Debug("restricting attributes not in this list", "attributes", cfg.AllowedAttributes)
return (&metricsExporter{
flowBytes: flowBytes,
attrs: NewAttributesFilter(cfg.AllowedAttributes),
Expand Down
21 changes: 8 additions & 13 deletions pkg/internal/netolly/export/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ func TestMetricAttributes(t *testing.T) {
},
},
Attrs: ebpf.RecordAttrs{
SrcName: "srcname",
SrcNamespace: "srcnamespace",
DstName: "dstname",
DstNamespace: "dstnamespace",
SrcName: "srcname",
DstName: "dstname",
Metadata: map[string]string{
"k8s.src.name": "srcname",
"k8s.src.namespace": "srcnamespace",
Expand All @@ -33,15 +31,16 @@ func TestMetricAttributes(t *testing.T) {
in.Id.SrcIp.In6U.U6Addr8 = [16]uint8{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 12, 34, 56, 78}
in.Id.DstIp.In6U.U6Addr8 = [16]uint8{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 33, 22, 11, 1}

me := &metricsExporter{attrs: NewAttributesFilter(nil)}
me := &metricsExporter{attrs: NewAttributesFilter([]string{
"src.address", "dst.address", "src.name", "dst.name",
"k8s.src.name", "k8s.src.namespace", "k8s.dst.name", "k8s.dst.namespace",
})}
reportedAttributes := me.attributes(in)
for _, mustContain := range []attribute.KeyValue{
attribute.String("src.address", "12.34.56.78"),
attribute.String("dst.address", "33.22.11.1"),
attribute.String("src.name", "srcname"),
attribute.String("src.namespace", "srcnamespace"),
attribute.String("dst.name", "dstname"),
attribute.String("dst.namespace", "dstnamespace"),

attribute.String("k8s.src.name", "srcname"),
attribute.String("k8s.src.namespace", "srcnamespace"),
Expand All @@ -62,10 +61,8 @@ func TestMetricAttributes_Filter(t *testing.T) {
},
},
Attrs: ebpf.RecordAttrs{
SrcName: "srcname",
SrcNamespace: "srcnamespace",
DstName: "dstname",
DstNamespace: "dstnamespace",
SrcName: "srcname",
DstName: "dstname",
Metadata: map[string]string{
"k8s.src.name": "srcname",
"k8s.src.namespace": "srcnamespace",
Expand Down Expand Up @@ -97,9 +94,7 @@ func TestMetricAttributes_Filter(t *testing.T) {
for _, mustNotContain := range []string{
"dst.address",
"src.name",
"src.namespace",
"dst.name",
"dst.namespace",
"k8s.src.namespace",
"k8s.dst.namespace",
} {
Expand Down
4 changes: 0 additions & 4 deletions pkg/internal/netolly/export/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,8 @@ func printFlow(f *ebpf.Record) {
sb.WriteString(f.Id.DstIP().IP().String())
sb.WriteString(" src.name=")
sb.WriteString(f.Attrs.SrcName)
sb.WriteString(" src.namespace=")
sb.WriteString(f.Attrs.SrcNamespace)
sb.WriteString(" dst.name=")
sb.WriteString(f.Attrs.DstName)
sb.WriteString(" dst.namespace=")
sb.WriteString(f.Attrs.DstNamespace)

for k, v := range f.Attrs.Metadata {
sb.WriteString(" ")
Expand Down
8 changes: 1 addition & 7 deletions pkg/internal/netolly/transform/k8s/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,10 @@ func (n *decorator) decorate(flow *ebpf.Record, prefix, ip string) bool {
if flow.Attrs.DstName == "" {
flow.Attrs.DstName = kubeInfo.Name
}
if flow.Attrs.DstNamespace == "" {
flow.Attrs.DstNamespace = kubeInfo.Namespace
}
} else {
if flow.Attrs.SrcName == "" {
flow.Attrs.SrcName = kubeInfo.Name
}
if flow.Attrs.SrcNamespace == "" {
flow.Attrs.SrcNamespace = kubeInfo.Namespace
}
}
return true
}
Expand Down Expand Up @@ -218,7 +212,7 @@ func kubeClusterName(ctx context.Context, cfg *MetadataDecorator) string {
}
}
log.Warn("can't fetch Kubernetes Cluster Name." +
" Network metrics won't contain that field unless you explicitly set " +
" Network metrics won't contain k8s.cluster.name attribute unless you explicitly set " +
" the BEYLA_KUBE_CLUSTER_NAME environment variable")
return ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ data:
log_level: debug
otel_metrics_export:
endpoint: http://otelcol.default:4317
network:
allowed_attributes:
- src.name
- dst.name
- k8s.src.owner.name
- k8s.dst.owner.name
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
26 changes: 26 additions & 0 deletions test/integration/k8s/manifests/06-beyla-netolly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,32 @@ data:
- fd00:10:244::/56
- 10.96.0.0/16
- fd00:10:96::/112
allowed_attributes:
# assured cardinality explosion. Don't try in production!
- beyla.ip
- src.address
- dst.address
- src.name
- dst.name
- src.namespace
- dst.namespace
- src.cidr
- dst.cidr
- k8s.src.namespace
- k8s.dst.namespace
- k8s.src.name
- k8s.dst.name
- k8s.src.type
- k8s.dst.type
- k8s.src.owner.name
- k8s.dst.owner.name
- k8s.src.owner.type
- k8s.dst.owner.type
- k8s.src.node.ip
- k8s.dst.node.ip
- k8s.src.node.name
- k8s.dst.node.name
- k8s.cluster.name
---
apiVersion: apps/v1
kind: DaemonSet
Expand Down
Loading

0 comments on commit f205789

Please sign in to comment.