Skip to content

Commit

Permalink
feat(run): provides RunWithGracefulShutdown
Browse files Browse the repository at this point in the history
This allows users to easily wire up a cleanup function
that is called BEFORE the root context is done.
  • Loading branch information
liufuyang committed Nov 19, 2023
1 parent 17365f7 commit a6d33d6
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 9 deletions.
103 changes: 101 additions & 2 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"
"runtime/debug"
"syscall"
"time"

"go.einride.tech/cloudrunner/cloudclient"
"go.einride.tech/cloudrunner/cloudconfig"
Expand All @@ -27,6 +28,14 @@ import (
"google.golang.org/grpc"
)

// gracefulShutdownMaxGracePeriod is the maximum time we wait for the service's registered cancel function
// to finish after a SIGTERM/SIGINT is sent to us.
// If cloudrunner is used in a Kubernetes-like environment, make sure `terminationGracePeriodSeconds`
// (30 seconds by default) is set above this value so that Kubernetes waits long enough for the graceful shutdown.
// For more information, see:
// https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace
const gracefulShutdownMaxGracePeriod = time.Second * 10

// runConfig configures the Run entrypoint from environment variables.
type runConfig struct {
// Runtime contains runtime config.
Expand All @@ -49,8 +58,44 @@ type runConfig struct {

// Run runs a service without a graceful-shutdown hook.
// Configuration of the service is loaded from the environment.
//
// Run delegates to RunWithGracefulShutdown and never registers a cancel function on the
// Shutdown it is handed, so a SIGTERM/SIGINT cancels the context passed to fn directly.
// The error returned by fn (or by the setup performed in RunWithGracefulShutdown) is returned.
//
// Example usage code can be like:
//
//	err := cloudrunner.Run(func(ctx context.Context) error {
//		grpcServer := cloudrunner.NewGRPCServer(ctx)
//		return cloudrunner.ListenGRPC(ctx, grpcServer)
//	})
func Run(fn func(context.Context) error, options ...Option) (err error) {
	// Signal handling lives in RunWithGracefulShutdown; Run must not install its own
	// signal context here, it only adapts fn to the two-argument callback signature.
	return RunWithGracefulShutdown(
		func(ctx context.Context, _ *Shutdown) error {
			// The Shutdown handle is intentionally ignored.
			return fn(ctx)
		},
		options...,
	)
}

// RunWithGracefulShutdown runs a service and provides a Shutdown which the user can use to register a
// cancel function that will be called when SIGTERM or SIGINT is received.
// The root context will be canceled after the registered cancel function has run.
// If the registered cancel function runs for longer than gracefulShutdownMaxGracePeriod, Shutdown
// will move on to cancel the root context and exit.
//
// Configuration of the service is loaded from the environment.
//
// Example usage code can be like:
//
// err := cloudrunner.RunWithGracefulShutdown(func(ctx context.Context, shutdown *cloudrunner.Shutdown) error {
// grpcServer := cloudrunner.NewGRPCServer(ctx)
// shutdown.RegisterCancelFunc(func() {
// grpcServer.Stop()
// // or clean up any other resources here
// })
// return cloudrunner.ListenGRPC(ctx, grpcServer)
// })
func RunWithGracefulShutdown(
fn func(ctx context.Context, shutdown *Shutdown) error,
options ...Option,
) (err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
usage := flag.Bool("help", false, "show help then exit")
yamlServiceSpecificationFile := flag.String("config", "", "load environment from a YAML service specification")
Expand Down Expand Up @@ -152,7 +197,61 @@ func Run(fn func(context.Context) error, options ...Option) (err error) {
)
}
}()
return fn(ctx)

shutdown := &Shutdown{
rootCtxCancel: cancel,
}
go shutdown.trapShutdownSignal(ctx, logger)
return fn(ctx, shutdown)
}

// Shutdown is used for CloudRunner to gracefully shutdown. It makes sure its cancel is called before
// rootCtxCancel is called.
//
// NOTE(review): cancel is written by RegisterCancelFunc and read by trapShutdownSignal, which is
// started with `go`; no synchronization between the two is visible here — confirm whether
// registration is expected to happen before a signal can arrive.
type Shutdown struct {
// rootCtxCancel cancels the root context handed to the service function.
rootCtxCancel func()
// cancel is the user-registered clean-up function; it stays nil until RegisterCancelFunc is called.
cancel func()
}

// RegisterCancelFunc registers the cancel function that is called when SIGTERM or SIGINT is received.
// It is called before Shutdown calls its rootCtxCancel() to cancel the root context.
//
// Only one function is kept: each call replaces whatever was registered before.
func (s *Shutdown) RegisterCancelFunc(cancel func()) {
s.cancel = cancel
}

// trapShutdownSignal waits for SIGTERM or SIGINT and then drives the shutdown sequence.
//
// If a signal arrives and no cancel function has been registered, the root context is canceled
// right away. Otherwise the registered cancel function is run in its own goroutine and the root
// context is canceled as soon as that function returns, ctx is done, or
// gracefulShutdownMaxGracePeriod has elapsed, whichever happens first.
//
// If ctx is done before any signal arrives, the method returns without calling the cancel
// function, so the goroutine it runs on ends together with the context instead of leaking.
//
//nolint:lll
func (s *Shutdown) trapShutdownSignal(ctx context.Context, logger *zap.Logger) {
	logger.Info("watching for termination signals")
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
	// Always unregister the channel; otherwise SIGTERM/SIGINT would keep being intercepted
	// (and default signal handling stay disabled) after this method has returned.
	defer signal.Stop(sigChan)

	// Block until a shutdown signal arrives or the context is canceled by other means.
	var sig os.Signal
	select {
	case sig = <-sigChan:
	case <-ctx.Done():
		// Nothing left to shut down; stop watching.
		return
	}
	logger.Info("got signal:", zap.String("signal", sig.String()))

	// Read the registered function once so the nil check and the call use the same value.
	cancelFn := s.cancel
	if cancelFn == nil {
		logger.Info(
			"cloudrunner.Shutdown is not used. Canceling root context directly. Call RunWithGracefulShutdown(...) to enable graceful shutdown if preferred.",
		)
		s.rootCtxCancel()
		return
	}

	// Initiate graceful shutdown by calling the registered cancel function.
	logger.Info("graceful shutdown has begun")
	gracefulPeriodCtx, gracefulPeriodCtxCancel := context.WithTimeout(ctx, gracefulShutdownMaxGracePeriod)
	// Release the timeout context's resources on the timeout path as well.
	defer gracefulPeriodCtxCancel()
	go func() {
		cancelFn()
		logger.Info("Shutdown.cancel() has finished, meaning we will shutdown cleanly")
		gracefulPeriodCtxCancel()
	}()

	// Block until the cancel function finishes or the grace period runs out.
	<-gracefulPeriodCtx.Done()
	logger.Info("exiting by canceling root context due to shutdown signal")

	s.rootCtxCancel()
}

type runContext struct {
Expand Down
84 changes: 77 additions & 7 deletions run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,98 @@ package cloudrunner_test

import (
"context"
"flag"
"log"
"os"
"sync"
"syscall"
"testing"
"time"

"go.einride.tech/cloudrunner"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"gotest.tools/v3/assert"
)

func ExampleRun_helloWorld() {
if err := cloudrunner.Run(func(ctx context.Context) error {
func Test_Run_helloWorld(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
err := cloudrunner.Run(func(ctx context.Context) error {
cloudrunner.Logger(ctx).Info("hello world")
return nil
}); err != nil {
log.Fatal(err)
}
})

assert.NilError(t, err)
}

func ExampleRun_gRPCServer() {
if err := cloudrunner.Run(func(ctx context.Context) error {
func Test_Run_gRPCServer(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
err := cloudrunner.Run(func(ctx context.Context) error {
grpcServer := cloudrunner.NewGRPCServer(ctx)
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)

// For shutdown gRPC server otherwise we get blocked on ListenGRPC
go func() {
time.Sleep(time.Second)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()
return cloudrunner.ListenGRPC(ctx, grpcServer)
})

assert.NilError(t, err)
}

// Test_RunWithGracefulShutdown_gRPCServer starts a gRPC server with a registered cancel function
// and checks that RunWithGracefulShutdown returns nil after the process sends itself SIGTERM.
func Test_RunWithGracefulShutdown_gRPCServer(t *testing.T) {
	flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)

	serve := func(ctx context.Context, shutdown *cloudrunner.Shutdown) error {
		server := cloudrunner.NewGRPCServer(ctx)
		healthSrv := health.NewServer()
		grpc_health_v1.RegisterHealthServer(server, healthSrv)

		// Clean-up hook: stop the gRPC server first, then the health service.
		stopServers := func() {
			server.Stop()
			healthSrv.Shutdown()
		}
		shutdown.RegisterCancelFunc(stopServers)

		// ListenGRPC blocks, so send SIGTERM to this process after one second to unblock it.
		go func() {
			time.Sleep(time.Second)
			_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
		}()
		return cloudrunner.ListenGRPC(ctx, server)
	}

	err := cloudrunner.RunWithGracefulShutdown(serve)
	assert.NilError(t, err)
}

func Test_RunWithGracefulShutdown_helloWorld_ctx_cancel_should_before_clean_up_function_call(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
if err := cloudrunner.RunWithGracefulShutdown(func(ctx context.Context, shutdown *cloudrunner.Shutdown) error {
wg := sync.WaitGroup{}
wg.Add(1)
cleanup := func() {
var isRootContextDone bool
select {
case <-ctx.Done():
isRootContextDone = true
default:
isRootContextDone = false
}
assert.Equal(t, isRootContextDone, false)
wg.Done()
}

shutdown.RegisterCancelFunc(cleanup)
cloudrunner.Logger(ctx).Info("hello world")

go func() {
// Simulating seeding a SIGTERM call.
time.Sleep(time.Second)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

wg.Wait()
return nil
}); err != nil {
log.Fatal(err)
}
Expand Down

0 comments on commit a6d33d6

Please sign in to comment.