-
Notifications
You must be signed in to change notification settings - Fork 6
Bazel Build Event Protocol (BEP) → bktec upload
#282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
pda
wants to merge
4
commits into
bktec-upload
Choose a base branch
from
bazel-bes
base: bktec-upload
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
ceda219
WIP: `bktec bazel listen` stub subcommand (not yet implemented)
pda a2bde7a
bes.BuildEventServer WIP: gRPC listener prints TestResult test.xml paths
pda 4023ada
package bes: refactor, closer to bb-portal's implementation
pda 3f92240
WIP: bazel-bes uploads via channels etc
pda File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |
| *.dll | ||
| *.so | ||
| *.dylib | ||
| /bktec | ||
|
|
||
| # Test binary, built with `go test -c` | ||
| *.test | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,151 @@ | ||
| // Package bes implements a Bazel Build Event Service gRPC listener: | ||
| // https://bazel.build/remote/bep#build-event-service | ||
| // It listens for TestResult events, and uploads their XML report to Test | ||
| // Engine. | ||
| package bes | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "io" | ||
| "sort" | ||
|
|
||
| slog "github.com/buildkite/test-engine-client/internal/bes/quietslog" | ||
|
|
||
| "google.golang.org/genproto/googleapis/devtools/build/v1" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/status" | ||
| "google.golang.org/protobuf/encoding/protojson" | ||
| "google.golang.org/protobuf/types/known/emptypb" | ||
| ) | ||
|
|
||
| type BuildEventServer struct { | ||
| handler *BuildEventHandler | ||
| } | ||
|
|
||
| // PublishLifecycleEvent is copied verbatim from: | ||
| // https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go | ||
| // | ||
| // PublishLifecycleEvent handles life cycle events. | ||
| func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) { | ||
| slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent())) | ||
| return &emptypb.Empty{}, nil | ||
| } | ||
|
|
||
| // PublishBuildToolEventStream is copied verbatim from: | ||
| // https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go | ||
| // The BuildEventHandler and BuildEventChannel that it passes events to mimicks | ||
| // the expected interfaces, but provide a bktec-specific implementation. | ||
| // | ||
| // PublishBuildToolEventStream handles a build tool event stream. | ||
| // bktec thanks buildbarn/bb-portal for the basis of this :D | ||
| func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error { | ||
| slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context()) | ||
|
|
||
| // List of SequenceIds we've received. | ||
| // We'll want to ack these once all events are received, as we don't support resumption. | ||
| seqNrs := make([]int64, 0) | ||
|
|
||
| ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) { | ||
| if err := stream.Send(&build.PublishBuildToolEventStreamResponse{ | ||
| StreamId: streamID, | ||
| SequenceNumber: sequenceNumber, | ||
| }); err != nil { | ||
|
|
||
| // with the option --bes_upload_mode=fully_async or nowait_for_upload_complete | ||
| // its not an error when the send fails. the bes gracefully terminated the close | ||
| // i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s) | ||
| // the context is processed in the background, so by the time we are acknowledging these | ||
| // requests, the client connection may have already timed out and these errors can be | ||
| // safely ignored | ||
| grpcErr := status.Convert(err) | ||
| if isClosing && | ||
| grpcErr.Code() == codes.Unavailable && | ||
| grpcErr.Message() == "transport is closing" { | ||
| return | ||
| } | ||
|
|
||
| slog.ErrorContext( | ||
| stream.Context(), | ||
| "Send failed", | ||
| "err", | ||
| err, | ||
| "streamid", | ||
| streamID, | ||
| "sequenceNumber", | ||
| sequenceNumber, | ||
| ) | ||
| } | ||
| } | ||
|
|
||
| var streamID *build.StreamId | ||
| reqCh := make(chan *build.PublishBuildToolEventStreamRequest) | ||
| errCh := make(chan error) | ||
| var eventCh BuildEventChannel | ||
|
|
||
| go func() { | ||
| for { | ||
| req, err := stream.Recv() | ||
| if err != nil { | ||
| errCh <- err | ||
| return | ||
| } | ||
| reqCh <- req | ||
| } | ||
| }() | ||
|
|
||
| for { | ||
| select { | ||
| case err := <-errCh: | ||
| if err == io.EOF { | ||
| slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context()) | ||
|
|
||
| if eventCh == nil { | ||
| slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context()) | ||
| return nil | ||
| } | ||
|
|
||
| // Validate that all events were received | ||
| sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] }) | ||
|
|
||
| // TODO: Find out if initial sequence number can be != 1 | ||
| expected := int64(1) | ||
| for _, seqNr := range seqNrs { | ||
| if seqNr != expected { | ||
| return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected)) | ||
| } | ||
| expected++ | ||
| } | ||
|
|
||
| err := eventCh.Finalize() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Ack all events | ||
| for _, seqNr := range seqNrs { | ||
| ack(streamID, seqNr, true) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| slog.ErrorContext(stream.Context(), "Recv failed", "err", err) | ||
| return err | ||
|
|
||
| case req := <-reqCh: | ||
| // First event | ||
| if streamID == nil { | ||
| streamID = req.OrderedBuildEvent.GetStreamId() | ||
| eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent) | ||
| } | ||
|
|
||
| seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber()) | ||
|
|
||
| if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil { | ||
| slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err) | ||
| return err | ||
| } | ||
| } | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| package bes | ||
|
|
||
| import "testing" | ||
|
|
||
| func TestPathFromURI(t *testing.T) { | ||
| path, err := pathFromURI("file:///hello/world.txt") | ||
| if err != nil { | ||
| t.Errorf("pathFromURI error: %v", err) | ||
| } | ||
|
|
||
| if want := "/hello/world.txt"; want != path { | ||
| t.Errorf("wanted %v got %v", want, path) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,107 @@ | ||
| package bes | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log/slog" | ||
| "net/url" | ||
| "time" | ||
|
|
||
| "github.com/buildbarn/bb-portal/third_party/bazel/gen/bes" | ||
| "google.golang.org/genproto/googleapis/devtools/build/v1" | ||
| ) | ||
|
|
||
| // BuildEventChannel in bktec mimics the bb-portal interface so that the | ||
| // BuildEventServer.PublishBuildEventServer code can be used verbatim. | ||
| // | ||
| // BuildEventChannel handles a single BuildEvent stream | ||
| type BuildEventChannel interface { | ||
| // HandleBuildEvent processes a single BuildEvent | ||
| // This method should be called for each received event. | ||
| HandleBuildEvent(event *build.BuildEvent) error | ||
|
|
||
| // Finalize does post-processing of a stream of BuildEvents. | ||
| // This method should be called after receiving the EOF event. | ||
| Finalize() error | ||
| } | ||
|
|
||
| type buildEventChannel struct { | ||
| ctx context.Context | ||
| streamID *build.StreamId | ||
| filenames chan<- string | ||
| } | ||
|
|
||
| // HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. | ||
| func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { | ||
| if event.GetBazelEvent() == nil { | ||
| return nil | ||
| } | ||
| var bazelEvent bes.BuildEvent | ||
| if err := event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil { | ||
| slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err) | ||
| return err | ||
| } | ||
|
|
||
| payload := bazelEvent.GetPayload() | ||
| if testResult, ok := payload.(*bes.BuildEvent_TestResult); ok { | ||
| r := testResult.TestResult | ||
| files := []string{} | ||
| for _, x := range r.GetTestActionOutput() { | ||
| if x.GetName() == "test.xml" { | ||
| path, err := pathFromURI(x.GetUri()) | ||
| if err != nil { | ||
| return err // maybe just a log a warning? | ||
| } | ||
| files = append(files, path) | ||
| c.filenames <- path | ||
| } | ||
| } | ||
| slog.Info("TestResult", | ||
| "status", r.GetStatus(), | ||
| "cached", r.GetCachedLocally(), | ||
| "dur", r.GetTestAttemptDuration().AsDuration().String(), | ||
| "files", files, | ||
| ) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func pathFromURI(uri string) (string, error) { | ||
| url, err := url.Parse(uri) | ||
| if err != nil { | ||
| return "", err | ||
| } | ||
| if url.Scheme != "file" { | ||
| return "", fmt.Errorf("expected file://..., got %v://...", url.Scheme) | ||
| } | ||
| return url.Path, nil | ||
| } | ||
|
|
||
| // Finalize implements BuildEventChannel.Finalize. | ||
| func (c *buildEventChannel) Finalize() error { | ||
| // defer the ctx so its not reaped when the client closes the connection | ||
| ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24) | ||
| defer cancel() | ||
|
|
||
| slog.Info("finalizing build event channel") | ||
| _ = ctx | ||
| // TODO: finalize anything that needs finalizing? | ||
|
|
||
| cancel() | ||
| return nil | ||
| } | ||
|
|
||
| // noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events. | ||
| // It is used when receiving a stream of events that we wish to ack without processing. | ||
| type noOpBuildEventChannel struct{} | ||
|
|
||
| // HandleBuildEvent implements BuildEventChannel.HandleBuildEvent. | ||
| func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error { | ||
| return nil | ||
| } | ||
|
|
||
| // Finalize implements BuildEventChannel.Finalize. | ||
| func (c *noOpBuildEventChannel) Finalize() error { | ||
| return nil | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍