Skip to content

Commit

Permalink
feat(run): provides RunWithGracefulShutdownHook
Browse files Browse the repository at this point in the history
This allows users to easily wire up a cleanup function
that is called BEFORE the root context is done.
  • Loading branch information
liufuyang committed Nov 16, 2023
1 parent 17365f7 commit f8a9f4a
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 9 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/soheilhy/cmux v0.1.5
github.com/stretchr/testify v1.8.4
go.einride.tech/protobuf-sensitive v0.5.0
go.opencensus.io v0.24.0
go.opentelemetry.io/contrib/detectors/gcp v1.21.0
Expand Down Expand Up @@ -44,6 +45,7 @@ require (
cloud.google.com/go/trace v1.10.3 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.20.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.44.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -55,6 +57,7 @@ require (
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/shirou/gopsutil/v3 v3.23.10 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
Expand Down
100 changes: 98 additions & 2 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"
"runtime/debug"
"syscall"
"time"

"go.einride.tech/cloudrunner/cloudclient"
"go.einride.tech/cloudrunner/cloudconfig"
Expand All @@ -27,6 +28,14 @@ import (
"google.golang.org/grpc"
)

// gracefulShutdownMaxGracePeriod caps how long a registered cancel function may keep running after a
// SIGTERM/SIGINT has been received; once it elapses the root context is canceled regardless.
//
// When cloudrunner runs in a Kubernetes-like environment, keep `terminationGracePeriodSeconds`
// (30 seconds by default) above this value so the platform waits long enough for the graceful shutdown.
// For background, see:
// https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace
const gracefulShutdownMaxGracePeriod = 10 * time.Second

// runConfig configures the Run entrypoint from environment variables.
type runConfig struct {
// Runtime contains runtime config.
Expand All @@ -49,8 +58,41 @@ type runConfig struct {

// Run a service.
// Configuration of the service is loaded from the environment.
//
// Run is a thin wrapper around RunWithGracefulShutdownHook that never registers a cancel function on the
// ShutdownHook, so a termination signal cancels the context passed to fn directly.
//
// Example usage code can be like:
//
//	err := cloudrunner.Run(func(ctx context.Context) error {
//		grpcServer := cloudrunner.NewGRPCServer(ctx)
//		return cloudrunner.ListenGRPC(ctx, grpcServer)
//	})
func Run(fn func(context.Context) error, options ...Option) (err error) {
	// Adapt fn to the hook-aware signature; the hook is intentionally unused here.
	noShutdownHook := func(ctx context.Context, _ *ShutdownHook) error {
		return fn(ctx)
	}
	return RunWithGracefulShutdownHook(noShutdownHook, options...)
}

// RunWithGracefulShutdownHook runs a service and provides a ShutdownHook that the user can call to register a
// cancel function that will be called before the root context is canceled.
// The root context is canceled as soon as the registered cancel function returns, or once it has run for longer
// than gracefulShutdownMaxGracePeriod.
// Configuration of the service is loaded from the environment.
//
// Example usage code can be like:
//
// err := cloudrunner.RunWithGracefulShutdownHook(func(ctx context.Context, hook *cloudrunner.ShutdownHook) error {
// grpcServer := cloudrunner.NewGRPCServer(ctx)
// hook.HookCancelFunc(func() {
// grpcServer.Stop()
// // or clean up any other resources here
// })
// return cloudrunner.ListenGRPC(ctx, grpcServer)
// })
func RunWithGracefulShutdownHook(
fn func(ctx context.Context, shutdownHook *ShutdownHook) error,
options ...Option,
) (err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
usage := flag.Bool("help", false, "show help then exit")
yamlServiceSpecificationFile := flag.String("config", "", "load environment from a YAML service specification")
Expand Down Expand Up @@ -152,7 +194,61 @@ func Run(fn func(context.Context) error, options ...Option) (err error) {
)
}
}()
return fn(ctx)

hook := &ShutdownHook{
rootCtxCancelFunc: cancel,
}
go hook.trapShutdownSignal(ctx, logger)
return fn(ctx, hook)
}

// ShutdownHook is used for CloudRunner to gracefully shutdown. It makes sure shutdownFunc is called before
// rootCtxCancelFunc is called.
type ShutdownHook struct {
	// rootCtxCancelFunc cancels the root context handed to the service function.
	rootCtxCancelFunc func()
	// shutdownFunc is the optional cleanup function registered through HookCancelFunc; it stays nil when
	// nothing has been registered.
	shutdownFunc func()
}

// HookCancelFunc registers cancel as the function to call when SIGTERM or SIGINT is received, before the root
// context is canceled. Only one function is stored: a later call replaces the one registered earlier.
//
// NOTE(review): the assignment is not synchronized with the goroutine running trapShutdownSignal, which reads
// shutdownFunc when a signal arrives. Register the function before a signal can be delivered; confirm whether
// a lock is needed.
func (s *ShutdownHook) HookCancelFunc(cancel func()) {
	s.shutdownFunc = cancel
}

// trapShutdownSignal blocks until SIGTERM or SIGINT is received, or until ctx is done.
//
// If ctx finishes first (for example because the service function returned), it returns without doing
// anything else, so the goroutine running it does not outlive the service. When a signal is received it
// calls s.shutdownFunc(), if one was registered, and then cancels the root context. The root context is
// canceled once s.shutdownFunc() returns or after gracefulShutdownMaxGracePeriod, whichever comes first.
//
// The signal registration is removed when the function returns, so a signal delivered after that point gets
// Go's default handling again.
//
//nolint:lll
func (s *ShutdownHook) trapShutdownSignal(ctx context.Context, logger *zap.Logger) {
	logger.Info("watching for termination signals")
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
	// Always unregister the channel; otherwise every invocation leaves a handler installed for the lifetime
	// of the process.
	defer signal.Stop(sigChan)

	// Block until a shutdown signal arrives, or stop watching once the root context has already ended.
	var sig os.Signal
	select {
	case sig = <-sigChan:
	case <-ctx.Done():
		return
	}
	logger.Info("got signal:", zap.String("signal", sig.String()))
	if s.shutdownFunc == nil {
		logger.Info(
			"ShutdownHook is not used. Canceling root context directly. Call RunWithGracefulShutdownHook(...) to enable graceful shutdown if preferred.",
		)
		s.rootCtxCancelFunc()
		return
	}

	// Initiate graceful shutdown by calling s.shutdownFunc() in the background, bounded by the grace period.
	logger.Info("graceful shutdown has begun")
	gracefulPeriodCtx, gracefulPeriodCtxCancel := context.WithTimeout(ctx, gracefulShutdownMaxGracePeriod)
	// Release the timeout's resources even when s.shutdownFunc() overruns the grace period.
	defer gracefulPeriodCtxCancel()
	go func() {
		s.shutdownFunc()
		logger.Info("ShutdownHook.shutdownFunc() has finished, meaning we will shutdown cleanly")
		gracefulPeriodCtxCancel()
	}()

	// Block until s.shutdownFunc() finishes or the grace period times out.
	<-gracefulPeriodCtx.Done()
	logger.Info("exiting by canceling root context due to shutdown signal")

	s.rootCtxCancelFunc()
}

type runContext struct {
Expand Down
84 changes: 77 additions & 7 deletions run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,98 @@ package cloudrunner_test

import (
"context"
"flag"
"log"
"os"
"sync"
"syscall"
"testing"
"time"

"go.einride.tech/cloudrunner"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"gotest.tools/v3/assert"
)

func ExampleRun_helloWorld() {
if err := cloudrunner.Run(func(ctx context.Context) error {
func Test_Run_helloWorld(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
err := cloudrunner.Run(func(ctx context.Context) error {
cloudrunner.Logger(ctx).Info("hello world")
return nil
}); err != nil {
log.Fatal(err)
}
})

assert.NilError(t, err)
}

func ExampleRun_gRPCServer() {
if err := cloudrunner.Run(func(ctx context.Context) error {
func Test_Run_gRPCServer(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
err := cloudrunner.Run(func(ctx context.Context) error {
grpcServer := cloudrunner.NewGRPCServer(ctx)
healthServer := health.NewServer()
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)

// For shutdown gRPC server otherwise we get blocked on ListenGRPC
go func() {
time.Sleep(time.Second)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()
return cloudrunner.ListenGRPC(ctx, grpcServer)
})

assert.NilError(t, err)
}

// Test_RunWithGracefulShutdownHook_gRPCServer checks that RunWithGracefulShutdownHook returns nil when a gRPC
// server with a registered cancel function is interrupted by a SIGTERM sent to the test process.
func Test_RunWithGracefulShutdownHook_gRPCServer(t *testing.T) {
	flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)

	serve := func(ctx context.Context, hook *cloudrunner.ShutdownHook) error {
		srv := cloudrunner.NewGRPCServer(ctx)
		healthSrv := health.NewServer()
		grpc_health_v1.RegisterHealthServer(srv, healthSrv)

		// Registered cleanup: stop the gRPC server first, then shut the health service down.
		stopServers := func() {
			srv.Stop()
			healthSrv.Shutdown()
		}
		hook.HookCancelFunc(stopServers)

		// ListenGRPC would block otherwise, so deliver a SIGTERM to this process after one second.
		go func() {
			time.Sleep(time.Second)
			_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
		}()
		return cloudrunner.ListenGRPC(ctx, srv)
	}

	err := cloudrunner.RunWithGracefulShutdownHook(serve)
	assert.NilError(t, err)
}

func Test_RunWithGracefulShutdownHook_helloWorld_ctx_cancel_should_before_clean_up_function_call(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
if err := cloudrunner.RunWithGracefulShutdownHook(func(ctx context.Context, hook *cloudrunner.ShutdownHook) error {
wg := sync.WaitGroup{}
wg.Add(1)
cleanup := func() {
var isRootContextDone bool
select {
case <-ctx.Done():
isRootContextDone = true
default:
isRootContextDone = false
}
assert.Equal(t, isRootContextDone, false)
wg.Done()
}

hook.HookCancelFunc(cleanup)
cloudrunner.Logger(ctx).Info("hello world")

go func() {
// Simulating seeding a SIGTERM call.
time.Sleep(time.Second)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

wg.Wait()
return nil
}); err != nil {
log.Fatal(err)
}
Expand Down

0 comments on commit f8a9f4a

Please sign in to comment.