Skip to content

Commit d91ab02

Browse files
committed
engine: refactor session+client management
This commit focuses on making sessions + associated state management simpler: * More comprehensible+centralized state management * Rather than spread all over the place and tied together in random places, all of the state associated with a given session is in a daggerSession object and all of the state associated with a given client in a session is a daggerClient object * The code is also a lot more structured and "boring" in terms of locking/mutating state/etc. Not a rube goldberg machine anymore * The whole "pre-register a nested client's state before it calls", which was a fountain of confusion and bugs, is gone. * No more insane gRPC tunneling, the engine API is just an HTTP server now * graphQL http requests are just that, don't have to tunnel them through gRPC streams * session attachables are still gRPC based, but over a hijacked http conn (as opposed to a gRPC stream embedded in another gRPC stream) * This allowed us to move off the Session method from buildkit's upstream controller interface * That in turn let us delete huge chunks of complicated code around handing conns (i.e. engine/server/conn.go) and no longer need to be paranoid about gRPC max message limits in as many places There are more details in the PR description (7315). Signed-off-by: Erik Sipsma <[email protected]>
1 parent 73ed582 commit d91ab02

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2348
-2552
lines changed

analytics/analytics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ type Event struct {
2929
Type string `json:"type,omitempty"`
3030
Properties map[string]string `json:"properties,omitempty"`
3131

32-
DeviceID string `json:"device_id,omitempty"`
33-
ServerID string `json:"server_id,omitempty"`
32+
DeviceID string `json:"device_id,omitempty"`
33+
SessionID string `json:"session_id,omitempty"`
3434

3535
ClientVersion string `json:"client_version,omitempty"`
3636
ClientOS string `json:"client_os,omitempty"`
@@ -155,7 +155,7 @@ func (t *CloudTracker) Capture(ctx context.Context, event string, properties map
155155
ev.GitAuthorHashed = fmt.Sprintf("%x", sha256.Sum256([]byte(author)))
156156
}
157157
if clientMetadata, err := engine.ClientMetadataFromContext(ctx); err == nil {
158-
ev.ServerID = clientMetadata.ServerID
158+
ev.SessionID = clientMetadata.SessionID
159159
}
160160

161161
t.queue = append(t.queue, &queuedEvent{ctx: ctx, event: ev})

cmd/dagger/listen.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/rs/cors"
1212
"github.com/spf13/cobra"
1313
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
14+
"golang.org/x/net/http2"
15+
"golang.org/x/net/http2/h2c"
1416

1517
"dagger.io/dagger"
1618
"github.com/dagger/dagger/engine/client"
@@ -46,20 +48,29 @@ func Listen(ctx context.Context, engineClient *client.Client, _ *dagger.Module,
4648
defer sessionL.Close()
4749

4850
var handler http.Handler = engineClient
51+
4952
if allowCORS {
5053
handler = cors.AllowAll().Handler(handler)
5154
}
5255

56+
handler = otelhttp.NewHandler(handler, "listen", otelhttp.WithSpanNameFormatter(func(o string, r *http.Request) string {
57+
return fmt.Sprintf("%s: HTTP %s %s", o, r.Method, r.URL.Path)
58+
}))
59+
60+
http2Srv := &http2.Server{}
61+
handler = h2c.NewHandler(handler, http2Srv)
62+
5363
srv := &http.Server{
54-
Handler: otelhttp.NewHandler(handler, "listen", otelhttp.WithSpanNameFormatter(func(o string, r *http.Request) string {
55-
return fmt.Sprintf("%s: HTTP %s %s", o, r.Method, r.URL.Path)
56-
})),
64+
Handler: handler,
5765
// Gosec G112: prevent slowloris attacks
5866
ReadHeaderTimeout: 10 * time.Second,
5967
BaseContext: func(_ net.Listener) context.Context {
6068
return ctx
6169
},
6270
}
71+
if err := http2.ConfigureServer(srv, http2Srv); err != nil {
72+
return fmt.Errorf("http2 server configuration: %w", err)
73+
}
6374

6475
go func() {
6576
<-ctx.Done()

cmd/dagger/shell.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (ts *terminalSession) Run() error {
5858
NetDialContext: ts.Client.DialContext,
5959
}
6060

61-
reqHeader := http.Header{}
61+
reqHeader := ts.Client.AppendHTTPRequestHeaders(http.Header{})
6262
if ts.Client.SecretToken != "" {
6363
reqHeader["Authorization"] = []string{"Basic " + base64.StdEncoding.EncodeToString([]byte(ts.Client.SecretToken+":"))}
6464
}

cmd/engine/debug.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func logTraceMetrics(ctx context.Context) {
8383
}
8484
}
8585

86-
func logMetrics(ctx context.Context, engineStateRootDir string, eng *server.Engine) {
86+
func logMetrics(ctx context.Context, engineStateRootDir string, eng *server.Server) {
8787
for range time.Tick(60 * time.Second) {
8888
l := bklog.G(ctx)
8989

cmd/engine/main.go

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ import (
44
"context"
55
"crypto/tls"
66
"crypto/x509"
7-
goerrors "errors"
7+
"errors"
88
"fmt"
99
"io"
1010
"net"
11+
"net/http"
1112
"os"
1213
"os/exec"
1314
"os/user"
@@ -30,11 +31,12 @@ import (
3031
"github.com/moby/buildkit/util/profiler"
3132
"github.com/moby/buildkit/util/stack"
3233
"github.com/moby/buildkit/version"
33-
"github.com/pkg/errors"
3434
sloglogrus "github.com/samber/slog-logrus/v2"
3535
"github.com/sirupsen/logrus"
3636
"github.com/urfave/cli"
3737
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
38+
"golang.org/x/net/http2"
39+
"golang.org/x/net/http2/h2c"
3840
"golang.org/x/sync/errgroup"
3941
"google.golang.org/grpc"
4042

@@ -339,18 +341,18 @@ func main() { //nolint:gocyclo
339341
cfg.Root = root
340342

341343
if err := os.MkdirAll(root, 0o700); err != nil {
342-
return errors.Wrapf(err, "failed to create %s", root)
344+
return fmt.Errorf("failed to create %s: %w", root, err)
343345
}
344346

345347
bklog.G(ctx).Debug("creating engine lockfile")
346348
lockPath := filepath.Join(root, "buildkitd.lock")
347349
lock := flock.New(lockPath)
348350
locked, err := lock.TryLock()
349351
if err != nil {
350-
return errors.Wrapf(err, "could not lock %s", lockPath)
352+
return fmt.Errorf("could not lock %s: %w", lockPath, err)
351353
}
352354
if !locked {
353-
return errors.Errorf("could not lock %s, another instance running?", lockPath)
355+
return fmt.Errorf("could not lock %s, another instance running?", lockPath)
354356
}
355357
defer func() {
356358
lock.Unlock()
@@ -367,7 +369,7 @@ func main() { //nolint:gocyclo
367369
case "network.host":
368370
cfg.Entitlements = append(cfg.Entitlements, e)
369371
default:
370-
return errors.Errorf("invalid entitlement : %s", e)
372+
return fmt.Errorf("invalid entitlement : %s", e)
371373
}
372374
}
373375
}
@@ -381,9 +383,7 @@ func main() { //nolint:gocyclo
381383
if err != nil {
382384
return fmt.Errorf("failed to create engine: %w", err)
383385
}
384-
defer eng.Close()
385-
386-
eng.Register(grpcServer)
386+
defer srv.Close()
387387

388388
go logMetrics(context.Background(), cfg.Root, srv)
389389
if cfg.Trace {
@@ -398,9 +398,26 @@ func main() { //nolint:gocyclo
398398
}
399399

400400
// start serving on the listeners for actual clients
401-
bklog.G(ctx).Debug("starting main engine grpc listeners")
401+
bklog.G(ctx).Debug("starting main engine api listeners")
402+
srv.Register(grpcServer)
403+
http2Server := &http2.Server{}
404+
httpServer := &http.Server{
405+
ReadHeaderTimeout: 30 * time.Second,
406+
Handler: h2c.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
407+
if r.ProtoMajor == 2 && strings.HasPrefix(r.Header.Get("content-type"), "application/grpc") {
408+
// The docs on grpcServer.ServeHTTP warn that some features are missing vs. serving fully "native" gRPC,
409+
// but in practice it seems to work fine for us and only be relevant for some advanced features we don't use.
410+
grpcServer.ServeHTTP(w, r)
411+
return
412+
}
413+
srv.ServeHTTP(w, r)
414+
}), http2Server),
415+
}
416+
if err := http2.ConfigureServer(httpServer, http2Server); err != nil {
417+
return fmt.Errorf("failed to configure http2 server: %w", err)
418+
}
402419
errCh := make(chan error, 1)
403-
if err := serveGRPC(cfg.GRPC, grpcServer, errCh); err != nil {
420+
if err := serveAPI(cfg.GRPC, httpServer, errCh); err != nil {
404421
return err
405422
}
406423

@@ -424,7 +441,7 @@ func main() { //nolint:gocyclo
424441
if stopCacheErr != nil {
425442
bklog.G(ctx).WithError(stopCacheErr).Error("failed to stop cache")
426443
}
427-
err = goerrors.Join(err, stopCacheErr)
444+
err = errors.Join(err, stopCacheErr)
428445
cancelNetworking()
429446

430447
bklog.G(ctx).Infof("stopping server")
@@ -449,7 +466,11 @@ func main() { //nolint:gocyclo
449466
}
450467
}
451468

452-
func serveGRPC(cfg config.GRPCConfig, server *grpc.Server, errCh chan error) error {
469+
func serveAPI(
470+
cfg config.GRPCConfig,
471+
httpServer *http.Server,
472+
errCh chan error,
473+
) error {
453474
addrs := cfg.Address
454475
if len(addrs) == 0 {
455476
return errors.New("--addr cannot be empty")
@@ -480,7 +501,12 @@ func serveGRPC(cfg config.GRPCConfig, server *grpc.Server, errCh chan error) err
480501
eg.Go(func() error {
481502
defer l.Close()
482503
logrus.Infof("running server on %s", l.Addr())
483-
return server.Serve(l)
504+
505+
err := httpServer.Serve(l)
506+
if err != nil {
507+
return fmt.Errorf("serve: %w", err)
508+
}
509+
return nil
484510
})
485511
}(l)
486512
}
@@ -616,7 +642,7 @@ func applyMainFlags(c *cli.Context, cfg *config.Config) error {
616642
} else {
617643
maxParallelism, err = strconv.Atoi(maxParallelismStr)
618644
if err != nil {
619-
return errors.Wrap(err, "failed to parse oci-max-parallelism, should be positive integer, 0 for unlimited, or 'num-cpu' for setting to the number of CPUs")
645+
return fmt.Errorf("failed to parse oci-max-parallelism, should be positive integer, 0 for unlimited, or 'num-cpu' for setting to the number of CPUs: %w", err)
620646
}
621647
}
622648
cfg.Workers.OCI.MaxParallelism = maxParallelism
@@ -661,7 +687,7 @@ func grouptoGID(group string) (int, error) {
661687
func getListener(addr string, uid, gid int, tlsConfig *tls.Config) (net.Listener, error) {
662688
addrSlice := strings.SplitN(addr, "://", 2)
663689
if len(addrSlice) < 2 {
664-
return nil, errors.Errorf("address %s does not contain proto, you meant unix://%s ?",
690+
return nil, fmt.Errorf("address %s does not contain proto, you meant unix://%s ?",
665691
addr, addr)
666692
}
667693
proto := addrSlice[0]
@@ -686,7 +712,7 @@ func getListener(addr string, uid, gid int, tlsConfig *tls.Config) (net.Listener
686712
}
687713
return tls.NewListener(l, tlsConfig), nil
688714
default:
689-
return nil, errors.Errorf("addr %s not supported", addr)
715+
return nil, fmt.Errorf("addr %s not supported", addr)
690716
}
691717
}
692718

@@ -706,7 +732,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
706732
}
707733
certificate, err := tls.LoadX509KeyPair(certFile, keyFile)
708734
if err != nil {
709-
return nil, errors.Wrap(err, "could not load server key pair")
735+
return nil, fmt.Errorf("could not load server key pair: %w", err)
710736
}
711737
tlsConf := &tls.Config{
712738
Certificates: []tls.Certificate{certificate},
@@ -716,7 +742,7 @@ func serverCredentials(cfg config.TLSConfig) (*tls.Config, error) {
716742
certPool := x509.NewCertPool()
717743
ca, err := os.ReadFile(caFile)
718744
if err != nil {
719-
return nil, errors.Wrap(err, "could not read ca certificate")
745+
return nil, fmt.Errorf("could not read ca certificate: %w", err)
720746
}
721747
// Append the client certificates from the CA
722748
if ok := certPool.AppendCertsFromPEM(ca); !ok {
@@ -733,7 +759,7 @@ func attrMap(sl []string) (map[string]string, error) {
733759
for _, v := range sl {
734760
parts := strings.SplitN(v, "=", 2)
735761
if len(parts) != 2 {
736-
return nil, errors.Errorf("invalid value %s", v)
762+
return nil, fmt.Errorf("invalid value %s", v)
737763
}
738764
m[parts[0]] = parts[1]
739765
}

core/container.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ func (container *Container) From(ctx context.Context, addr string) (*Container,
301301
},
302302
})
303303
if err != nil {
304-
return nil, err
304+
return nil, fmt.Errorf("failed to resolve image %s: %w", ref, err)
305305
}
306306

307307
digested, err := reference.WithDigest(refName, digest)

core/container_exec.go

Lines changed: 19 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ import (
1212
"github.com/dagger/dagger/engine"
1313
"github.com/dagger/dagger/engine/buildkit"
1414
"github.com/moby/buildkit/client/llb"
15+
"github.com/moby/buildkit/identity"
1516
"github.com/pkg/errors"
16-
"go.opentelemetry.io/otel"
17-
"go.opentelemetry.io/otel/propagation"
18-
"go.opentelemetry.io/otel/trace"
1917
)
2018

2119
type ContainerExecOpts struct {
@@ -41,9 +39,8 @@ type ContainerExecOpts struct {
4139
// Grant the process all root capabilities
4240
InsecureRootCapabilities bool `default:"false"`
4341

44-
// (Internal-only) If this is a nested exec for a Function call, this should be set
45-
// with the metadata for that call
46-
NestedExecFunctionCall *FunctionCall `name:"-"`
42+
// (Internal-only) If this is a nested exec, exec metadata to use for it
43+
NestedExecMetadata *buildkit.ExecutionMetadata `name:"-"`
4744
}
4845

4946
func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts) (*Container, error) { //nolint:gocyclo
@@ -72,18 +69,18 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts
7269
return nil, err
7370
}
7471

75-
execMD := buildkit.ExecutionMetadata{
76-
ServerID: clientMetadata.ServerID,
77-
78-
HostAliases: make(map[string][]string),
79-
80-
RedirectStdoutPath: opts.RedirectStdout,
81-
RedirectStderrPath: opts.RedirectStderr,
82-
83-
SystemEnvNames: container.SystemEnvNames,
84-
85-
EnabledGPUs: container.EnabledGPUs,
72+
execMD := buildkit.ExecutionMetadata{}
73+
if opts.NestedExecMetadata != nil {
74+
execMD = *opts.NestedExecMetadata
75+
}
76+
execMD.SessionID = clientMetadata.SessionID
77+
if execMD.HostAliases == nil {
78+
execMD.HostAliases = make(map[string][]string)
8679
}
80+
execMD.RedirectStdoutPath = opts.RedirectStdout
81+
execMD.RedirectStderrPath = opts.RedirectStderr
82+
execMD.SystemEnvNames = container.SystemEnvNames
83+
execMD.EnabledGPUs = container.EnabledGPUs
8784

8885
// if GPU parameters are set for this container pass them over:
8986
if len(execMD.EnabledGPUs) > 0 {
@@ -94,44 +91,27 @@ func (container *Container) WithExec(ctx context.Context, opts ContainerExecOpts
9491

9592
// this allows executed containers to communicate back to this API
9693
if opts.ExperimentalPrivilegedNesting {
97-
callerOpts := opts.NestedExecFunctionCall
98-
if callerOpts == nil {
99-
// default to caching the nested exec
100-
callerOpts = &FunctionCall{
101-
Cache: true,
102-
}
103-
}
104-
10594
// directing telemetry to another span (i.e. a function call).
106-
if callerOpts.SpanContext.IsValid() {
107-
execMD.SpanContext = propagation.MapCarrier{}
108-
otel.GetTextMapPropagator().Inject(
109-
trace.ContextWithSpanContext(ctx, callerOpts.SpanContext),
110-
execMD.SpanContext,
111-
)
112-
95+
if execMD.SpanContext != nil {
11396
// hide the exec span
11497
spanName = buildkit.InternalPrefix + spanName
11598
}
11699

117-
execMD.ClientID, err = container.Query.RegisterCaller(ctx, callerOpts)
118-
if err != nil {
119-
return nil, fmt.Errorf("register caller: %w", err)
120-
}
100+
execMD.ClientID = identity.NewID()
121101

122102
// include the engine version so that these execs get invalidated if the engine/API change
123103
runOpts = append(runOpts, llb.AddEnv(buildkit.DaggerEngineVersionEnv, engine.Version))
124104

125105
// include a digest of the current call so that we scope of the cache of the ExecOp to this call
126106
runOpts = append(runOpts, llb.AddEnv(buildkit.DaggerCallDigestEnv, string(dagql.CurrentID(ctx).Digest())))
127107

128-
if !callerOpts.Cache {
129-
// include the ServerID here so that we bust cache once-per-session
108+
if execMD.CachePerSession {
109+
// include the SessionID here so that we bust cache once-per-session
130110
clientMetadata, err := engine.ClientMetadataFromContext(ctx)
131111
if err != nil {
132112
return nil, err
133113
}
134-
runOpts = append(runOpts, llb.AddEnv(buildkit.DaggerServerIDEnv, clientMetadata.ServerID))
114+
runOpts = append(runOpts, llb.AddEnv(buildkit.DaggerSessionIDEnv, clientMetadata.SessionID))
135115
}
136116
}
137117

core/git.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,5 +105,5 @@ func (ref *GitRef) getState(ctx context.Context) (llb.State, error) {
105105
return llb.State{}, err
106106
}
107107

108-
return gitdns.Git(ref.Repo.URL, ref.Ref, clientMetadata.ServerID, opts...), nil
108+
return gitdns.Git(ref.Repo.URL, ref.Ref, clientMetadata.SessionID, opts...), nil
109109
}

0 commit comments

Comments
 (0)