Skip to content

Commit 20627e7

Browse files
committed
package bes: refactor, closer to bb-portal's implementation
PublishBuildToolEventStream is copied verbatim from bb-portal, and BuildEventHandler and BuildEventChannel interfaces/structs are created to be compatible with that code. This also resolves a bug in the earlier WIP implementation where most event sequence IDs were not acknowledged, resulting in a warning shown on the Bazel side sending the events.
1 parent 16c6a17 commit 20627e7

File tree

4 files changed

+258
-54
lines changed

4 files changed

+258
-54
lines changed

internal/bes/bes.go

Lines changed: 109 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1+
// Package bes implements a Bazel Build Event Service gRPC listener:
2+
// https://bazel.build/remote/bep#build-event-service
3+
// It listens for TestResult events, and uploads their XML report to Test
4+
// Engine.
15
package bes
26

37
import (
48
"context"
59
"fmt"
610
"io"
7-
"log/slog"
811
"net"
12+
"sort"
913

10-
bb_bes "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes"
14+
slog "github.com/buildkite/test-engine-client/internal/bes/quietslog"
1115

1216
"google.golang.org/genproto/googleapis/devtools/build/v1"
1317
"google.golang.org/grpc"
@@ -21,6 +25,7 @@ var host = "127.0.0.1"
2125
var port = 60242 // 0 for OS-allocated
2226

2327
type BuildEventServer struct {
28+
handler *BuildEventHandler
2429
}
2530

2631
func Listen() error {
@@ -44,79 +49,129 @@ func newServer() build.PublishBuildEventServer {
4449
return BuildEventServer{}
4550
}
4651

52+
// PublishLifecycleEvent is copied verbatim from:
53+
// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go
54+
//
4755
// PublishLifecycleEvent handles life cycle events.
4856
func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) {
49-
slog.DebugContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent()))
57+
slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent()))
5058
return &emptypb.Empty{}, nil
5159
}
5260

61+
// PublishBuildToolEventStream is copied verbatim from:
62+
// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go
63+
// The BuildEventHandler and BuildEventChannel that it passes events to mimicks
64+
// the expected interfaces, but provide a bktec-specific implementation.
65+
//
5366
// PublishBuildToolEventStream handles a build tool event stream.
5467
// bktec thanks buildbarn/bb-portal for the basis of this :D
5568
func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error {
56-
ctx := stream.Context()
57-
58-
slog.InfoContext(ctx, "Stream started")
69+
slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context())
70+
71+
// List of SequenceIds we've received.
72+
// We'll want to ack these once all events are received, as we don't support resumption.
73+
seqNrs := make([]int64, 0)
74+
75+
ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) {
76+
if err := stream.Send(&build.PublishBuildToolEventStreamResponse{
77+
StreamId: streamID,
78+
SequenceNumber: sequenceNumber,
79+
}); err != nil {
80+
81+
// with the option --bes_upload_mode=fully_async or nowait_for_upload_complete
82+
// its not an error when the send fails. the bes gracefully terminated the close
83+
// i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s)
84+
// the context is processed in the background, so by the time we are acknowledging these
85+
// requests, the client connection may have already timed out and these errors can be
86+
// safely ignored
87+
grpcErr := status.Convert(err)
88+
if isClosing &&
89+
grpcErr.Code() == codes.Unavailable &&
90+
grpcErr.Message() == "transport is closing" {
91+
return
92+
}
5993

60-
for {
61-
req, err := stream.Recv()
62-
if err == io.EOF {
63-
slog.InfoContext(ctx, "Stream finished")
64-
return nil
65-
} else if err != nil {
66-
slog.ErrorContext(ctx, "Recv failed", "err", err)
67-
return err
94+
slog.ErrorContext(
95+
stream.Context(),
96+
"Send failed",
97+
"err",
98+
err,
99+
"streamid",
100+
streamID,
101+
"sequenceNumber",
102+
sequenceNumber,
103+
)
68104
}
105+
}
69106

70-
streamID := req.OrderedBuildEvent.GetStreamId()
71-
seq := req.OrderedBuildEvent.GetSequenceNumber()
107+
var streamID *build.StreamId
108+
reqCh := make(chan *build.PublishBuildToolEventStreamRequest)
109+
errCh := make(chan error)
110+
var eventCh BuildEventChannel
111+
112+
go func() {
113+
for {
114+
req, err := stream.Recv()
115+
if err != nil {
116+
errCh <- err
117+
return
118+
}
119+
reqCh <- req
120+
}
121+
}()
72122

73-
event := req.GetOrderedBuildEvent().GetEvent()
74-
slog.DebugContext(ctx, "stream event", "seq", seq, "event", event)
123+
for {
124+
select {
125+
case err := <-errCh:
126+
if err == io.EOF {
127+
slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context())
128+
129+
if eventCh == nil {
130+
slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context())
131+
return nil
132+
}
75133

76-
if event.GetBazelEvent() == nil {
77-
slog.DebugContext(ctx, "not a bazel event", seq, seq)
78-
continue
79-
}
134+
// Validate that all events were received
135+
sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] })
80136

81-
var bazelEvent bb_bes.BuildEvent
82-
if err = event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil {
83-
//return fmt.Errorf("unmarshaling bazel event: %w", err)
84-
slog.InfoContext(ctx, "error unmarshalling")
85-
}
137+
// TODO: Find out if initial sequence number can be != 1
138+
expected := int64(1)
139+
for _, seqNr := range seqNrs {
140+
if seqNr != expected {
141+
return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected))
142+
}
143+
expected++
144+
}
86145

87-
// slog.InfoContext(ctx, "unmarshalled bazel event", "event", &bazelEvent)
146+
err := eventCh.Finalize()
147+
if err != nil {
148+
return err
149+
}
88150

89-
payload := bazelEvent.GetPayload()
90-
if testResult, ok := payload.(*bb_bes.BuildEvent_TestResult); ok {
91-
r := testResult.TestResult
92-
files := []string{}
93-
for _, x := range r.GetTestActionOutput() {
94-
if x.GetName() == "test.xml" {
95-
files = append(files, x.GetUri())
151+
// Ack all events
152+
for _, seqNr := range seqNrs {
153+
ack(streamID, seqNr, true)
96154
}
97-
}
98-
slog.InfoContext(ctx, "TestResult",
99-
"status", r.GetStatus(),
100-
"cached", r.GetCachedLocally(),
101-
"dur", r.GetTestAttemptDuration().AsDuration().String(),
102-
"files", files,
103-
)
104-
}
105155

106-
// ack
107-
ack := &build.PublishBuildToolEventStreamResponse{StreamId: streamID, SequenceNumber: seq}
108-
if err := stream.Send(ack); err != nil {
109-
grpcErr := status.Convert(err)
110-
if grpcErr.Code() == codes.Unavailable &&
111-
grpcErr.Message() == "transport is closing" {
112156
return nil
113157
}
114158

115-
slog.ErrorContext(ctx, "ack failed",
116-
"err", err,
117-
"stream", streamID,
118-
"seq", seq,
119-
)
159+
slog.ErrorContext(stream.Context(), "Recv failed", "err", err)
160+
return err
161+
162+
case req := <-reqCh:
163+
// First event
164+
if streamID == nil {
165+
streamID = req.OrderedBuildEvent.GetStreamId()
166+
eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent)
167+
}
168+
169+
seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber())
170+
171+
if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil {
172+
slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err)
173+
return err
174+
}
120175
}
121176
}
122177
}

internal/bes/channel.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package bes
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
"time"
7+
8+
"github.com/buildbarn/bb-portal/third_party/bazel/gen/bes"
9+
"google.golang.org/genproto/googleapis/devtools/build/v1"
10+
)
11+
12+
// BuildEventChannel in bktec mimics the bb-portal interface so that the
13+
// BuildEventServer.PublishBuildEventServer code can be used verbatim.
14+
//
15+
// BuildEventChannel handles a single BuildEvent stream
16+
type BuildEventChannel interface {
17+
// HandleBuildEvent processes a single BuildEvent
18+
// This method should be called for each received event.
19+
HandleBuildEvent(event *build.BuildEvent) error
20+
21+
// Finalize does post-processing of a stream of BuildEvents.
22+
// This method should be called after receiving the EOF event.
23+
Finalize() error
24+
}
25+
26+
type buildEventChannel struct {
27+
ctx context.Context
28+
streamID *build.StreamId
29+
}
30+
31+
// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent.
32+
func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
33+
if event.GetBazelEvent() == nil {
34+
return nil
35+
}
36+
var bazelEvent bes.BuildEvent
37+
if err := event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil {
38+
slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err)
39+
return err
40+
}
41+
42+
payload := bazelEvent.GetPayload()
43+
if testResult, ok := payload.(*bes.BuildEvent_TestResult); ok {
44+
r := testResult.TestResult
45+
files := []string{}
46+
for _, x := range r.GetTestActionOutput() {
47+
if x.GetName() == "test.xml" {
48+
files = append(files, x.GetUri())
49+
}
50+
}
51+
slog.Info("TestResult",
52+
"status", r.GetStatus(),
53+
"cached", r.GetCachedLocally(),
54+
"dur", r.GetTestAttemptDuration().AsDuration().String(),
55+
"files", files,
56+
)
57+
}
58+
59+
return nil
60+
}
61+
62+
// Finalize implements BuildEventChannel.Finalize.
63+
func (c *buildEventChannel) Finalize() error {
64+
// defer the ctx so its not reaped when the client closes the connection
65+
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24)
66+
defer cancel()
67+
68+
slog.Debug("finalizing build event channel")
69+
_ = ctx
70+
// TODO: finalize anything that needs finalizing?
71+
72+
cancel()
73+
return nil
74+
}
75+
76+
// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events.
77+
// It is used when receiving a stream of events that we wish to ack without processing.
78+
type noOpBuildEventChannel struct{}
79+
80+
// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent.
81+
func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
82+
return nil
83+
}
84+
85+
// Finalize implements BuildEventChannel.Finalize.
86+
func (c *noOpBuildEventChannel) Finalize() error {
87+
return nil
88+
}

internal/bes/handler.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package bes
2+
3+
import (
4+
"context"
5+
6+
"google.golang.org/genproto/googleapis/devtools/build/v1"
7+
)
8+
9+
// BuildEventHandler in bktec mimics the bb-portal handler so that the
10+
// BuildEventServer.PublishBuildToolEventStream code can be used verbatim.
11+
//
12+
// BuildEventHandler orchestrates the handling of incoming Build Event streams.
13+
// For each incoming stream, and BuildEventChannel is created, which handles that stream.
14+
// BuildEventHandler is responsible for managing the things that are common to these event streams.
15+
type BuildEventHandler struct {
16+
}
17+
18+
// NewBuildEventHandler constructs a new BuildEventHandler
19+
func NewBuildEventHandler() *BuildEventHandler {
20+
return &BuildEventHandler{}
21+
}
22+
23+
// CreateEventChannel creates a new BuildEventChannel
24+
func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent *build.OrderedBuildEvent) BuildEventChannel {
25+
// If the first event does not have sequence number 1, we have processed this
26+
// invocation previously, and should skip all processing.
27+
if initialEvent.SequenceNumber != 1 {
28+
return &noOpBuildEventChannel{}
29+
}
30+
31+
return &buildEventChannel{
32+
ctx: ctx,
33+
streamID: initialEvent.StreamId,
34+
}
35+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
// Package quietslog provides a replacement for slog which downgrades Info
2+
// messages to Debug instead, so that the log output is quieter. This is done
3+
// specifically so that bes.BuildEventServer.PublishBuildToolEventStream()
4+
// source code can be kept unmodified from the bb-portal upstream it's copied
5+
// from.
6+
package quietslog
7+
8+
import (
9+
"context"
10+
"log/slog"
11+
)
12+
13+
// InfoContext delegates to DebugContext of the real logger, making this logger quiet.
14+
func InfoContext(ctx context.Context, msg string, args ...any) {
15+
slog.DebugContext(ctx, msg, args...)
16+
}
17+
18+
// WarnContext wraps the direct logger directly.
19+
func WarnContext(ctx context.Context, msg string, args ...any) {
20+
slog.WarnContext(ctx, msg, args...)
21+
}
22+
23+
// ErrorContext wraps the direct logger directly.
24+
func ErrorContext(ctx context.Context, msg string, args ...any) {
25+
slog.ErrorContext(ctx, msg, args...)
26+
}

0 commit comments

Comments
 (0)