Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce sync+exec watch action #12330

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,5 @@ require (
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace github.com/compose-spec/compose-go/v2 => github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8E
github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE=
github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb/go.mod h1:ZjrT6AXHbDs86ZSdt/osfBi5qfexBrKUdONk989Wnk4=
github.com/compose-spec/compose-go/v2 v2.4.5 h1:p4ih4Jb6VgGPLPxh3fSFVKAjFHtZd+7HVLCSFzcFx9Y=
github.com/compose-spec/compose-go/v2 v2.4.5/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
github.com/containerd/cgroups/v3 v3.0.2 h1:f5WFqIVSgo5IZmtTT3qVBo6TzI1ON6sycSBKkymb9L0=
github.com/containerd/cgroups/v3 v3.0.2/go.mod h1:JUgITrzdFqp42uI2ryGA+ge0ap/nxzYgkGmIcetmErE=
Expand Down Expand Up @@ -359,6 +357,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab h1:3Q4/1sAnPv4nMpak/lIzWsQJjX8X5zKZRkDd6mlf2mc=
github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU=
Expand Down
139 changes: 82 additions & 57 deletions pkg/compose/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ import (
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"time"

"github.com/compose-spec/compose-go/v2/types"
ccli "github.com/docker/cli/cli/command/container"
pathutil "github.com/docker/compose/v2/internal/paths"
"github.com/docker/compose/v2/internal/sync"
"github.com/docker/compose/v2/pkg/api"
Expand All @@ -48,7 +48,7 @@ const quietPeriod = 500 * time.Millisecond
// fileEvent contains the Compose service and modified host system path.
type fileEvent struct {
sync.PathMapping
Action types.WatchAction
Trigger types.Trigger
}

// getSyncImplementation returns an appropriate sync implementation for the
Expand Down Expand Up @@ -298,7 +298,7 @@ func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMat
}

return &fileEvent{
Action: trigger.Action,
Trigger: trigger,
PathMapping: sync.PathMapping{
HostPath: hostPath,
ContainerPath: containerPath,
Expand Down Expand Up @@ -336,7 +336,10 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
}

if trigger.Action == types.WatchActionRebuild && service.Build == nil {
return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
return nil, fmt.Errorf("service %s doesn't have a build section, can't apply %s on watch", types.WatchActionRebuild, service.Name)
}
if trigger.Action == types.WatchActionSyncExec && len(trigger.Exec.Command) == 0 {
return nil, fmt.Errorf("can't watch with action %q on service %s wihtout a command", types.WatchActionSyncExec, service.Name)
}

config.Watch[i] = trigger
Expand All @@ -352,24 +355,17 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
out := make(chan []fileEvent)
go func() {
defer close(out)
seen := make(map[fileEvent]time.Time)
seen := make(map[string]fileEvent)
flushEvents := func() {
if len(seen) == 0 {
return
}
events := make([]fileEvent, 0, len(seen))
for e := range seen {
for _, e := range seen {
events = append(events, e)
}
// sort batch by oldest -> newest
// (if an event is seen > 1 per batch, it gets the latest timestamp)
sort.SliceStable(events, func(i, j int) bool {
x := events[i]
y := events[j]
return seen[x].Before(seen[y])
})
out <- events
seen = make(map[fileEvent]time.Time)
seen = make(map[string]fileEvent)
}

t := clock.NewTicker(delay)
Expand All @@ -386,7 +382,7 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
flushEvents()
return
}
seen[e] = time.Now()
seen[e.HostPath] = e
t.Reset(delay)
}
}
Expand Down Expand Up @@ -485,49 +481,10 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
pathMappings := make([]sync.PathMapping, len(batch))
restartService := false
for i := range batch {
if batch[i].Action == types.WatchActionRebuild {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
// restrict the build to ONLY this service, not any of its dependencies
options.Build.Services = []string{serviceName}
imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)

if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
return err
}

if options.Prune {
s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
}

options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))

err = s.create(ctx, project, api.CreateOptions{
Services: []string{serviceName},
Inherit: true,
Recreate: api.RecreateForce,
})
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
return err
}

services := []string{serviceName}
p, err := project.WithSelectedServices(services)
if err != nil {
return err
}
err = s.start(ctx, project.Name, api.StartOptions{
Project: p,
Services: services,
AttachTo: services,
}, nil)
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
}
return nil
if batch[i].Trigger.Action == types.WatchActionRebuild {
return s.rebuild(ctx, project, serviceName, options)
}
if batch[i].Action == types.WatchActionSyncRestart {
if batch[i].Trigger.Action == types.WatchActionSyncRestart {
restartService = true
}
pathMappings[i] = batch[i].PathMapping
Expand All @@ -554,7 +511,75 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
options.LogTo.Log(
api.WatchLogger,
fmt.Sprintf("service %q restarted", serviceName))
}
eg, ctx := errgroup.WithContext(ctx)
for _, b := range batch {
if b.Trigger.Action == types.WatchActionSyncExec {
containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, serviceName)
if err != nil {
return err
}
x := b.Trigger.Exec
for _, c := range containers {
eg.Go(func() error {
exec := ccli.NewExecOptions()
exec.User = x.User
exec.Privileged = x.Privileged
exec.Command = x.Command
exec.Workdir = x.WorkingDir
for _, v := range x.Environment.ToMapping().Values() {
err := exec.Env.Set(v)
if err != nil {
return err
}
}
return ccli.RunExec(ctx, s.dockerCli, c.ID, exec)
})
}
}
}
return eg.Wait()
}

func (s *composeService) rebuild(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions) error {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
// restrict the build to ONLY this service, not any of its dependencies
options.Build.Services = []string{serviceName}
imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)

if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
return err
}

if options.Prune {
s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
}

options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))

err = s.create(ctx, project, api.CreateOptions{
Services: []string{serviceName},
Inherit: true,
Recreate: api.RecreateForce,
})
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
return err
}

services := []string{serviceName}
p, err := project.WithSelectedServices(services)
if err != nil {
return err
}
err = s.start(ctx, project.Name, api.StartOptions{
Project: p,
Services: services,
AttachTo: services,
}, nil)
if err != nil {
options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
}
return nil
}
Expand Down
23 changes: 17 additions & 6 deletions pkg/compose/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"os"
"slices"
"strings"
"testing"
"time"

Expand All @@ -42,23 +44,32 @@ func TestDebounceBatching(t *testing.T) {
ctx, stop := context.WithCancel(context.Background())
t.Cleanup(stop)

trigger := types.Trigger{
Path: "/",
}
matcher := watch.EmptyMatcher{}
eventBatchCh := batchDebounceEvents(ctx, clock, quietPeriod, ch)
for i := 0; i < 100; i++ {
var action types.WatchAction = "a"
var path = "/a"
if i%2 == 0 {
action = "b"
path = "/b"
}
ch <- fileEvent{Action: action}

event := maybeFileEvent(trigger, path, matcher)
require.NotNil(t, event)
ch <- *event
}
// we sent 100 events + the debouncer
clock.BlockUntil(101)
clock.Advance(quietPeriod)
select {
case batch := <-eventBatchCh:
require.ElementsMatch(t, batch, []fileEvent{
{Action: "a"},
{Action: "b"},
slices.SortFunc(batch, func(a, b fileEvent) int {
return strings.Compare(a.HostPath, b.HostPath)
})
assert.Equal(t, len(batch), 2)
assert.Equal(t, batch[0].HostPath, "/a")
assert.Equal(t, batch[1].HostPath, "/b")
case <-time.After(50 * time.Millisecond):
t.Fatal("timed out waiting for events")
}
Expand Down
14 changes: 14 additions & 0 deletions pkg/e2e/fixtures/watch/exec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
services:
test:
build:
dockerfile_inline: FROM alpine
command: ping localhost
volumes:
- /data
develop:
watch:
- path: .
target: /data
action: sync+exec
exec:
command: echo "SUCCESS"
40 changes: 40 additions & 0 deletions pkg/e2e/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package e2e

import (
"bytes"
"crypto/rand"
"fmt"
"os"
Expand Down Expand Up @@ -293,3 +294,42 @@ func doTest(t *testing.T, svcName string) {

testComplete.Store(true)
}

func TestWatchExec(t *testing.T) {
cli := NewCLI(t)
const projectName = "test_watch_exec"

t.Cleanup(func() {
cli.RunDockerComposeCmd(t, "-p", projectName, "down")
})

tmpdir := t.TempDir()
composeFilePath := filepath.Join(tmpdir, "compose.yaml")
CopyFile(t, filepath.Join("fixtures", "watch", "exec.yaml"), composeFilePath)
cmd := cli.NewDockerComposeCmd(t, "-p", projectName, "-f", composeFilePath, "up", "--watch")
buffer := bytes.NewBuffer(nil)
cmd.Stdout = buffer
watch := icmd.StartCmd(cmd)

poll.WaitOn(t, func(l poll.LogT) poll.Result {
out := buffer.String()
if strings.Contains(out, "64 bytes from") {
return poll.Success()
}
return poll.Continue("%v", watch.Stdout())
})

t.Logf("Create new file")

testFile := filepath.Join(tmpdir, "test")
require.NoError(t, os.WriteFile(testFile, []byte("test\n"), 0o600))

poll.WaitOn(t, func(l poll.LogT) poll.Result {
out := buffer.String()
if strings.Contains(out, "SUCCESS") {
return poll.Success()
}
return poll.Continue("%v", out)
})
cli.RunDockerComposeCmdNoCheck(t, "-p", projectName, "kill", "-s", "9")
}
Loading