Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for native Kubernetes executor #755

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# build stage
FROM golang:1.21-alpine AS build-env
FROM golang:1.21.4-alpine AS build-env
RUN apk add make git bash build-base
ENV GOPATH=/go
ENV PATH="/go/bin:${PATH}"
ADD ./ /go/src/github.com/ohsu-comp-bio/funnel
RUN cd /go/src/github.com/ohsu-comp-bio/funnel && make build

WORKDIR /go/src/github.com/ohsu-comp-bio/funnel
COPY go.* .
RUN go mod download
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build make build

# final stage
FROM alpine
Expand Down
10 changes: 7 additions & 3 deletions Dockerfile.dind
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# build stage
FROM golang:1.20-alpine AS build-env
FROM golang:1.21.4-alpine AS build-env
RUN apk add make git bash build-base
ENV GOPATH=/go
ENV PATH="/go/bin:${PATH}"
ADD ./ /go/src/github.com/ohsu-comp-bio/funnel
RUN cd /go/src/github.com/ohsu-comp-bio/funnel && make build

WORKDIR /go/src/github.com/ohsu-comp-bio/funnel
COPY go.* .
RUN go mod download
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build make build

# final stage
FROM docker:stable-dind
Expand Down
10 changes: 7 additions & 3 deletions Dockerfile.dind-rootless
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# build stage
FROM golang:1.20-alpine AS build-env
FROM golang:1.21.4-alpine AS build-env
RUN apk add make git bash build-base
ENV GOPATH=/go
ENV PATH="/go/bin:${PATH}"
ADD ./ /go/src/github.com/ohsu-comp-bio/funnel
RUN cd /go/src/github.com/ohsu-comp-bio/funnel && make build

WORKDIR /go/src/github.com/ohsu-comp-bio/funnel
COPY go.* .
RUN go mod download
COPY . .
RUN --mount=type=cache,target=/root/.cache/go-build make build

# final stage
FROM docker:stable-dind-rootless
Expand Down
2 changes: 2 additions & 0 deletions cmd/util/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ func computeFlags(flagConf *config.Config) *pflag.FlagSet {
f.Var(&flagConf.Scheduler.NodeDeadTimeout, "Scheduler.NodeDeadTimeout", "How long to wait before deleting a dead node from the DB")

// Kubernetes
f.StringVar(&flagConf.Kubernetes.Executor, "Kubernetes.Executor", flagConf.Kubernetes.Executor, "Executor to use for executing tasks (docker or kubernetes)")
f.StringVar(&flagConf.Kubernetes.ExecutorTemplateFile, "Kubernetes.ExecutorTemplateFile", flagConf.Kubernetes.ExecutorTemplateFile, "Path to executor job template file")
f.StringVar(&flagConf.Kubernetes.TemplateFile, "Kubernetes.TemplateFile", flagConf.Kubernetes.TemplateFile, "Path to job template file")
f.StringVar(&flagConf.Kubernetes.Namespace, "Kubernetes.Namespace", flagConf.Kubernetes.Namespace, "Namespace to spawn jobs within")
f.StringVar(&flagConf.Kubernetes.ConfigFile, "Kubernetes.ConfigFile", flagConf.Kubernetes.ConfigFile, "Path to kubernetes config file")
Expand Down
29 changes: 25 additions & 4 deletions cmd/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"context"
"fmt"
"io/ioutil"
"strings"

"github.com/ohsu-comp-bio/funnel/config"
Expand Down Expand Up @@ -70,11 +71,31 @@ func NewWorker(ctx context.Context, conf config.Config, log *logger.Logger, opts
}
store.AttachLogger(log)

if conf.Kubernetes.ExecutorTemplateFile != "" {
content, err := ioutil.ReadFile(conf.Kubernetes.ExecutorTemplateFile)
if err != nil {
return nil, fmt.Errorf("reading template: %v", err)
}
conf.Kubernetes.ExecutorTemplate = string(content)
}

// The executor always defaults to docker, unless explicitly set to kubernetes.
var executor = worker.Executor{
Backend: "docker",
}

if conf.Kubernetes.Executor == "kubernetes" {
executor.Backend = "kubernetes"
executor.Template = conf.Kubernetes.ExecutorTemplate
executor.Namespace = conf.Kubernetes.Namespace
}

return &worker.DefaultWorker{
Conf: conf.Worker,
Store: store,
TaskReader: reader,
EventWriter: writer,
Executor: executor,
Conf: conf.Worker,
Store: store,
TaskReader: reader,
EventWriter: writer,
}, nil
}

Expand Down
8 changes: 3 additions & 5 deletions compute/kubernetes/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,7 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error {
if err != nil {
return fmt.Errorf("creating job spec: %v", err)
}
_, err = b.client.Create(ctx, job, metav1.CreateOptions{
FieldManager: task.Id,
})
_, err = b.client.Create(context.TODO(), job, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("creating job: %v", err)
}
Expand All @@ -166,7 +164,7 @@ func (b *Backend) Submit(ctx context.Context, task *tes.Task) error {
func (b *Backend) deleteJob(ctx context.Context, taskID string) error {
var gracePeriod int64 = 0
var prop metav1.DeletionPropagation = metav1.DeletePropagationForeground
err := b.client.Delete(ctx, taskID, metav1.DeleteOptions{
err := b.client.Delete(context.TODO(), taskID, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
PropagationPolicy: &prop,
})
Expand Down Expand Up @@ -218,7 +216,7 @@ ReconcileLoop:
case <-ctx.Done():
return
case <-ticker.C:
jobs, err := b.client.List(ctx, metav1.ListOptions{})
jobs, err := b.client.List(context.TODO() ,metav1.ListOptions{})
if err != nil {
b.log.Error("reconcile: listing jobs", err)
continue ReconcileLoop
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ func (h FTPStorage) Valid() bool {

// Kubernetes describes the configuration for the Kubernetes compute backend.
type Kubernetes struct {
// The executor used to execute tasks. Available executors: docker, kubernetes
Executor string
// Turn off task state reconciler. When enabled, Funnel communicates with Kuberenetes
// to find tasks that are stuck in a queued state or errored and updates the task state
// accordingly.
Expand All @@ -395,6 +397,10 @@ type Kubernetes struct {
Template string
// TemplateFile is the path to the job template.
TemplateFile string
// Job template used for executing the tasks.
ExecutorTemplate string
// ExecutorTemplateFile is the path to the executor template.
ExecutorTemplateFile string
// Path to the Kubernetes configuration file, otherwise assumes the Funnel server is running in a pod and
// attempts to use https://godoc.org/k8s.io/client-go/rest#InClusterConfig to infer configuration.
ConfigFile string
Expand Down
23 changes: 16 additions & 7 deletions config/default-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Compute: local

# The name of the active event writer backend(s).
# Available backends: log, boltdb, badger, datastore, dynamodb, elastic, mongodb, kafka
EventWriters:
EventWriters:
- boltdb
- log

Expand Down Expand Up @@ -42,7 +42,7 @@ Server:
DisableHTTPCache: true

RPCClient:
# RPC server address
# RPC server address
ServerAddress: localhost:9090

# Credentials for Basic authentication for the server APIs using a password.
Expand All @@ -59,7 +59,7 @@ RPCClient:
# up to 1 minute
MaxRetries: 10

# The scheduler is used for the Manual compute backend.
# The scheduler is used for the Manual compute backend.
Scheduler:
# How often to run a scheduler iteration.
ScheduleRate: 1s
Expand All @@ -78,7 +78,7 @@ Node:
# -1 means there is no timeout. 0 means timeout immediately after the first task.
Timeout: -1s

# A Node will automatically try to detect what resources are available to it.
# A Node will automatically try to detect what resources are available to it.
# Defining Resources in the Node configuration overrides this behavior.
Resources:
# CPUs available.
Expand Down Expand Up @@ -149,7 +149,7 @@ Datastore:
# Optional. If possible, credentials will be automatically discovered
# from the environment.
CredentialsFile: ""

MongoDB:
# Addrs holds the addresses for the seed servers.
Addrs:
Expand Down Expand Up @@ -298,6 +298,8 @@ AWSBatch:

# Kubernetes describes the configuration for the Kubernetes compute backend.
Kubernetes:
# The executor used to execute tasks. Available executors: docker, kubernetes
Executor: "docker"
# Turn off task state reconciler. When enabled, Funnel communicates with Kubernetes
# to find tasks that are stuck in a queued state or errored and
# updates the task state accordingly.
Expand All @@ -307,11 +309,18 @@ Kubernetes:
ReconcileRate: 10m
# Kubernetes Namespace to spawn jobs within
Namespace: ""
# Batch job template. See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#job-v1-batch
# Master batch job template. See: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#job-v1-batch
Template: ""
# TemplateFile is the path to the job template.
# TemplateFile is the path to the master job template.
TemplateFile: ""

# Configuration of the Kubernetes executor.

# Job template used for executing the tasks.
ExecutorTemplate: ""
# ExecutorTemplateFile is the path to the executor template.
ExecutorTemplateFile: ""

#-------------------------------------------------------------------------------
# Storage
#-------------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func DefaultConfig() Config {
c.AWSBatch.ReconcileRate = reconcile
c.AWSBatch.DisableReconciler = true

kubernetesTemplate := intern.MustAsset("config/kubernetes-template.yaml")
executorTemplate := intern.MustAsset("config/kubernetes-executor-template.yaml")
c.Kubernetes.Executor = "docker"
c.Kubernetes.Template = string(kubernetesTemplate)
c.Kubernetes.ExecutorTemplate = string(executorTemplate)
c.Kubernetes.ReconcileRate = reconcile

return c
Expand Down
Loading