Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add initial Exacloud support #958

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/server/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func NewServer(ctx context.Context, conf config.Config, log *logger.Logger) (*Se
Compute: compute,
Read: reader,
Log: log,
HostName: conf.Server.HostName,
},
Events: &events.Service{Writer: writer},
Nodes: nodes,
Expand Down
12 changes: 6 additions & 6 deletions compute/hpc_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type HPCBackend struct {
func (b *HPCBackend) WriteEvent(ctx context.Context, ev *events.Event) error {
switch ev.Type {
case events.Type_TASK_CREATED:
return b.Submit(ev.GetTask())
return b.Submit(ctx, ev.GetTask())

case events.Type_TASK_STATE:
if ev.GetState() == tes.State_CANCELED {
Expand All @@ -61,10 +61,8 @@ func (b *HPCBackend) WriteEvent(ctx context.Context, ev *events.Event) error {
func (b *HPCBackend) Close() {}

// Submit submits a task via "qsub", "condor_submit", "sbatch", etc.
func (b *HPCBackend) Submit(task *tes.Task) error {
ctx := context.Background()

submitPath, err := b.setupTemplatedHPCSubmit(task)
func (b *HPCBackend) Submit(ctx context.Context, task *tes.Task) error {
submitPath, err := b.setupTemplatedHPCSubmit(ctx, task)
if err != nil {
return err
}
Expand Down Expand Up @@ -229,7 +227,7 @@ ReconcileLoop:
// setupTemplatedHPCSubmit sets up a task submission in a HPC environment with
// a shared file system. It generates a submission file based on a template for
// schedulers such as SLURM, HTCondor, SGE, PBS/Torque, etc.
func (b *HPCBackend) setupTemplatedHPCSubmit(task *tes.Task) (string, error) {
func (b *HPCBackend) setupTemplatedHPCSubmit(ctx context.Context, task *tes.Task) (string, error) {
var err error

// TODO document that these working dirs need manual cleanup
Expand Down Expand Up @@ -264,13 +262,15 @@ func (b *HPCBackend) setupTemplatedHPCSubmit(task *tes.Task) (string, error) {
zone = zones[0]
}

args := fmt.Sprintf("--Server.HostName %v", ctx.Value("HostName"))
err = submitTpl.Execute(f, map[string]interface{}{
"TaskId": task.Id,
"WorkDir": workdir,
"Cpus": res.GetCpuCores(),
"RamGb": res.GetRamGb(),
"DiskGb": res.GetDiskGb(),
"Zone": zone,
"Args": args,
})
if err != nil {
return "", err
Expand Down
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ type Worker struct {
LeaveWorkDir bool
// Limit the number of concurrent downloads/uploads
MaxParallelTransfers int
// Container engine to use for executing tasks.
// Typically this is "docker".
Engine string
}

// HPCBackend describes the configuration for a HPC scheduler backend such as
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func DefaultConfig() Config {
LogUpdateRate: Duration(time.Second * 5),
LogTailSize: 10000,
MaxParallelTransfers: 10,
Engine: "docker",
},
Logger: logger.DefaultConfig(),
// databases / event handlers
Expand Down
40 changes: 40 additions & 0 deletions examples/exacloud.config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
LocalStorage:
# Whitelist of local directory paths which Funnel is allowed to access.
AllowedDirs:
- ./
- /tmp

Worker:
LeaveWorkDir: true
Engine: sudo /opt/acc/sbin/exadocker

Server:
HostName: exahead1

Compute: slurm

Slurm:
Template: |
#!/bin/bash
#SBATCH --job-name {{.TaskId}}
#SBATCH --ntasks 1
#SBATCH --error {{.WorkDir}}/funnel-stderr
#SBATCH --output {{.WorkDir}}/funnel-stdout
#SBATCH --gres disk:1
{{if ne .Cpus 0 -}}
{{printf "#SBATCH --cpus-per-task %d" .Cpus}}
{{- end}}
{{if ne .RamGb 0.0 -}}
{{printf "#SBATCH --mem %.0fGB" .RamGb}}
{{- end}}
{{if ne .DiskGb 0.0 -}}
{{printf "#SBATCH --tmp %.0fGB" .DiskGb}}
{{- end}}

srun /usr/local/bin/mkdir-scratch.sh
SCRATCH_PATH="/mnt/scratch/${SLURM_JOB_ID}"
cd $SCRATCH_PATH
srun bash -c 'funnel worker run --taskID {{.TaskId}} {{.Args}}' --temp-dir $SCRATCH_PATH
cd -
srun /usr/local/bin/rmdir-scratch.sh

2 changes: 2 additions & 0 deletions server/tes.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type TaskService struct {
Compute events.Computer
Read tes.ReadOnlyServer
Log *logger.Logger
HostName string
}

// CreateTask provides an HTTP/gRPC endpoint for creating a task.
Expand All @@ -44,6 +45,7 @@ func (ts *TaskService) CreateTask(ctx context.Context, task *tes.Task) (*tes.Cre
return nil, fmt.Errorf("error from backend: %s", err)
}

ctx = context.WithValue(ctx, "HostName", ts.HostName)
if err := ts.Event.WriteEvent(ctx, events.NewTaskCreated(task)); err != nil {
return nil, fmt.Errorf("error creating task: %s", err)
}
Expand Down
2 changes: 0 additions & 2 deletions webdash/src/Pages.js
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,8 @@ function ServiceInfo() {

React.useEffect(() => {
var url = new URL("/v1/service-info", window.location.origin);
console.log("DEBUG: ServiceInfo url:", url);
get(url).then(
(info) => {
console.log("DEBUG: ServiceInfo info:", info);
setInfo(info);
});
}, []);
Expand Down
15 changes: 15 additions & 0 deletions website/content/docs/interop/exacloud.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
---
title: Exacloud
menu:
main:
parent: Interop
---

> ⚠️ Exacloud support is currently in development and requires a few additional steps to run which are included below.

# Exacloud

## Getting Started

## Additional Resources

15 changes: 0 additions & 15 deletions worker/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,9 @@ func (dcmd DockerCommand) Run(ctx context.Context) error {
}

args = append(args, dcmd.Image)
// if dcmd.Command contains the ["/bin/bash", "-ue"] string, replace it with "/bin/bash -ue"
c := strings.Join(dcmd.Command, " ")
if strings.Contains(c, "[/bin/bash, -ue]") {
dcmd.Command = transformCommandSlice(dcmd.Command)
}
args = append(args, dcmd.Command...)

// Roughly: `docker run --rm -i --read-only -w [workdir] -v [bindings] [imageName] [cmd]`
err = dcmd.Event.Info(fmt.Sprintf("args: %s", args))
dcmd.Event.Info("Running command", "cmd", "docker "+strings.Join(args, " "))
cmd := exec.Command("docker", args...)

Expand Down Expand Up @@ -112,15 +106,6 @@ func formatVolumeArg(v Volume) string {
return fmt.Sprintf("%s:%s:%s", v.HostPath, v.ContainerPath, mode)
}

func transformCommandSlice(cmd []string) []string {
if len(cmd) == 0 {
return cmd // Handle empty slice
}

cmd[2] = "/bin/bash -ue .command.run &> .command.log"
return cmd
}

type metadata struct {
ID string
Name string
Expand Down
27 changes: 27 additions & 0 deletions worker/engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package worker

import "context"

type Engine interface {
Pull(ctx context.Context, image string) error

Run(ctx context.Context, config ContainerConfig) error

Stop(ctx context.Context, containerID string) error

Inspect(ctx context.Context, containerID string) (ContainerInfo, error)
}

type ContainerConfig struct {
Image string
Command []string
Args []string
EnvVars map[string]string
Volumes []Volume
}

type ContainerInfo struct {
ID string
Name string
Image string
}
Loading