diff --git a/run.go b/run.go index f2cb9988..5120e26a 100644 --- a/run.go +++ b/run.go @@ -9,6 +9,7 @@ import ( "os/signal" "runtime/debug" "syscall" + "time" "go.einride.tech/cloudrunner/cloudclient" "go.einride.tech/cloudrunner/cloudconfig" @@ -27,6 +28,14 @@ import ( "google.golang.org/grpc" ) +// gracefulShutdownMaxGracePeriod is the maximum time we wait for the service to finish calling its cancel function +// after a SIGTERM/SIGINT is sent to us. +// If user is using cloudrunner in a Kubernetes like environment, make sure to set `terminationGracePeriodSeconds` +// (default as 30 seconds) above this value to make sure Kubernetes can wait for enough time for graceful shutdown. +// More info see here: +// https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace +const gracefulShutdownMaxGracePeriod = time.Second * 10 + // runConfig configures the Run entrypoint from environment variables. type runConfig struct { // Runtime contains runtime config. @@ -49,8 +58,44 @@ type runConfig struct { // Run a service. // Configuration of the service is loaded from the environment. +// +// Example usage code can be like: +// +// err := cloudrunner.Run(func(ctx context.Context) error { +// grpcServer := cloudrunner.NewGRPCServer(ctx) +// return cloudrunner.ListenGRPC(ctx, grpcServer) +// }) func Run(fn func(context.Context) error, options ...Option) (err error) { - ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + fnWrapper := func(ctx context.Context, _ *Shutdown) error { + // Shutdown is not used + return fn(ctx) + } + return RunWithGracefulShutdown(fnWrapper, options...) +} + +// RunWithGracefulShutdown runs a service and provides a Shutdown where uer can use to register a +// cancel function that will be called when SIGTERM is received. +// Root context will be canceled after running the registered cancel function. +// If the registered cancel functions runs for time longer than gracefulShutdownMaxGracePeriod, Shutdown +// will move on to shut down root context and exist. +// +// Configuration of the service is loaded from the environment. +// +// Example usage code can be like: +// +// err := cloudrunner.RunWithGracefulShutdown(func(ctx context.Context, shutdown *cloudrunner.Shutdown) error { +// grpcServer := cloudrunner.NewGRPCServer(ctx) +// shutdown.RegisterCancelFunc(func() { +// grpcServer.Stop() +// // or clean up any other resources here +// }) +// return cloudrunner.ListenGRPC(ctx, grpcServer) +// }) +func RunWithGracefulShutdown( + fn func(ctx context.Context, shutdown *Shutdown) error, + options ...Option, +) (err error) { + ctx, cancel := context.WithCancel(context.Background()) defer cancel() usage := flag.Bool("help", false, "show help then exit") yamlServiceSpecificationFile := flag.String("config", "", "load environment from a YAML service specification") @@ -152,7 +197,61 @@ func Run(fn func(context.Context) error, options ...Option) (err error) { ) } }() - return fn(ctx) + + shutdown := &Shutdown{ + rootCtxCancel: cancel, + } + go shutdown.trapShutdownSignal(ctx, logger) + return fn(ctx, shutdown) +} + +// Shutdown is used for CloudRunner to gracefully shutdown. It makes sure its cancel is called before +// rootCtxCancel is called. +type Shutdown struct { + rootCtxCancel func() + cancel func() +} + +// RegisterCancelFunc can register a cancel function which will be called when SIGTERM is received, and it is called +// before Shutdown calling its rootCtxCancel() to cancel the root context. +func (s *Shutdown) RegisterCancelFunc(cancel func()) { + s.cancel = cancel +} + +// trapShutdownSignal blocks and waits for shutdown signal, if received, call s.cancel() then shutdown. +// +//nolint:lll +func (s *Shutdown) trapShutdownSignal(ctx context.Context, logger *zap.Logger) { + logger.Info("watching for termination signals") + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGTERM) + signal.Notify(sigChan, syscall.SIGINT) + + // block and wait for a shutdown signal + sig := <-sigChan + logger.Info("got signal:", zap.String("signal", sig.String())) + if s.cancel == nil { + logger.Info( + "cloudrunner.Shutdown is not used. Canceling root context directly. Call RunWithGracefulShutdown(...) to enable graceful shutdown if preferred.", + ) + s.rootCtxCancel() + return + } + + // initiate graceful shutdown by calling s.cancel() + logger.Info("graceful shutdown has begun") + gracefulPeriodCtx, gracefulPeriodCtxCancel := context.WithTimeout(ctx, gracefulShutdownMaxGracePeriod) + go func() { + s.cancel() + logger.Info("Shutdown.cancel() has finished, meaning we will shutdown cleanly") + gracefulPeriodCtxCancel() + }() + + // block and wait until s.cancel() finish or gracefulPeriodCtx timeout. + <-gracefulPeriodCtx.Done() + logger.Info("exiting by canceling root context due to shutdown signal") + + s.rootCtxCancel() } type runContext struct { diff --git a/run_test.go b/run_test.go index 7c34076d..0da08bff 100644 --- a/run_test.go +++ b/run_test.go @@ -2,28 +2,98 @@ package cloudrunner_test import ( "context" + "flag" "log" + "os" + "sync" + "syscall" + "testing" + "time" "go.einride.tech/cloudrunner" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" + "gotest.tools/v3/assert" ) -func ExampleRun_helloWorld() { - if err := cloudrunner.Run(func(ctx context.Context) error { +func Test_Run_helloWorld(t *testing.T) { + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + err := cloudrunner.Run(func(ctx context.Context) error { cloudrunner.Logger(ctx).Info("hello world") return nil - }); err != nil { - log.Fatal(err) - } + }) + + assert.NilError(t, err) } -func ExampleRun_gRPCServer() { - if err := cloudrunner.Run(func(ctx context.Context) error { +func Test_Run_gRPCServer(t *testing.T) { + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + err := cloudrunner.Run(func(ctx context.Context) error { grpcServer := cloudrunner.NewGRPCServer(ctx) healthServer := health.NewServer() grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + + // For shutdown gRPC server otherwise we get blocked on ListenGRPC + go func() { + time.Sleep(time.Second) + _ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + }() return cloudrunner.ListenGRPC(ctx, grpcServer) + }) + + assert.NilError(t, err) +} + +func Test_RunWithGracefulShutdown_gRPCServer(t *testing.T) { + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + err := cloudrunner.RunWithGracefulShutdown(func(ctx context.Context, shutdown *cloudrunner.Shutdown) error { + grpcServer := cloudrunner.NewGRPCServer(ctx) + healthServer := health.NewServer() + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + shutdown.RegisterCancelFunc(func() { + grpcServer.Stop() + healthServer.Shutdown() + }) + + // For shutdown gRPC server otherwise we get blocked on ListenGRPC + go func() { + time.Sleep(time.Second) + _ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + }() + return cloudrunner.ListenGRPC(ctx, grpcServer) + }) + + assert.NilError(t, err) +} + +func Test_RunWithGracefulShutdown_helloWorld_ctx_cancel_should_before_clean_up_function_call(t *testing.T) { + flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError) + if err := cloudrunner.RunWithGracefulShutdown(func(ctx context.Context, shutdown *cloudrunner.Shutdown) error { + wg := sync.WaitGroup{} + wg.Add(1) + cleanup := func() { + var isRootContextDone bool + select { + case <-ctx.Done(): + isRootContextDone = true + default: + isRootContextDone = false + } + assert.Equal(t, isRootContextDone, false) + wg.Done() + } + + shutdown.RegisterCancelFunc(cleanup) + cloudrunner.Logger(ctx).Info("hello world") + + go func() { + // Simulating seeding a SIGTERM call. + time.Sleep(time.Second) + _ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + }() + + wg.Wait() + return nil }); err != nil { log.Fatal(err) }