Skip to content

Commit 78c1c93

Browse files
committed
WIP: bazel-bes uploads via channels etc
1 parent 20627e7 commit 78c1c93

File tree

7 files changed

+223
-35
lines changed

7 files changed

+223
-35
lines changed

internal/bes/bes.go

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,47 +8,21 @@ import (
88
"context"
99
"fmt"
1010
"io"
11-
"net"
1211
"sort"
1312

1413
slog "github.com/buildkite/test-engine-client/internal/bes/quietslog"
1514

1615
"google.golang.org/genproto/googleapis/devtools/build/v1"
17-
"google.golang.org/grpc"
1816
"google.golang.org/grpc/codes"
1917
"google.golang.org/grpc/status"
2018
"google.golang.org/protobuf/encoding/protojson"
2119
"google.golang.org/protobuf/types/known/emptypb"
2220
)
2321

24-
var host = "127.0.0.1"
25-
var port = 60242 // 0 for OS-allocated
26-
2722
type BuildEventServer struct {
2823
handler *BuildEventHandler
2924
}
3025

31-
func Listen() error {
32-
addr := fmt.Sprintf("%s:%d", host, port)
33-
listener, err := net.Listen("tcp", addr)
34-
if err != nil {
35-
return fmt.Errorf("listening on %s: %w", addr, err)
36-
}
37-
fmt.Println("Bazel BES listener: grpc://" + listener.Addr().String())
38-
39-
opts := []grpc.ServerOption{}
40-
grpcServer := grpc.NewServer(opts...)
41-
42-
build.RegisterPublishBuildEventServer(grpcServer, newServer())
43-
grpcServer.Serve(listener)
44-
45-
return nil
46-
}
47-
48-
func newServer() build.PublishBuildEventServer {
49-
return BuildEventServer{}
50-
}
51-
5226
// PublishLifecycleEvent is copied verbatim from:
5327
// https://github.com/buildbarn/bb-portal/blob/abb76f0a9324cf4f9d5da44b53804a8d9a0a2155/internal/api/grpc/bes/server.go
5428
//

internal/bes/bes_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package bes
2+
3+
import "testing"
4+
5+
func TestPathFromURI(t *testing.T) {
6+
path, err := pathFromURI("file:///hello/world.txt")
7+
if err != nil {
8+
t.Errorf("pathFromURI error: %v", err)
9+
}
10+
11+
if want := "/hello/world.txt"; want != path {
12+
t.Errorf("wanted %v got %v", want, path)
13+
}
14+
}

internal/bes/channel.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package bes
22

33
import (
44
"context"
5+
"fmt"
56
"log/slog"
7+
"net/url"
68
"time"
79

810
"github.com/buildbarn/bb-portal/third_party/bazel/gen/bes"
@@ -24,8 +26,9 @@ type BuildEventChannel interface {
2426
}
2527

2628
type buildEventChannel struct {
27-
ctx context.Context
28-
streamID *build.StreamId
29+
ctx context.Context
30+
streamID *build.StreamId
31+
filenames chan<- string
2932
}
3033

3134
// HandleBuildEvent implements BuildEventChannel.HandleBuildEvent.
@@ -45,7 +48,12 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
4548
files := []string{}
4649
for _, x := range r.GetTestActionOutput() {
4750
if x.GetName() == "test.xml" {
48-
files = append(files, x.GetUri())
51+
path, err := pathFromURI(x.GetUri())
52+
if err != nil {
53+
return err // maybe just a log a warning?
54+
}
55+
files = append(files, path)
56+
c.filenames <- path
4957
}
5058
}
5159
slog.Info("TestResult",
@@ -59,13 +67,24 @@ func (c *buildEventChannel) HandleBuildEvent(event *build.BuildEvent) error {
5967
return nil
6068
}
6169

70+
func pathFromURI(uri string) (string, error) {
71+
url, err := url.Parse(uri)
72+
if err != nil {
73+
return "", err
74+
}
75+
if url.Scheme != "file" {
76+
return "", fmt.Errorf("expected file://..., got %v://...", url.Scheme)
77+
}
78+
return url.Path, nil
79+
}
80+
6281
// Finalize implements BuildEventChannel.Finalize.
6382
func (c *buildEventChannel) Finalize() error {
6483
// defer the ctx so its not reaped when the client closes the connection
6584
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*24)
6685
defer cancel()
6786

68-
slog.Debug("finalizing build event channel")
87+
slog.Info("finalizing build event channel")
6988
_ = ctx
7089
// TODO: finalize anything that needs finalizing?
7190

internal/bes/handler.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@ import (
1313
// For each incoming stream, and BuildEventChannel is created, which handles that stream.
1414
// BuildEventHandler is responsible for managing the things that are common to these event streams.
1515
type BuildEventHandler struct {
16+
filenames chan<- string
1617
}
1718

1819
// NewBuildEventHandler constructs a new BuildEventHandler
19-
func NewBuildEventHandler() *BuildEventHandler {
20-
return &BuildEventHandler{}
20+
func NewBuildEventHandler(filenames chan<- string) *BuildEventHandler {
21+
return &BuildEventHandler{
22+
filenames: filenames,
23+
}
2124
}
2225

2326
// CreateEventChannel creates a new BuildEventChannel
@@ -29,7 +32,8 @@ func (h *BuildEventHandler) CreateEventChannel(ctx context.Context, initialEvent
2932
}
3033

3134
return &buildEventChannel{
32-
ctx: ctx,
33-
streamID: initialEvent.StreamId,
35+
ctx: ctx,
36+
streamID: initialEvent.StreamId,
37+
filenames: h.filenames,
3438
}
3539
}

internal/bes/listen.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package bes
2+
3+
import (
4+
"context"
5+
"flag"
6+
"fmt"
7+
"log/slog"
8+
"net"
9+
"os"
10+
"os/signal"
11+
"syscall"
12+
13+
"github.com/buildkite/test-engine-client/internal/env"
14+
"github.com/buildkite/test-engine-client/internal/upload"
15+
"google.golang.org/genproto/googleapis/devtools/build/v1"
16+
"google.golang.org/grpc"
17+
)
18+
19+
func ListenCLI(argv []string, env env.Env) error {
20+
flags := flag.NewFlagSet("bktec bazel listen", flag.ExitOnError)
21+
portFlag := flags.Int("port", 0, "gRPC port to listen")
22+
listenHostFlag := flags.String("listen-host", "127.0.0.1", "gRPC host to listen")
23+
debugFlag := flags.Bool("debug", false, "debug logging")
24+
flags.Parse(argv)
25+
26+
if *debugFlag {
27+
slog.SetLogLoggerLevel(slog.LevelDebug)
28+
}
29+
30+
ctx, cancel := context.WithCancel(context.Background())
31+
defer cancel()
32+
33+
// a channel to propagate OS signals
34+
signals := make(chan os.Signal, 1)
35+
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT)
36+
37+
// configure uploader
38+
cfg, err := upload.ConfigFromEnv(env)
39+
if err != nil {
40+
return fmt.Errorf("uploader configuration: %w", err)
41+
}
42+
runEnv, err := upload.RunEnvFromEnv(env)
43+
if err != nil {
44+
return fmt.Errorf("uploader run_env configuration: %w", err)
45+
}
46+
uploader := NewUploader(cfg, runEnv, "junit")
47+
go uploader.Start(ctx)
48+
49+
// configure gRPC Bazel BES server
50+
addr := fmt.Sprintf("%s:%d", *listenHostFlag, *portFlag)
51+
listener, err := net.Listen("tcp", addr)
52+
if err != nil {
53+
return fmt.Errorf("listening on %s: %w", addr, err)
54+
}
55+
opts := []grpc.ServerOption{}
56+
srv := grpc.NewServer(opts...)
57+
build.RegisterPublishBuildEventServer(srv, BuildEventServer{
58+
handler: &BuildEventHandler{
59+
filenames: uploader.Filenames,
60+
},
61+
})
62+
slog.Info("Bazel BES listener", "addr", "grpc://"+listener.Addr().String())
63+
go serve(srv, listener)
64+
65+
// main loop
66+
run := true
67+
sigCount := 0
68+
for run {
69+
select {
70+
case url, ok := <-uploader.Responses:
71+
if !ok {
72+
slog.Debug("Response channel closed")
73+
run = false
74+
continue
75+
}
76+
slog.Info("Uploaded", "url", url)
77+
case err := <-uploader.Errs:
78+
slog.Error("Upload error", "error", err)
79+
case sig := <-signals:
80+
sigCount++
81+
srv.Stop()
82+
if sigCount == 1 {
83+
slog.Info("Stopping (again to force)...", "signal", sig)
84+
uploader.Stop()
85+
} else {
86+
slog.Info("Stopping forcefully...", "signal", sig)
87+
cancel()
88+
}
89+
}
90+
}
91+
92+
slog.Debug("done")
93+
return nil
94+
}
95+
96+
func serve(s *grpc.Server, listener net.Listener) {
97+
err := s.Serve(listener)
98+
if err != nil {
99+
slog.Error("gRPC server error", "err", err)
100+
}
101+
}

internal/bes/uploader.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package bes
2+
3+
import (
4+
"context"
5+
"log/slog"
6+
7+
"github.com/buildkite/test-engine-client/internal/upload"
8+
)
9+
10+
type Uploader struct {
11+
Config upload.Config
12+
RunEnv upload.RunEnvMap
13+
Format string
14+
Filenames chan string
15+
Responses chan string
16+
Errs chan error
17+
18+
stopping bool
19+
}
20+
21+
func NewUploader(cfg upload.Config, runEnv upload.RunEnvMap, format string) *Uploader {
22+
// a channel to pass filenames from BES server to uploader
23+
filenames := make(chan string, 1024)
24+
25+
// a channel to receive response upload URLs
26+
responses := make(chan string)
27+
28+
// a channel to receive errors from the uploader
29+
errs := make(chan error)
30+
31+
return &Uploader{
32+
Config: cfg,
33+
RunEnv: runEnv,
34+
Format: format,
35+
Filenames: filenames,
36+
Responses: responses,
37+
Errs: errs,
38+
}
39+
}
40+
41+
func (u *Uploader) Start(ctx context.Context) {
42+
for filename := range u.Filenames {
43+
if ctx.Err() != nil {
44+
slog.Debug("Uploader context canceled")
45+
break
46+
}
47+
resp, err := u.UploadFile(ctx, filename)
48+
if err != nil {
49+
u.Errs <- err
50+
continue
51+
}
52+
u.Responses <- resp["upload_url"]
53+
}
54+
slog.Debug("Uploader finished")
55+
close(u.Responses)
56+
}
57+
58+
// Stop closes the Filenames channel; filenames already buffered on the channel
59+
// will be uploaded before finishing.
60+
func (u *Uploader) Stop() {
61+
if u.stopping {
62+
slog.Warn("Uploader GracefulStop: already stopping")
63+
return
64+
}
65+
slog.Debug("Uploader GracefulStop")
66+
u.stopping = true
67+
close(u.Filenames)
68+
}
69+
70+
func (u *Uploader) UploadFile(ctx context.Context, filename string) (map[string]string, error) {
71+
resp, err := upload.Upload(ctx, u.Config, u.RunEnv, u.Format, filename)
72+
if err != nil {
73+
return nil, err
74+
}
75+
return resp, nil
76+
}

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func main() {
7272
}
7373
os.Exit(0)
7474
} else if flag.Arg(0) == "bazel" && flag.Arg(1) == "listen" {
75-
if err := bes.Listen(); err != nil {
75+
if err := bes.ListenCLI(os.Args[3:], env); err != nil {
7676
log.Fatal(err)
7777
}
7878
os.Exit(0)

0 commit comments

Comments
 (0)