Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*.dll
*.so
*.dylib
/bktec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


# Test binary, built with `go test -c`
*.test
Expand Down
17 changes: 10 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module github.com/buildkite/test-engine-client

go 1.21
go 1.23.0

toolchain go1.22.4
toolchain go1.24.0

require (
github.com/buildkite/roko v1.3.1
Expand All @@ -11,19 +11,22 @@ require (

require (
drjosh.dev/zzglob v0.4.0
github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c
github.com/google/uuid v1.6.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/olekukonko/tablewriter v0.0.5
github.com/pact-foundation/pact-go/v2 v2.0.10
golang.org/x/net v0.33.0
golang.org/x/net v0.35.0
golang.org/x/sys v0.30.0
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb
google.golang.org/grpc v1.70.0
google.golang.org/protobuf v1.36.5
)

require (
github.com/hashicorp/logutils v1.0.0 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
google.golang.org/grpc v1.67.3 // indirect
google.golang.org/protobuf v1.36.3 // indirect
golang.org/x/text v0.22.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e // indirect
)
42 changes: 32 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
drjosh.dev/zzglob v0.4.0 h1:gOb46aIHyHG8BlYpvZZM4dqR2dpsbKtI82IbYVAYIj4=
drjosh.dev/zzglob v0.4.0/go.mod h1:c3V3WPyfG+81h/bNOalEaba0jEQl16i9efSAmWOeOw8=
github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c h1:qLnyVD+ND7Ll3p9Lw0Z7Vk5HirKRZcBRJzHELYe5Z84=
github.com/buildbarn/bb-portal v0.0.0-20250220144241-94f72e8e190c/go.mod h1:GHZ5lGzUtz9LQ2oHt8EweXn0zS8t2sCD9bNBw9R9s8E=
github.com/buildkite/roko v1.3.1 h1:t7K30ceLLYn6k7hQP4oq1c7dVlhgD5nRcuSRDEEnY1s=
github.com/buildkite/roko v1.3.1/go.mod h1:23R9e6nHxgedznkwwfmqZ6+0VJZJZ2Sg/uVcp2cP46I=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
Expand All @@ -22,18 +30,32 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U=
go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg=
go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M=
go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8=
go.opentelemetry.io/otel/sdk v1.32.0 h1:RNxepc9vK59A8XsgZQouW8ue8Gkb4jpWtJm9ge5lEG4=
go.opentelemetry.io/otel/sdk v1.32.0/go.mod h1:LqgegDBjKMmb2GC6/PrTnteJG39I8/vJCAP9LlJXEjU=
go.opentelemetry.io/otel/sdk/metric v1.32.0 h1:rZvFnvmvawYb0alrYkjraqJq0Z4ZUJAiyYCU9snn1CU=
go.opentelemetry.io/otel/sdk/metric v1.32.0/go.mod h1:PWeZlq0zt9YkYAp3gjKZ0eicRYvOh1Gd+X99x6GHpCQ=
go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM=
go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc=
golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8/go.mod h1:lcTa1sDdWEIHMWlITnIczmw5w60CF9ffkb8Z+DVmmjA=
google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8=
google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s=
google.golang.org/protobuf v1.36.3 h1:82DV7MYdb8anAVi3qge1wSnMDrnKK7ebr+I0hHRN1BU=
google.golang.org/protobuf v1.36.3/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb h1:ITgPrl429bc6+2ZraNSzMDk3I95nmQln2fuPstKwFDE=
google.golang.org/genproto v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:sAo5UzpjUwgFBCzupwhcLcxHVDK7vG5IqI30YnwX2eE=
google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e h1:nsxey/MfoGzYNduN0NN/+hqP9iiCIYsrVbXb/8hjFM8=
google.golang.org/genproto/googleapis/api v0.0.0-20250227231956-55c901821b1e/go.mod h1:Xsh8gBVxGCcbV8ZeTB9wI5XPyZ5RvC6V3CTeeplHbiA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e h1:YA5lmSs3zc/5w+xsRcHqpETkaYyK63ivEPzNTcUUlSA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250227231956-55c901821b1e/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I=
google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ=
google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
Expand Down
151 changes: 151 additions & 0 deletions internal/bes/bes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Package bes implements a Bazel Build Event Service gRPC listener:
// https://bazel.build/remote/bep#build-event-service
// It listens for TestResult events, and uploads their XML report to Test
// Engine.
package bes

import (
"context"
"fmt"
"io"
"sort"

slog "github.com/buildkite/test-engine-client/internal/bes/quietslog"

"google.golang.org/genproto/googleapis/devtools/build/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/emptypb"
)

type BuildEventServer struct {
handler *BuildEventHandler
}

// PublishLifecycleEvent is copied verbatim from:
// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go
//
// PublishLifecycleEvent handles life cycle events.
func (s BuildEventServer) PublishLifecycleEvent(ctx context.Context, request *build.PublishLifecycleEventRequest) (*emptypb.Empty, error) {
slog.InfoContext(ctx, "Received event", "event", protojson.Format(request.BuildEvent.GetEvent()))
return &emptypb.Empty{}, nil
}

// PublishBuildToolEventStream is copied verbatim from:
// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go
// The BuildEventHandler and BuildEventChannel that it passes events to mimicks
// the expected interfaces, but provide a bktec-specific implementation.
//
// PublishBuildToolEventStream handles a build tool event stream.
// bktec thanks buildbarn/bb-portal for the basis of this :D
func (s BuildEventServer) PublishBuildToolEventStream(stream build.PublishBuildEvent_PublishBuildToolEventStreamServer) error {
slog.InfoContext(stream.Context(), "Stream started", "event", stream.Context())

// List of SequenceIds we've received.
// We'll want to ack these once all events are received, as we don't support resumption.
seqNrs := make([]int64, 0)

ack := func(streamID *build.StreamId, sequenceNumber int64, isClosing bool) {
if err := stream.Send(&build.PublishBuildToolEventStreamResponse{
StreamId: streamID,
SequenceNumber: sequenceNumber,
}); err != nil {

// with the option --bes_upload_mode=fully_async or nowait_for_upload_complete
// its not an error when the send fails. the bes gracefully terminated the close
// i.e. sent an EOF. for long running builds that take a while to save to the db (> 1s)
// the context is processed in the background, so by the time we are acknowledging these
// requests, the client connection may have already timed out and these errors can be
// safely ignored
grpcErr := status.Convert(err)
if isClosing &&
grpcErr.Code() == codes.Unavailable &&
grpcErr.Message() == "transport is closing" {
return
}

slog.ErrorContext(
stream.Context(),
"Send failed",
"err",
err,
"streamid",
streamID,
"sequenceNumber",
sequenceNumber,
)
}
}

var streamID *build.StreamId
reqCh := make(chan *build.PublishBuildToolEventStreamRequest)
errCh := make(chan error)
var eventCh BuildEventChannel

go func() {
for {
req, err := stream.Recv()
if err != nil {
errCh <- err
return
}
reqCh <- req
}
}()

for {
select {
case err := <-errCh:
if err == io.EOF {
slog.InfoContext(stream.Context(), "Stream finished", "event", stream.Context())

if eventCh == nil {
slog.WarnContext(stream.Context(), "No event channel found for stream event", "event", stream.Context())
return nil
}

// Validate that all events were received
sort.Slice(seqNrs, func(i, j int) bool { return seqNrs[i] < seqNrs[j] })

// TODO: Find out if initial sequence number can be != 1
expected := int64(1)
for _, seqNr := range seqNrs {
if seqNr != expected {
return status.Error(codes.Unknown, fmt.Sprintf("received unexpected sequence number %d, expected %d", seqNr, expected))
}
expected++
}

err := eventCh.Finalize()
if err != nil {
return err
}

// Ack all events
for _, seqNr := range seqNrs {
ack(streamID, seqNr, true)
}

return nil
}

slog.ErrorContext(stream.Context(), "Recv failed", "err", err)
return err

case req := <-reqCh:
// First event
if streamID == nil {
streamID = req.OrderedBuildEvent.GetStreamId()
eventCh = s.handler.CreateEventChannel(stream.Context(), req.OrderedBuildEvent)
}

seqNrs = append(seqNrs, req.OrderedBuildEvent.GetSequenceNumber())

if err := eventCh.HandleBuildEvent(req.OrderedBuildEvent.Event); err != nil {
slog.ErrorContext(stream.Context(), "HandleBuildEvent failed", "err", err)
return err
}
}
}
}
14 changes: 14 additions & 0 deletions internal/bes/bes_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bes

import "testing"

func TestPathFromURI(t *testing.T) {
path, err := pathFromURI("file:///hello/world.txt")
if err != nil {
t.Errorf("pathFromURI error: %v", err)
}

if want := "/hello/world.txt"; want != path {
t.Errorf("wanted %v got %v", want, path)
}
}
107 changes: 107 additions & 0 deletions internal/bes/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package bes

import (
"context"
"fmt"
"log/slog"
"net/url"
"time"

"github.com/buildbarn/bb-portal/third_party/bazel/gen/bes"
"google.golang.org/genproto/googleapis/devtools/build/v1"
)

// BuildEventChannel in bktec mimics the bb-portal interface so that the
// BuildEventServer.PublishBuildEventServer code can be used verbatim.
//
// BuildEventChannel handles a single BuildEvent stream
type BuildEventChannel interface {
// HandleBuildEvent processes a single BuildEvent
// This method should be called for each received event.
HandleBuildEvent(event *build.BuildEvent) error

// Finalize does post-processing of a stream of BuildEvents.
// This method should be called after receiving the EOF event.
Finalize() error
}

type buildEventChannel struct {
ctx context.Context
streamID *build.StreamId
filenames chan<- string
}

// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent.
func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
if event.GetBazelEvent() == nil {
return nil
}
var bazelEvent bes.BuildEvent
if err := event.GetBazelEvent().UnmarshalTo(&bazelEvent); err != nil {
slog.ErrorContext(c.ctx, "UnmarshalTo failed", "err", err)
return err
}

payload := bazelEvent.GetPayload()
if testResult, ok := payload.(*bes.BuildEvent_TestResult); ok {
r := testResult.TestResult
files := []string{}
for _, x := range r.GetTestActionOutput() {
if x.GetName() == "test.xml" {
path, err := pathFromURI(x.GetUri())
if err != nil {
return err // maybe just a log a warning?
}
files = append(files, path)
c.filenames <- path
}
}
slog.Info("TestResult",
"status", r.GetStatus(),
"cached", r.GetCachedLocally(),
"dur", r.GetTestAttemptDuration().AsDuration().String(),
"files", files,
)
}

return nil
}

func pathFromURI(uri string) (string, error) {
url, err := url.Parse(uri)
if err != nil {
return "", err
}
if url.Scheme != "file" {
return "", fmt.Errorf("expected file://..., got %v://...", url.Scheme)
}
return url.Path, nil
}

// Finalize implements BuildEventChannel.Finalize.
func (c *buildEventChannel) Finalize() error {
// defer the ctx so its not reaped when the client closes the connection
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24)
defer cancel()

slog.Info("finalizing build event channel")
_ = ctx
// TODO: finalize anything that needs finalizing?

cancel()
return nil
}

// noOpBuildEventChannel is an implementation of BuildEventChannel which does no processing of events.
// It is used when receiving a stream of events that we wish to ack without processing.
type noOpBuildEventChannel struct{}

// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent.
func (c *noOpBuildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
return nil
}

// Finalize implements BuildEventChannel.Finalize.
func (c *noOpBuildEventChannel) Finalize() error {
return nil
}
Loading