Skip to content

Commit a67172e

Browse files
committed
Refactor ioReaderClient separate logic for --first/--last/--all
Signed-off-by: Chance Zibolski <[email protected]>
1 parent ade29f2 commit a67172e

File tree

1 file changed

+55
-24
lines changed

1 file changed

+55
-24
lines changed

cmd/observe/io_reader_observer.go

Lines changed: 55 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -96,46 +96,57 @@ func newIOReaderClient(ctx context.Context, scanner *bufio.Scanner, request *obs
9696
}
9797

9898
func (c *ioReaderClient) Recv() (*observer.GetFlowsResponse, error) {
99-
if c.request.GetNumber() > 0 && c.flowsReturned >= c.request.GetNumber() {
99+
if c.returnedEnoughFlows() {
100100
return nil, io.EOF
101101
}
102+
102103
for c.scanner.Scan() {
103-
line := c.scanner.Text()
104-
var res observer.GetFlowsResponse
105-
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
106-
if err != nil {
107-
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
108-
continue
109-
}
110-
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
111-
continue
112-
}
113-
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
114-
continue
115-
}
116-
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
104+
res, ok := c.unmarshalNext()
105+
if !ok {
117106
continue
118107
}
108+
119109
switch {
120-
case c.buffer != nil && c.request.GetNumber() != math.MaxUint64: // --last
110+
case c.isLast():
121111
// store flows in a FIFO buffer, effectively keeping the last N flows
122112
// until we finish reading from the stream
123-
c.buffer.Add(&res)
113+
c.buffer.Add(res)
124114
continue
125-
case c.request.GetFirst() && c.request.GetNumber() != 0 && c.request.GetNumber() != math.MaxUint64: // --first
115+
case c.isFirst():
126116
// track number of flows returned, so we can exit once we've given back N flows
127117
c.flowsReturned++
128-
return &res, nil
129-
default:
130-
return &res, nil
118+
return res, nil
119+
default: // --all
120+
return res, nil
131121
}
132122
}
123+
133124
if err := c.scanner.Err(); err != nil {
134125
return nil, err
135126
}
136127

128+
if res := c.popFromLastBuffer(); res != nil {
129+
return res, nil
130+
}
131+
132+
return nil, io.EOF
133+
}
134+
135+
func (c *ioReaderClient) isFirst() bool {
136+
return c.request.GetFirst() && c.request.GetNumber() != 0 && c.request.GetNumber() != math.MaxUint64
137+
}
138+
139+
func (c *ioReaderClient) isLast() bool {
140+
return c.buffer != nil && c.request.GetNumber() != math.MaxUint64
141+
}
142+
143+
func (c *ioReaderClient) returnedEnoughFlows() bool {
144+
return c.request.GetNumber() > 0 && c.flowsReturned >= c.request.GetNumber()
145+
}
146+
147+
func (c *ioReaderClient) popFromLastBuffer() *observer.GetFlowsResponse {
137148
// Handle --last by iterating over our FIFO and returning one item each time.
138-
if c.buffer != nil && c.request.GetNumber() != math.MaxUint64 {
149+
if c.isLast() {
139150
if len(c.resps) == 0 {
140151
// Iterate over the buffer and store them in a slice, because we cannot
141152
// index into the ring buffer itself
@@ -150,8 +161,28 @@ func (c *ioReaderClient) Recv() (*observer.GetFlowsResponse, error) {
150161
if len(c.resps) > int(c.flowsReturned) {
151162
resp := c.resps[c.flowsReturned]
152163
c.flowsReturned++
153-
return resp, nil
164+
return resp
154165
}
155166
}
156-
return nil, io.EOF
167+
return nil
168+
}
169+
170+
func (c *ioReaderClient) unmarshalNext() (*observer.GetFlowsResponse, bool) {
171+
line := c.scanner.Text()
172+
var res observer.GetFlowsResponse
173+
err := protojson.Unmarshal(c.scanner.Bytes(), &res)
174+
if err != nil {
175+
logger.Logger.WithError(err).WithField("line", line).Warn("Failed to unmarshal json to flow")
176+
return nil, false
177+
}
178+
if c.request.Since != nil && c.request.Since.AsTime().After(res.Time.AsTime()) {
179+
return nil, false
180+
}
181+
if c.request.Until != nil && c.request.Until.AsTime().Before(res.Time.AsTime()) {
182+
return nil, false
183+
}
184+
if !filters.Apply(c.allow, c.deny, &v1.Event{Timestamp: res.Time, Event: res.GetFlow()}) {
185+
return nil, false
186+
}
187+
return &res, true
157188
}

0 commit comments

Comments
 (0)