From 7d8ffb4c1524ab49750e358170c3f9dc82088e13 Mon Sep 17 00:00:00 2001 From: Mario Macias Date: Tue, 29 Oct 2024 15:01:23 +0100 Subject: [PATCH] Fix network flows flaky test (#1282) --- .../k8s_informer_cache_main_test.go | 11 +---- .../k8s/netolly/k8s_netolly_main_test.go | 17 +------- .../netolly/k8s_netolly_network_metrics.go | 40 ++++++++++++++----- .../k8s_netolly_prom_main_test.go | 16 +------- .../k8s_netolly_prom_main_test.go | 16 +------- 5 files changed, 33 insertions(+), 67 deletions(-) diff --git a/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go b/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go index 1dd91a3c2..5b4840c05 100644 --- a/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go +++ b/test/integration/k8s/informer_cache/k8s_informer_cache_main_test.go @@ -7,8 +7,6 @@ import ( "os" "testing" - "sigs.k8s.io/e2e-framework/pkg/features" - "github.com/grafana/beyla/test/integration/components/docker" "github.com/grafana/beyla/test/integration/components/kube" k8s "github.com/grafana/beyla/test/integration/k8s/common" @@ -23,7 +21,6 @@ func TestMain(m *testing.M) { docker.ImageBuild{Tag: "beyla:dev", Dockerfile: k8s.DockerfileBeyla}, docker.ImageBuild{Tag: "testserver:dev", Dockerfile: k8s.DockerfileTestServer}, docker.ImageBuild{Tag: "httppinger:dev", Dockerfile: k8s.DockerfileHTTPPinger}, - docker.ImageBuild{Tag: "grpcpinger:dev", Dockerfile: k8s.DockerfilePinger}, docker.ImageBuild{Tag: "beyla-k8s-cache:dev", Dockerfile: k8s.DockerfileBeylaK8sCache}, docker.ImageBuild{Tag: "quay.io/prometheus/prometheus:v2.53.0"}, ); err != nil { @@ -36,7 +33,6 @@ func TestMain(m *testing.M) { kube.KindConfig(k8s.PathManifests+"/00-kind.yml"), kube.LocalImage("beyla:dev"), kube.LocalImage("testserver:dev"), - kube.LocalImage("grpcpinger:dev"), kube.LocalImage("httppinger:dev"), kube.LocalImage("beyla-k8s-cache:dev"), kube.LocalImage("quay.io/prometheus/prometheus:v2.53.0"), @@ -72,10 +68,5 @@ func TestInformersCache_ProcessMetrics(t *testing.T) { } func TestInformersCache_NetworkMetrics(t *testing.T) { - // we don't deploy any pinger - // it will reuse internal-pinger from MetricsDecoration_HTTP test - cluster.TestEnv().Test(t, features.New("network flow bytes"). - Assess("catches network metrics between connected pods", otel.DoTestNetFlowBytesForExistingConnections). - Feature(), - ) + cluster.TestEnv().Test(t, otel.FeatureNetworkFlowBytes()) } diff --git a/test/integration/k8s/netolly/k8s_netolly_main_test.go b/test/integration/k8s/netolly/k8s_netolly_main_test.go index f56861aba..b5be8dfd8 100644 --- a/test/integration/k8s/netolly/k8s_netolly_main_test.go +++ b/test/integration/k8s/netolly/k8s_netolly_main_test.go @@ -7,8 +7,6 @@ import ( "os" "testing" - "sigs.k8s.io/e2e-framework/pkg/features" - "github.com/grafana/beyla/test/integration/components/docker" "github.com/grafana/beyla/test/integration/components/kube" k8s "github.com/grafana/beyla/test/integration/k8s/common" @@ -49,18 +47,5 @@ func TestMain(m *testing.M) { } func TestNetworkFlowBytes(t *testing.T) { - pinger := kube.Template[k8s.Pinger]{ - TemplateFile: k8s.UninstrumentedPingerManifest, - Data: k8s.Pinger{ - PodName: "internal-pinger", - TargetURL: "http://testserver:8080/iping", - }, - } - cluster.TestEnv().Test(t, features.New("network flow bytes"). - Setup(pinger.Deploy()). - Teardown(pinger.Delete()). - Assess("catches network metrics between connected pods", DoTestNetFlowBytesForExistingConnections). - Assess("catches external traffic", testNetFlowBytesForExternalTraffic). - Feature(), - ) + cluster.TestEnv().Test(t, FeatureNetworkFlowBytes()) } diff --git a/test/integration/k8s/netolly/k8s_netolly_network_metrics.go b/test/integration/k8s/netolly/k8s_netolly_network_metrics.go index 82ab51c53..f7b5269fc 100644 --- a/test/integration/k8s/netolly/k8s_netolly_network_metrics.go +++ b/test/integration/k8s/netolly/k8s_netolly_network_metrics.go @@ -13,8 +13,11 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "sigs.k8s.io/e2e-framework/pkg/envconf" + "sigs.k8s.io/e2e-framework/pkg/features" + "github.com/grafana/beyla/test/integration/components/kube" "github.com/grafana/beyla/test/integration/components/prom" + k8s "github.com/grafana/beyla/test/integration/k8s/common" ) const ( @@ -26,12 +29,27 @@ const ( var podSubnets = []string{"10.244.0.0/16", "fd00:10:244::/56"} var svcSubnets = []string{"10.96.0.0/16", "fd00:10:96::/112"} -func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { - pq := prom.Client{HostPort: prometheusHostPort} +func FeatureNetworkFlowBytes() features.Feature { + pinger := kube.Template[k8s.Pinger]{ + TemplateFile: k8s.UninstrumentedPingerManifest, + Data: k8s.Pinger{ + PodName: "internal-pinger-net", + TargetURL: "http://testserver:8080/iping", + }, + } + return features.New("network flow bytes"). + Setup(pinger.Deploy()). + Teardown(pinger.Delete()). + Assess("catches network metrics between connected pods", testNetFlowBytesForExistingConnections). + Assess("catches external traffic", testNetFlowBytesForExternalTraffic). + Feature() +} +func testNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, _ *envconf.Config) context.Context { + pq := prom.Client{HostPort: prometheusHostPort} // testing request flows (to testserver as Service) test.Eventually(t, testTimeout, func(t require.TestingT) { - results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name="testserver"}`) + results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name="testserver"}`) require.NoError(t, err) require.NotEmpty(t, results) @@ -43,7 +61,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assert.Equal(t, "beyla-network-flows", metric["job"]) assert.Equal(t, "my-kube", metric["k8s_cluster_name"]) assert.Equal(t, "default", metric["k8s_src_namespace"]) - assert.Equal(t, "internal-pinger", metric["k8s_src_name"]) + assert.Equal(t, "internal-pinger-net", metric["k8s_src_name"]) assert.Equal(t, "Pod", metric["k8s_src_owner_type"]) assert.Equal(t, "Pod", metric["k8s_src_type"]) assert.Regexp(t, @@ -62,7 +80,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, }) // testing request flows (to testserver as Pod) test.Eventually(t, testTimeout, func(t require.TestingT) { - results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name=~"testserver-.*"}`) + results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name=~"testserver-.*"}`) require.NoError(t, err) require.NotEmpty(t, results) @@ -73,7 +91,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assertIsIP(t, metric["dst_address"]) assert.Equal(t, "beyla-network-flows", metric["job"]) assert.Equal(t, "default", metric["k8s_src_namespace"]) - assert.Equal(t, "internal-pinger", metric["k8s_src_name"]) + assert.Equal(t, "internal-pinger-net", metric["k8s_src_name"]) assert.Equal(t, "Pod", metric["k8s_src_owner_type"]) assert.Equal(t, "Pod", metric["k8s_src_type"]) assert.Regexp(t, @@ -97,7 +115,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, // testing response flows (from testserver Pod) test.Eventually(t, testTimeout, func(t require.TestingT) { - results, err := pq.Query(`beyla_network_flow_bytes_total{src_name=~"testserver-.*",dst_name="internal-pinger"}`) + results, err := pq.Query(`beyla_network_flow_bytes_total{src_name=~"testserver-.*",dst_name="internal-pinger-net"}`) require.NoError(t, err) require.NotEmpty(t, results) @@ -116,7 +134,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, metric["k8s_src_node_name"]) assertIsIP(t, metric["k8s_src_node_ip"]) assert.Equal(t, "default", metric["k8s_dst_namespace"]) - assert.Equal(t, "internal-pinger", metric["k8s_dst_name"]) + assert.Equal(t, "internal-pinger-net", metric["k8s_dst_name"]) assert.Equal(t, "Pod", metric["k8s_dst_owner_type"]) assert.Equal(t, "Pod", metric["k8s_dst_type"]) assert.Regexp(t, @@ -132,7 +150,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, // testing response flows (from testserver Service) test.Eventually(t, testTimeout, func(t require.TestingT) { - results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="testserver",dst_name="internal-pinger"}`) + results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="testserver",dst_name="internal-pinger-net"}`) require.NoError(t, err) require.NotEmpty(t, results) @@ -148,7 +166,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, assert.Equal(t, "Service", metric["k8s_src_type"]) // services don't have host IP or name assert.Equal(t, "default", metric["k8s_dst_namespace"]) - assert.Equal(t, "internal-pinger", metric["k8s_dst_name"]) + assert.Equal(t, "internal-pinger-net", metric["k8s_dst_name"]) assert.Equal(t, "Pod", metric["k8s_dst_owner_type"]) assert.Equal(t, "Pod", metric["k8s_dst_type"]) assert.Regexp(t, @@ -162,7 +180,7 @@ func DoTestNetFlowBytesForExistingConnections(ctx context.Context, t *testing.T, }) // check that there aren't captured flows if there is no communication - results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger",dst_name="otherinstance"}`) + results, err := pq.Query(`beyla_network_flow_bytes_total{src_name="internal-pinger-net",dst_name="otherinstance"}`) require.NoError(t, err) require.Empty(t, results) diff --git a/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go b/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go index 8fef9b6a6..5f62caadb 100644 --- a/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go +++ b/test/integration/k8s/netolly_promexport/k8s_netolly_prom_main_test.go @@ -7,8 +7,6 @@ import ( "os" "testing" - "sigs.k8s.io/e2e-framework/pkg/features" - "github.com/grafana/beyla/test/integration/components/docker" "github.com/grafana/beyla/test/integration/components/kube" k8s "github.com/grafana/beyla/test/integration/k8s/common" @@ -49,17 +47,5 @@ func TestMain(m *testing.M) { } func TestNetworkFlowBytes_Prom(t *testing.T) { - pinger := kube.Template[k8s.Pinger]{ - TemplateFile: k8s.UninstrumentedPingerManifest, - Data: k8s.Pinger{ - PodName: "internal-pinger", - TargetURL: "http://testserver:8080/iping", - }, - } - cluster.TestEnv().Test(t, features.New("network flow bytes"). - Setup(pinger.Deploy()). - Teardown(pinger.Delete()). - Assess("catches network metrics between connected pods", otel.DoTestNetFlowBytesForExistingConnections). - Feature(), - ) + cluster.TestEnv().Test(t, otel.FeatureNetworkFlowBytes()) } diff --git a/test/integration/k8s/netolly_tc_promexport/k8s_netolly_prom_main_test.go b/test/integration/k8s/netolly_tc_promexport/k8s_netolly_prom_main_test.go index a731e0248..b6d2edc0e 100644 --- a/test/integration/k8s/netolly_tc_promexport/k8s_netolly_prom_main_test.go +++ b/test/integration/k8s/netolly_tc_promexport/k8s_netolly_prom_main_test.go @@ -7,8 +7,6 @@ import ( "os" "testing" - "sigs.k8s.io/e2e-framework/pkg/features" - "github.com/grafana/beyla/test/integration/components/docker" "github.com/grafana/beyla/test/integration/components/kube" k8s "github.com/grafana/beyla/test/integration/k8s/common" @@ -49,17 +47,5 @@ func TestMain(m *testing.M) { } func TestNetworkSKFlowBytes_Prom(t *testing.T) { - pinger := kube.Template[k8s.Pinger]{ - TemplateFile: k8s.UninstrumentedPingerManifest, - Data: k8s.Pinger{ - PodName: "internal-pinger", - TargetURL: "http://testserver:8080/iping", - }, - } - cluster.TestEnv().Test(t, features.New("network flow bytes"). - Setup(pinger.Deploy()). - Teardown(pinger.Delete()). - Assess("catches network metrics between connected pods", otel.DoTestNetFlowBytesForExistingConnections). - Feature(), - ) + cluster.TestEnv().Test(t, otel.FeatureNetworkFlowBytes()) }