Skip to content

Commit

Permalink
observe: warn on unknown field while JSON decoding
Browse files Browse the repository at this point in the history
Before this patch, when parsing JSON flows and encountering an unknown
field, the CLI would log a warning and skip the flow.

Unknown fields is not uncommon however, e.g. when using the Hubble CLI
against a more recent Hubble server version.

This patch

* emit the flow if successfully parsed while ignoring unknown fields,
* improve the warning message by hinting at upgrading the Hubble CLI,
* make it so the warning message is logged only once.

Signed-off-by: Alexandre Perrin <[email protected]>
  • Loading branch information
kaworu committed Apr 3, 2023
1 parent 4656704 commit c237c6c
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 5 deletions.
28 changes: 23 additions & 5 deletions cmd/observe/io_reader_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ func (o *IOReaderObserver) ServerStatus(_ context.Context, _ *observerpb.ServerS
type ioReaderClient struct {
grpc.ClientStream

scanner *bufio.Scanner
request *observerpb.GetFlowsRequest
allow filters.FilterFuncs
deny filters.FilterFuncs
scanner *bufio.Scanner
discardUnknown bool
request *observerpb.GetFlowsRequest
allow filters.FilterFuncs
deny filters.FilterFuncs

// Used for --last
buffer *container.RingBuffer
Expand Down Expand Up @@ -176,7 +177,24 @@ func (c *ioReaderClient) popFromLastBuffer() *observerpb.GetFlowsResponse {

func (c *ioReaderClient) unmarshalNext() *observerpb.GetFlowsResponse {
var res observerpb.GetFlowsResponse
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
err := protojson.UnmarshalOptions{DiscardUnknown: c.discardUnknown}.Unmarshal(c.scanner.Bytes(), &res)
if err != nil && !c.discardUnknown {
prevErr := err
// the error might be that the JSON data contains an unknown field.
// This can happen we attempting to decode flows generated from a newer
// Hubble version than the CLI (having introduced a new field). Retry
// parsing discarding unknown fields and see whether the decoding is
// successful.
err = protojson.UnmarshalOptions{DiscardUnknown: true}.Unmarshal(c.scanner.Bytes(), &res)
if err == nil {
// The error was indeed about a unknown field since we were able to
// unmarshall without error when discarding unknown fields. Emit a
// warning message and continue processing discarding unknown
// fields to avoid logging more than once.
c.discardUnknown = true
logger.Logger.WithError(prevErr).Warn("unknown field detected, upgrade the Hubble CLI to get rid of this warning")
}
}
if err != nil {
line := c.scanner.Text()
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
Expand Down
50 changes: 50 additions & 0 deletions cmd/observe/io_reader_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (

flowpb "github.com/cilium/cilium/api/v1/flow"
observerpb "github.com/cilium/cilium/api/v1/observer"
"github.com/cilium/hubble/pkg/logger"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -186,3 +189,50 @@ func Test_getFlowsFilter(t *testing.T) {
_, err = client.Recv()
assert.Equal(t, io.EOF, err)
}

func Test_UnknownField(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_FORWARDED}},
Time: &timestamppb.Timestamp{Seconds: 0},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_DROPPED}},
Time: &timestamppb.Timestamp{Seconds: 100},
},
}

var sb strings.Builder
for _, f := range flows {
b, err := f.MarshalJSON()
require.NoError(t, err)
s := strings.Replace(string(b), `"flow":{`, `"flow":{"foo":42,`, 1)
sb.WriteString(s + "\n")
}
// server and client setup.
server := NewIOReaderObserver(strings.NewReader(sb.String()))
client, err := server.GetFlows(context.Background(), &observerpb.GetFlowsRequest{})
require.NoError(t, err)
// logger setup.
logger.Initialize(viper.New())
sb.Reset()
logger.Logger.SetOutput(&sb)

// ensure that we see the first flow.
res, err := client.Recv()
require.NoError(t, err)
require.Equal(t, flows[0], res)
// check that we logged something the first time we've seen an unknown
// field.
require.Contains(t, sb.String(), "unknown field detected")
sb.Reset()
// ensure that we see the second flow.
res, err = client.Recv()
require.NoError(t, err)
require.Equal(t, flows[1], res)
// check that we didn't log the second time we've seen an unknown field.
require.Empty(t, sb.String())
// ensure we're at the end of the stream.
_, err = client.Recv()
require.Equal(t, io.EOF, err)
}

0 comments on commit c237c6c

Please sign in to comment.