Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Resumable job execution prototype #553

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
## [Unreleased]

### Added
- backend/gce: first edition of worker-agent (building block for resumable job execution)

### Changed
- ratelimit: trace redis connection pool checkout
Expand Down
2 changes: 2 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ var (
)

var ErrDownloadTraceNotImplemented = errors.New("DownloadTrace not implemented")
var ErrInstallAgentNotImplemented = errors.New("InstallAgent not implemented")
var ErrInstanceIPNotImplemented = errors.New("Instance.IP not implemented")

// Backend wraps up an alias, backend provider help, and a factory func for a
// given backend provider wheee
Expand Down
8 changes: 8 additions & 0 deletions backend/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,10 @@ func (i *dockerInstance) uploadScriptSCP(ctx gocontext.Context, script []byte) e
return nil
}

// InstallAgent is not implemented for docker instances; it always returns
// ErrInstallAgentNotImplemented. The context argument is unused.
func (i *dockerInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *dockerInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
if i.runNative {
return i.runScriptExec(ctx, output)
Expand Down Expand Up @@ -851,6 +855,10 @@ func (i *dockerInstance) ID() string {
return i.container.ID[0:7]
}

// IP is not implemented for docker instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *dockerInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName returns the image name stored on the instance.
func (i *dockerInstance) ImageName() string {
return i.imageName
}
Expand Down
8 changes: 8 additions & 0 deletions backend/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ func (i *fakeInstance) UploadScript(ctx context.Context, script []byte) error {
return nil
}

// InstallAgent is not implemented for fake instances; it always returns
// ErrInstallAgentNotImplemented. The context argument is unused.
func (i *fakeInstance) InstallAgent(ctx context.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *fakeInstance) RunScript(ctx context.Context, writer io.Writer) (*RunResult, error) {
if i.p.cfg.Get("ERROR") == "true" {
return &RunResult{Completed: false}, errors.New("fake provider is configured to error all jobs")
Expand Down Expand Up @@ -102,6 +106,10 @@ func (i *fakeInstance) ID() string {
return "fake"
}

// IP is not implemented for fake instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *fakeInstance) IP(ctx context.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName returns the fixed placeholder name "fake".
func (i *fakeInstance) ImageName() string {
return "fake"
}
Expand Down
60 changes: 60 additions & 0 deletions backend/gce.go
Original file line number Diff line number Diff line change
Expand Up @@ -1756,6 +1756,62 @@ func (i *gceInstance) isPreempted(ctx gocontext.Context) (bool, error) {
return preempted, err
}

// InstallAgent uploads the worker-agent binary to the instance over SSH,
// marks it executable and launches it as a detached background process.
//
// It returns:
//   - an error for windows instances, which are not supported yet;
//   - ErrStaleVM when the upload destination already exists on the instance
//     (a failure progress entry is emitted first);
//   - a wrapped error when reading the local binary, connecting, uploading,
//     chmod-ing or launching the agent fails.
func (i *gceInstance) InstallAgent(ctx gocontext.Context) error {
	const (
		// FIXME: developer-local path left over from the prototype; this has
		// to come from configuration before the feature can ship.
		agentBinaryFile = "/Users/bogdana/go/src/github.com/travis-ci/worker-agent/worker-agent-linux-amd64"

		// Where the agent binary is placed on the instance.
		uploadDest = "/tmp/worker-agent"

		// nohup plus a backgrounded bash keeps the agent running after the
		// SSH session that started it is closed.
		agentCommand = "nohup bash -c '/tmp/worker-agent &> /tmp/worker-agent.out &'"
	)

	// TODO: windows support
	if i.os == "windows" {
		return errors.New("agent is not supported on windows yet")
	}

	agentBinaryBytes, err := ioutil.ReadFile(agentBinaryFile)
	if err != nil {
		return errors.Wrap(err, "couldn't read local agent binary")
	}

	var conn remote.Remoter
	conn, err = i.sshConnection(ctx)
	if err != nil {
		return errors.Wrap(err, "couldn't connect to remote server for agent upload")
	}
	defer conn.Close()

	context.LoggerFromContext(ctx).WithFields(logrus.Fields{
		"dest": uploadDest,
		"self": "backend/gce_instance",
	}).Debug("uploading agent")

	// existed is checked before err on purpose: a pre-existing destination is
	// reported to the user as a stale VM rather than as a generic upload error.
	existed, err := conn.UploadFile(uploadDest, agentBinaryBytes)
	if existed {
		i.progresser.Progress(&ProgressEntry{
			Message:    "existing agent detected",
			State:      ProgressFailure,
			Interrupts: true,
		})
		return ErrStaleVM
	}
	if err != nil {
		return errors.Wrap(err, "couldn't upload agent")
	}

	if err = conn.Chmod(uploadDest, 0755); err != nil {
		return errors.Wrap(err, "couldn't chmod agent")
	}

	// NOTE(review): the int32 result of RunCommand is treated here as the
	// command's exit status; a non-zero value means the launch itself failed
	// and must not be reported as success.
	exitCode, err := conn.RunCommand(agentCommand, ioutil.Discard)
	if err != nil {
		return errors.Wrap(err, "couldn't run agent")
	}
	if exitCode != 0 {
		return errors.Errorf("couldn't run agent: launch command exited with status %d", exitCode)
	}

	return nil
}

func (i *gceInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
var conn remote.Remoter
var err error
Expand Down Expand Up @@ -1919,6 +1975,10 @@ func (i *gceInstance) ID() string {
return i.instance.Name
}

// IP returns the instance address by delegating to getCachedIP, passing the
// caller's context through.
func (i *gceInstance) IP(ctx gocontext.Context) (string, error) {
return i.getCachedIP(ctx)
}

// ImageName returns the image name stored on the instance.
func (i *gceInstance) ImageName() string {
return i.imageName
}
Expand Down
8 changes: 8 additions & 0 deletions backend/jupiterbrain.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ func (i *jupiterBrainInstance) UploadScript(ctx gocontext.Context, script []byte
return nil
}

// InstallAgent is not implemented for Jupiter Brain instances; it always
// returns ErrInstallAgentNotImplemented. The context argument is unused.
func (i *jupiterBrainInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *jupiterBrainInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
conn, err := i.sshConnection()
if err != nil {
Expand Down Expand Up @@ -499,6 +503,10 @@ func (i *jupiterBrainInstance) ID() string {
return i.payload.ID
}

// IP is not implemented for Jupiter Brain instances; it always returns an
// empty string together with ErrInstanceIPNotImplemented.
func (i *jupiterBrainInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

func (i *jupiterBrainInstance) ImageName() string {
if i.payload == nil {
return "{unidentified}"
Expand Down
8 changes: 8 additions & 0 deletions backend/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (i *localInstance) UploadScript(ctx gocontext.Context, script []byte) error
return err
}

// InstallAgent is not implemented for local instances; it always returns
// ErrInstallAgentNotImplemented. The context argument is unused.
func (i *localInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *localInstance) RunScript(ctx gocontext.Context, writer io.Writer) (*RunResult, error) {
if i.scriptPath == "" {
return &RunResult{Completed: false}, errNoScriptUploaded
Expand Down Expand Up @@ -138,6 +142,10 @@ func (i *localInstance) ID() string {
return fmt.Sprintf("local:%s", i.scriptPath)
}

// IP is not implemented for local instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *localInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName always returns the empty string for local instances.
func (i *localInstance) ImageName() string {
return ""
}
Expand Down
8 changes: 8 additions & 0 deletions backend/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,10 @@ func (i *osInstance) UploadScript(ctx gocontext.Context, script []byte) error {
return nil
}

// InstallAgent is not implemented for OpenStack instances; it always returns
// ErrInstallAgentNotImplemented. The context argument is unused.
func (i *osInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *osInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
conn, err := i.sshConnection()
if err != nil {
Expand Down Expand Up @@ -670,6 +674,10 @@ func (i *osInstance) ID() string {
return i.instance.ID
}

// IP is not implemented for OpenStack instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *osInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName returns the image name stored on the instance.
func (i *osInstance) ImageName() string {
return i.imageName
}
Expand Down
9 changes: 9 additions & 0 deletions backend/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ type Instance interface {
// method should not be called multiple times.
UploadScript(gocontext.Context, []byte) error

// InstallAgent uploads the worker-agent binary and starts
// it as a background process
InstallAgent(gocontext.Context) error

// RunScript runs the build script that was uploaded with the
// UploadScript method.
RunScript(gocontext.Context, io.Writer) (*RunResult, error)
Expand All @@ -98,6 +102,11 @@ type Instance interface {
// ID is used when identifying the instance in logs and such
ID() string

// IP returns the IP address or hostname of the instance
// TODO: think about a more general struct that includes
// the pod, project, maybe even region
IP(ctx gocontext.Context) (string, error)

// ImageName is the name of the image used to boot the instance
ImageName() string

Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,9 @@ var (
Usage: "sample rate for trace as an inverse fraction - for sample rate n, every nth event will be sampled",
Value: 1,
}),
NewConfigDef("AgentEnabled", &cli.BoolFlag{
Usage: "Experimental: Use worker-agent for resumable job execution",
}),
}

// Flags is the list of all CLI flags accepted by travis-worker
Expand Down Expand Up @@ -437,6 +440,7 @@ type Config struct {
StackdriverProjectID string `config:"stackdriver-project-id"`
OpencensusTracingEnabled bool `config:"opencensus-tracing-enabled"`
OpencensusSamplingRate int `config:"opencensus-sampling-rate"`
AgentEnabled bool `config:"agent-enabled"`

ProviderConfig *ProviderConfig
}
Expand Down
2 changes: 2 additions & 0 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
&stepCheckCancellation{},
&stepUploadScript{
uploadTimeout: p.config.ScriptUploadTimeout,
agentEnabled: p.config.AgentEnabled,
},
&stepCheckCancellation{},
&stepUpdateState{},
Expand All @@ -252,6 +253,7 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
logTimeout: logTimeout,
hardTimeout: buildJob.StartAttributes().HardTimeout,
skipShutdownOnLogTimeout: p.config.SkipShutdownOnLogTimeout,
agentEnabled: p.config.AgentEnabled,
},
&stepDownloadTrace{
persister: p.persister,
Expand Down
6 changes: 5 additions & 1 deletion remote/package.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package remote

import "io"
import (
"io"
"os"
)

// Remoter is the set of operations performed against a remote machine:
// transferring files, running commands, changing file modes and closing the
// underlying connection.
type Remoter interface {
// UploadFile writes data to path on the remote side.
// NOTE(review): the bool result appears to report that the destination
// already existed — confirm against the implementations.
UploadFile(path string, data []byte) (bool, error)
// DownloadFile returns the contents of the remote file at path.
DownloadFile(path string) ([]byte, error)
// RunCommand runs command on the remote side, writing to output.
// NOTE(review): the int32 result is presumably the exit status — confirm.
RunCommand(command string, output io.Writer) (int32, error)
// Chmod sets the mode of the remote file at path.
Chmod(path string, mode os.FileMode) error
// Close releases the connection.
Close() error
}
16 changes: 16 additions & 0 deletions ssh/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Connection interface {
UploadFile(path string, data []byte) (bool, error)
DownloadFile(path string) ([]byte, error)
RunCommand(command string, output io.Writer) (int32, error)
Chmod(path string, mode os.FileMode) error
Close() error
}

Expand Down Expand Up @@ -193,6 +194,21 @@ func (c *sshConnection) RunCommand(command string, output io.Writer) (int32, err
}
}

// Chmod sets the mode of the remote file at path. It opens an SFTP client on
// top of the connection's client for the duration of the call and closes it
// again before returning.
func (c *sshConnection) Chmod(path string, mode os.FileMode) error {
	// Named "client" rather than "sftp" so the package identifier is not
	// shadowed for the rest of the function.
	client, err := sftp.NewClient(c.client)
	if err != nil {
		return errors.Wrap(err, "couldn't create SFTP client")
	}
	defer client.Close()

	if chmodErr := client.Chmod(path, mode); chmodErr != nil {
		return errors.Wrap(chmodErr, "couldn't chmod file")
	}

	return nil
}

// Close closes the connection's underlying client and returns its error.
func (c *sshConnection) Close() error {
return c.client.Close()
}
Loading