Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observe: warn on unknown field while JSON decoding #962

Merged
merged 1 commit into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions cmd/observe/io_reader_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ func (o *IOReaderObserver) ServerStatus(_ context.Context, _ *observerpb.ServerS
type ioReaderClient struct {
grpc.ClientStream

scanner *bufio.Scanner
request *observerpb.GetFlowsRequest
allow filters.FilterFuncs
deny filters.FilterFuncs
scanner *bufio.Scanner
discardUnknown bool
request *observerpb.GetFlowsRequest
allow filters.FilterFuncs
deny filters.FilterFuncs

// Used for --last
buffer *container.RingBuffer
Expand Down Expand Up @@ -176,7 +177,24 @@ func (c *ioReaderClient) popFromLastBuffer() *observerpb.GetFlowsResponse {

func (c *ioReaderClient) unmarshalNext() *observerpb.GetFlowsResponse {
var res observerpb.GetFlowsResponse
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
err := protojson.UnmarshalOptions{DiscardUnknown: c.discardUnknown}.Unmarshal(c.scanner.Bytes(), &res)
if err != nil && !c.discardUnknown {
prevErr := err
// the error might be that the JSON data contains an unknown field.
// This can happen we attempting to decode flows generated from a newer
// Hubble version than the CLI (having introduced a new field). Retry
// parsing discarding unknown fields and see whether the decoding is
// successful.
err = protojson.UnmarshalOptions{DiscardUnknown: true}.Unmarshal(c.scanner.Bytes(), &res)
if err == nil {
// The error was indeed about a unknown field since we were able to
// unmarshall without error when discarding unknown fields. Emit a
// warning message and continue processing discarding unknown
// fields to avoid logging more than once.
c.discardUnknown = true
logger.Logger.WithError(prevErr).Warn("unknown field detected, upgrade the Hubble CLI to get rid of this warning")
}
}
if err != nil {
line := c.scanner.Text()
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
Expand Down
50 changes: 50 additions & 0 deletions cmd/observe/io_reader_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (

flowpb "github.com/cilium/cilium/api/v1/flow"
observerpb "github.com/cilium/cilium/api/v1/observer"
"github.com/cilium/hubble/pkg/logger"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -186,3 +189,50 @@ func Test_getFlowsFilter(t *testing.T) {
_, err = client.Recv()
assert.Equal(t, io.EOF, err)
}

func Test_UnknownField(t *testing.T) {
flows := []*observerpb.GetFlowsResponse{
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_FORWARDED}},
Time: &timestamppb.Timestamp{Seconds: 0},
},
{
ResponseTypes: &observerpb.GetFlowsResponse_Flow{Flow: &flowpb.Flow{Verdict: flowpb.Verdict_DROPPED}},
Time: &timestamppb.Timestamp{Seconds: 100},
},
}

var sb strings.Builder
for _, f := range flows {
b, err := f.MarshalJSON()
require.NoError(t, err)
s := strings.Replace(string(b), `"flow":{`, `"flow":{"foo":42,`, 1)
sb.WriteString(s + "\n")
}
// server and client setup.
server := NewIOReaderObserver(strings.NewReader(sb.String()))
client, err := server.GetFlows(context.Background(), &observerpb.GetFlowsRequest{})
require.NoError(t, err)
// logger setup.
logger.Initialize(viper.New())
sb.Reset()
logger.Logger.SetOutput(&sb)

// ensure that we see the first flow.
res, err := client.Recv()
require.NoError(t, err)
require.Equal(t, flows[0], res)
// check that we logged something the first time we've seen an unknown
// field.
require.Contains(t, sb.String(), "unknown field detected")
sb.Reset()
// ensure that we see the second flow.
res, err = client.Recv()
require.NoError(t, err)
require.Equal(t, flows[1], res)
// check that we didn't log the second time we've seen an unknown field.
require.Empty(t, sb.String())
// ensure we're at the end of the stream.
_, err = client.Recv()
require.Equal(t, io.EOF, err)
}