From c237c6c4e548a355dc5229f7524cc7fa7d04f2e8 Mon Sep 17 00:00:00 2001 From: Alexandre Perrin Date: Mon, 27 Mar 2023 11:03:13 +0200 Subject: [PATCH] observe: warn on unknown field while JSON decoding Before this patch, when parsing JSON flows and encountering an unknown field, the CLI would log a warning and skip the flow. Unknown fields is not uncommon however, e.g. when using the Hubble CLI against a more recent Hubble server version. This patch * emit the flow if successfully parsed while ignoring unknown fields, * improve the warning message by hinting at upgrading the Hubble CLI, * make it so the warning message is logged only once. Signed-off-by: Alexandre Perrin --- cmd/observe/io_reader_observer.go | 28 ++++++++++++--- cmd/observe/io_reader_observer_test.go | 50 ++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 5 deletions(-) diff --git a/cmd/observe/io_reader_observer.go b/cmd/observe/io_reader_observer.go index 4a171b14e..535cc370a 100644 --- a/cmd/observe/io_reader_observer.go +++ b/cmd/observe/io_reader_observer.go @@ -64,10 +64,11 @@ func (o *IOReaderObserver) ServerStatus(_ context.Context, _ *observerpb.ServerS type ioReaderClient struct { grpc.ClientStream - scanner *bufio.Scanner - request *observerpb.GetFlowsRequest - allow filters.FilterFuncs - deny filters.FilterFuncs + scanner *bufio.Scanner + discardUnknown bool + request *observerpb.GetFlowsRequest + allow filters.FilterFuncs + deny filters.FilterFuncs // Used for --last buffer *container.RingBuffer @@ -176,7 +177,24 @@ func (c *ioReaderClient) popFromLastBuffer() *observerpb.GetFlowsResponse { func (c *ioReaderClient) unmarshalNext() *observerpb.GetFlowsResponse { var res observerpb.GetFlowsResponse - err := protojson.Unmarshal(c.scanner.Bytes(), &res) + err := protojson.UnmarshalOptions{DiscardUnknown: c.discardUnknown}.Unmarshal(c.scanner.Bytes(), &res) + if err != nil && !c.discardUnknown { + prevErr := err + // the error might be that the JSON data contains an unknown field. + // This can happen we attempting to decode flows generated from a newer + // Hubble version than the CLI (having introduced a new field). Retry + // parsing discarding unknown fields and see whether the decoding is + // successful. + err = protojson.UnmarshalOptions{DiscardUnknown: true}.Unmarshal(c.scanner.Bytes(), &res) + if err == nil { + // The error was indeed about a unknown field since we were able to + // unmarshall without error when discarding unknown fields. Emit a + // warning message and continue processing discarding unknown + // fields to avoid logging more than once. + c.discardUnknown = true + logger.Logger.WithError(prevErr).Warn("unknown field detected, upgrade the Hubble CLI to get rid of this warning") + } + } if err != nil { line := c.scanner.Text() logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow") diff --git a/cmd/observe/io_reader_observer_test.go b/cmd/observe/io_reader_observer_test.go index ed0199b47..98fb1ec33 100644 --- a/cmd/observe/io_reader_observer_test.go +++ b/cmd/observe/io_reader_observer_test.go @@ -11,7 +11,10 @@ import ( flowpb "github.com/cilium/cilium/api/v1/flow" observerpb "github.com/cilium/cilium/api/v1/observer" + "github.com/cilium/hubble/pkg/logger" + "github.com/spf13/viper" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -186,3 +189,50 @@ func Test_getFlowsFilter(t *testing.T) { _, err = client.Recv() assert.Equal(t, io.EOF, err) } + +func Test_UnknownField(t *testing.T) { + flows := []*observerpb.GetFlowsResponse{ + { + ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_FORWARDED}}, + Time: ×tamppb.Timestamp{Seconds: 0}, + }, + { + ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_DROPPED}}, + Time: ×tamppb.Timestamp{Seconds: 100}, + }, + } + + var sb strings.Builder + for _, f := range flows { + b, err := f.MarshalJSON() + require.NoError(t, err) + s := strings.Replace(string(b), `"flow":{`, `"flow":{"foo":42,`, 1) + sb.WriteString(s + "\n") + } + // server and client setup. + server := NewIOReaderObserver(strings.NewReader(sb.String())) + client, err := server.GetFlows(context.Background(), &observerpb.GetFlowsRequest{}) + require.NoError(t, err) + // logger setup. + logger.Initialize(viper.New()) + sb.Reset() + logger.Logger.SetOutput(&sb) + + // ensure that we see the first flow. + res, err := client.Recv() + require.NoError(t, err) + require.Equal(t, flows[0], res) + // check that we logged something the first time we've seen an unknown + // field. + require.Contains(t, sb.String(), "unknown field detected") + sb.Reset() + // ensure that we see the second flow. + res, err = client.Recv() + require.NoError(t, err) + require.Equal(t, flows[1], res) + // check that we didn't log the second time we've seen an unknown field. + require.Empty(t, sb.String()) + // ensure we're at the end of the stream. + _, err = client.Recv() + require.Equal(t, io.EOF, err) +}