Skip to content

Commit

Permalink
Fix network flows flaky test (#1282)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariomac authored Oct 29, 2024
1 parent 4f7e880 commit 71a0767
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"testing"

"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/kube"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
Expand All @@ -23,7 +21,6 @@ func TestMain(m *testing.M) {
docker.ImageBuild{Tag: "beyla:dev", Dockerfile: k8s.DockerfileBeyla},
docker.ImageBuild{Tag: "testserver:dev", Dockerfile: k8s.DockerfileTestServer},
docker.ImageBuild{Tag: "httppinger:dev", Dockerfile: k8s.DockerfileHTTPPinger},
docker.ImageBuild{Tag: "grpcpinger:dev", Dockerfile: k8s.DockerfilePinger},
docker.ImageBuild{Tag: "beyla-k8s-cache:dev", Dockerfile: k8s.DockerfileBeylaK8sCache},
docker.ImageBuild{Tag: "quay.io/prometheus/prometheus:v2.53.0"},
); err != nil {
Expand All @@ -36,7 +33,6 @@ func TestMain(m *testing.M) {
kube.KindConfig(k8s.PathManifests+"/00-kind.yml"),
kube.LocalImage("beyla:dev"),
kube.LocalImage("testserver:dev"),
kube.LocalImage("grpcpinger:dev"),
kube.LocalImage("httppinger:dev"),
kube.LocalImage("beyla-k8s-cache:dev"),
kube.LocalImage("quay.io/prometheus/prometheus:v2.53.0"),
Expand Down Expand Up @@ -72,10 +68,5 @@ func TestInformersCache_ProcessMetrics(t *testing.T) {
}

func TestInformersCache_NetworkMetrics(t *testing.T) {
// we don't deploy any pinger
// it will reuse internal-pinger from MetricsDecoration_HTTP test
cluster.TestEnv().Test(t, features.New("network flow bytes").
Assess("catches network metrics between connected pods", otel.DoTestNetFlowBytesForExistingConnections).
Feature(),
)
cluster.TestEnv().Test(t, otel.FeatureNetworkFlowBytes())
}
17 changes: 1 addition & 16 deletions test/integration/k8s/netolly/k8s_netolly_main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"testing"

"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/kube"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
Expand Down Expand Up @@ -49,18 +47,5 @@ func TestMain(m *testing.M) {
}

func TestNetworkFlowBytes(t *testing.T) {
pinger := kube.Template[k8s.Pinger]{
TemplateFile: k8s.UninstrumentedPingerManifest,
Data: k8s.Pinger{
PodName: "internal-pinger",
TargetURL: "http://testserver:8080/iping",
},
}
cluster.TestEnv().Test(t, features.New("network flow bytes").
Setup(pinger.Deploy()).
Teardown(pinger.Delete()).
Assess("catches network metrics between connected pods", DoTestNetFlowBytesForExistingConnections).
Assess("catches external traffic", testNetFlowBytesForExternalTraffic).
Feature(),
)
cluster.TestEnv().Test(t, FeatureNetworkFlowBytes())
}
40 changes: 29 additions & 11 deletions test/integration/k8s/netolly/k8s_netolly_network_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"sigs.k8s.io/e2e-framework/pkg/envconf"
"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/kube"
"github.com/grafana/beyla/test/integration/components/prom"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
)

const (
Expand All @@ -26,12 +29,27 @@ const (
var podSubnets = []string{"10.244.0.0/16", "fd00:10:244::/56"}
var svcSubnets = []string{"10.96.0.0/16", "fd00:10:96::/112"}

func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context {
pq := prom.Client{HostPort: prometheusHostPort}
func FeatureNetworkFlowBytes() features.Feature {
pinger := kube.Template[k8s.Pinger]{
TemplateFile: k8s.UninstrumentedPingerManifest,
Data: k8s.Pinger{
PodName: "internal-pinger-net",
TargetURL: "http://testserver:8080/iping",
},
}
return features.New("network flow bytes").
Setup(pinger.Deploy()).
Teardown(pinger.Delete()).
Assess("catches network metrics between connected pods", testNetFlowBytesForExistingConnections).
Assess("catches external traffic", testNetFlowBytesForExternalTraffic).
Feature()
}

func testNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context {
pq := prom.Client{HostPort: prometheusHostPort}
// testing request flows (to testserver as Service)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name="testserver"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name="testserver"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -43,7 +61,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
assert.Equal(t, "beyla-network-flows", metric["job"])
assert.Equal(t, "my-kube", metric["k8s_cluster_name"])
assert.Equal(t, "default", metric["k8s_src_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_src_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_src_name"])
assert.Equal(t, "Pod", metric["k8s_src_owner_type"])
assert.Equal(t, "Pod", metric["k8s_src_type"])
assert.Regexp(t,
Expand All @@ -62,7 +80,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
})
// testing request flows (to testserver as Pod)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name=~"testserver-.*"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name=~"testserver-.*"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -73,7 +91,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
assertIsIP(t, metric["dst_address"])
assert.Equal(t, "beyla-network-flows", metric["job"])
assert.Equal(t, "default", metric["k8s_src_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_src_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_src_name"])
assert.Equal(t, "Pod", metric["k8s_src_owner_type"])
assert.Equal(t, "Pod", metric["k8s_src_type"])
assert.Regexp(t,
Expand All @@ -97,7 +115,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,

// testing response flows (from testserver Pod)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name=~"testserver-.*",dst_name="internal-pinger"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name=~"testserver-.*",dst_name="internal-pinger-net"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -116,7 +134,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
metric["k8s_src_node_name"])
assertIsIP(t, metric["k8s_src_node_ip"])
assert.Equal(t, "default", metric["k8s_dst_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_dst_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_dst_name"])
assert.Equal(t, "Pod", metric["k8s_dst_owner_type"])
assert.Equal(t, "Pod", metric["k8s_dst_type"])
assert.Regexp(t,
Expand All @@ -132,7 +150,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,

// testing response flows (from testserver Service)
test.Eventually(t, testTimeout, func(t require.TestingT) {
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="testserver",dst_name="internal-pinger"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="testserver",dst_name="internal-pinger-net"}`)
require.NoError(t, err)
require.NotEmpty(t, results)

Expand All @@ -148,7 +166,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
assert.Equal(t, "Service", metric["k8s_src_type"])
// services don't have host IP or name
assert.Equal(t, "default", metric["k8s_dst_namespace"])
assert.Equal(t, "internal-pinger", metric["k8s_dst_name"])
assert.Equal(t, "internal-pinger-net", metric["k8s_dst_name"])
assert.Equal(t, "Pod", metric["k8s_dst_owner_type"])
assert.Equal(t, "Pod", metric["k8s_dst_type"])
assert.Regexp(t,
Expand All @@ -162,7 +180,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T,
})

// check that there aren't captured flows if there is no communication
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name="otherinstance"}`)
results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name="otherinstance"}`)
require.NoError(t, err)
require.Empty(t, results)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"testing"

"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/kube"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
Expand Down Expand Up @@ -49,17 +47,5 @@ func TestMain(m *testing.M) {
}

func TestNetworkFlowBytes_Prom(t *testing.T) {
pinger := kube.Template[k8s.Pinger]{
TemplateFile: k8s.UninstrumentedPingerManifest,
Data: k8s.Pinger{
PodName: "internal-pinger",
TargetURL: "http://testserver:8080/iping",
},
}
cluster.TestEnv().Test(t, features.New("network flow bytes").
Setup(pinger.Deploy()).
Teardown(pinger.Delete()).
Assess("catches network metrics between connected pods", otel.DoTestNetFlowBytesForExistingConnections).
Feature(),
)
cluster.TestEnv().Test(t, otel.FeatureNetworkFlowBytes())
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os"
"testing"

"sigs.k8s.io/e2e-framework/pkg/features"

"github.com/grafana/beyla/test/integration/components/docker"
"github.com/grafana/beyla/test/integration/components/kube"
k8s "github.com/grafana/beyla/test/integration/k8s/common"
Expand Down Expand Up @@ -49,17 +47,5 @@ func TestMain(m *testing.M) {
}

func TestNetworkSKFlowBytes_Prom(t *testing.T) {
pinger := kube.Template[k8s.Pinger]{
TemplateFile: k8s.UninstrumentedPingerManifest,
Data: k8s.Pinger{
PodName: "internal-pinger",
TargetURL: "http://testserver:8080/iping",
},
}
cluster.TestEnv().Test(t, features.New("network flow bytes").
Setup(pinger.Deploy()).
Teardown(pinger.Delete()).
Assess("catches network metrics between connected pods", otel.DoTestNetFlowBytesForExistingConnections).
Feature(),
)
cluster.TestEnv().Test(t, otel.FeatureNetworkFlowBytes())
}

0 comments on commit 71a0767

Please sign in to comment.