Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(run): provides RunWithGracefulShutdown #564

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 101 additions & 2 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os/signal"
"runtime/debug"
"syscall"
"time"

"go.einride.tech/cloudrunner/cloudclient"
"go.einride.tech/cloudrunner/cloudconfig"
Expand All @@ -27,6 +28,14 @@ import (
"google.golang.org/grpc"
)

// gracefulShutdownMaxGracePeriod is the maximum time we wait for the registered cancel function to finish
// after a SIGTERM/SIGINT is sent to us. Once it has elapsed, the root context is canceled regardless.
//
// The value follows the 10-second shutdown period described in the Cloud Run container contract:
// https://cloud.google.com/run/docs/container-contract#instance-shutdown
//
// If cloudrunner is used in a Kubernetes-like environment, set `terminationGracePeriodSeconds`
// (30 seconds by default) above this value so that Kubernetes waits long enough for graceful shutdown.
// For more information, see:
// https://cloud.google.com/blog/products/containers-kubernetes/kubernetes-best-practices-terminating-with-grace
const gracefulShutdownMaxGracePeriod = time.Second * 10
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am using 10 seconds here because, in our preferred Cloud Run environment, terminationGracePeriodSeconds does not seem to be configurable, and the Cloud Run documentation specifies a fixed 10-second shutdown period.

https://cloud.google.com/run/docs/container-contract#instance-shutdown

Perhaps I should put this in the comment as well.


// runConfig configures the Run entrypoint from environment variables.
type runConfig struct {
// Runtime contains runtime config.
Expand All @@ -49,8 +58,44 @@ type runConfig struct {

// Run a service.
// Configuration of the service is loaded from the environment.
//
// Example usage code can be like:
//
// err := cloudrunner.Run(func(ctx context.Context) error {
// grpcServer := cloudrunner.NewGRPCServer(ctx)
// return cloudrunner.ListenGRPC(ctx, grpcServer)
// })
func Run(fn func(context.Context) error, options ...Option) (err error) {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
Copy link
Contributor Author

@liufuyang liufuyang Nov 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old implementation creates a hard bond between the root context and SIGTERM: when a SIGTERM is received, ctx is canceled immediately and the user has no control over it.

The new implementation breaks that bond; see trapShutdownSignal() for details. It takes some ideas from this post: https://medium.com/over-engineering/graceful-shutdown-with-go-http-servers-and-kubernetes-rolling-updates-6697e7db17cf

fnWrapper := func(ctx context.Context, _ *Shutdown) error {
// Shutdown is not used
return fn(ctx)
}
return RunWithGracefulShutdown(fnWrapper, options...)
}

// RunWithGracefulShutdown runs a service and provides a Shutdown that the user can use to register a
// cancel function, which is called when SIGTERM or SIGINT is received.
// The root context is canceled after the registered cancel function has run.
// If the registered cancel function runs for longer than gracefulShutdownMaxGracePeriod, Shutdown
// moves on to cancel the root context and exit.
//
// Configuration of the service is loaded from the environment.
//
// Example usage code can be like:
//
// err := cloudrunner.RunWithGracefulShutdown(func(ctx context.Context, shutdown *cloudrunner.Shutdown) error {
// grpcServer := cloudrunner.NewGRPCServer(ctx)
// shutdown.RegisterCancelFunc(func() {
// grpcServer.Stop()
// // or clean up any other resources here
// })
// return cloudrunner.ListenGRPC(ctx, grpcServer)
// })
func RunWithGracefulShutdown(
fn func(ctx context.Context, shutdown *Shutdown) error,
options ...Option,
) (err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
usage := flag.Bool("help", false, "show help then exit")
yamlServiceSpecificationFile := flag.String("config", "", "load environment from a YAML service specification")
Expand Down Expand Up @@ -152,7 +197,61 @@ func Run(fn func(context.Context) error, options ...Option) (err error) {
)
}
}()
return fn(ctx)

shutdown := &Shutdown{
rootCtxCancel: cancel,
}
go shutdown.trapShutdownSignal(ctx, logger)
return fn(ctx, shutdown)
}

// Shutdown coordinates the graceful shutdown of a service started with RunWithGracefulShutdown.
// It makes sure that its registered cancel function is called before rootCtxCancel is called.
type Shutdown struct {
// rootCtxCancel cancels the root context that is handed to the service function.
rootCtxCancel func()
// cancel is the function registered through RegisterCancelFunc. It stays nil when nothing has
// been registered, in which case a shutdown signal cancels the root context directly.
cancel func()
}

// RegisterCancelFunc registers a cancel function that is called when SIGTERM or SIGINT is received, before
// Shutdown calls its rootCtxCancel() to cancel the root context.
//
// Only a single cancel function is kept: every call replaces the one registered before it.
//
// NOTE(review): the field is written here without synchronization while the signal-watching goroutine
// reads it; presumably callers register once during startup, before a signal can arrive — confirm.
func (s *Shutdown) RegisterCancelFunc(cancel func()) {
s.cancel = cancel
}

// trapShutdownSignal waits for SIGTERM or SIGINT and then shuts the service down.
//
// When a signal arrives and no cancel function has been registered, the root context is canceled
// right away. Otherwise the registered cancel function runs in its own goroutine and the root
// context is canceled as soon as it returns or gracefulShutdownMaxGracePeriod has elapsed,
// whichever happens first.
//
// If ctx is done before any signal arrives, the method returns without doing anything, so that
// neither the watcher goroutine nor its signal registration outlives the run it belongs to.
//
// It is meant to be started in its own goroutine; logger receives progress messages.
//
//nolint:lll
func (s *Shutdown) trapShutdownSignal(ctx context.Context, logger *zap.Logger) {
	logger.Info("watching for termination signals")
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
	// Undo the registration on every exit path; otherwise a later signal would still be routed to
	// this (stale) channel and the process would keep ignoring the default signal behavior.
	defer signal.Stop(sigChan)

	// Block until a shutdown signal arrives or the run is over.
	var sig os.Signal
	select {
	case sig = <-sigChan:
	case <-ctx.Done():
		// The root context is already canceled, so there is nothing left to shut down.
		return
	}
	logger.Info("got signal:", zap.String("signal", sig.String()))

	// Read the registered function once so the nil check and the call see the same value.
	cancelFn := s.cancel
	if cancelFn == nil {
		logger.Info(
			"cloudrunner.Shutdown is not used. Canceling root context directly. Call RunWithGracefulShutdown(...) to enable graceful shutdown if preferred.",
		)
		s.rootCtxCancel()
		return
	}

	// Initiate graceful shutdown by calling the registered cancel function.
	logger.Info("graceful shutdown has begun")
	gracefulPeriodCtx, gracefulPeriodCtxCancel := context.WithTimeout(ctx, gracefulShutdownMaxGracePeriod)
	// Release the timer on every path; calling the cancel function more than once is harmless.
	defer gracefulPeriodCtxCancel()
	go func() {
		cancelFn()
		logger.Info("Shutdown.cancel() has finished, meaning we will shutdown cleanly")
		gracefulPeriodCtxCancel()
	}()

	// Block until the cancel function has finished or the grace period has run out.
	<-gracefulPeriodCtx.Done()
	if gracefulPeriodCtx.Err() == context.DeadlineExceeded {
		logger.Warn(
			"registered cancel function did not finish within the grace period",
			zap.Duration("gracePeriod", gracefulShutdownMaxGracePeriod),
		)
	}
	logger.Info("exiting by canceling root context due to shutdown signal")

	s.rootCtxCancel()
}

type runContext struct {
Expand Down
84 changes: 77 additions & 7 deletions run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,98 @@ package cloudrunner_test

import (
"context"
"flag"
"log"
"os"
"sync"
"syscall"
"testing"
"time"

"go.einride.tech/cloudrunner"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"gotest.tools/v3/assert"
)

func ExampleRun_helloWorld() {
if err := cloudrunner.Run(func(ctx context.Context) error {
func Test_Run_helloWorld(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
err := cloudrunner.Run(func(ctx context.Context) error {
cloudrunner.Logger(ctx).Info("hello world")
return nil
}); err != nil {
log.Fatal(err)
}
})

assert.NilError(t, err)
}

func ExampleRun_gRPCServer() {
if err := cloudrunner.Run(func(ctx context.Context) error {
// Test_Run_gRPCServer checks that Run returns without an error when a gRPC server that is
// listening inside the service function is interrupted by SIGTERM.
func Test_Run_gRPCServer(t *testing.T) {
	// Start from a fresh flag set so that flags can be registered again by this run.
	flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)

	serve := func(ctx context.Context) error {
		srv := cloudrunner.NewGRPCServer(ctx)
		grpc_health_v1.RegisterHealthServer(srv, health.NewServer())

		// ListenGRPC blocks, so deliver SIGTERM to this process after one second to end the test.
		time.AfterFunc(time.Second, func() {
			// Best effort: if the signal cannot be sent there is nothing useful to do here.
			_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
		})
		return cloudrunner.ListenGRPC(ctx, srv)
	}

	assert.NilError(t, cloudrunner.Run(serve))
}

// Test_RunWithGracefulShutdown_gRPCServer checks that RunWithGracefulShutdown returns without an
// error when SIGTERM arrives and the registered cancel function stops the gRPC and health servers.
func Test_RunWithGracefulShutdown_gRPCServer(t *testing.T) {
	// Start from a fresh flag set so that flags can be registered again by this run.
	flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)

	serve := func(ctx context.Context, shutdown *cloudrunner.Shutdown) error {
		srv := cloudrunner.NewGRPCServer(ctx)
		hs := health.NewServer()
		grpc_health_v1.RegisterHealthServer(srv, hs)

		// Stop the gRPC server first, then the health server, when shutdown is triggered.
		stopServers := func() {
			srv.Stop()
			hs.Shutdown()
		}
		shutdown.RegisterCancelFunc(stopServers)

		// ListenGRPC blocks, so deliver SIGTERM to this process after one second to end the test.
		time.AfterFunc(time.Second, func() {
			// Best effort: if the signal cannot be sent there is nothing useful to do here.
			_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
		})
		return cloudrunner.ListenGRPC(ctx, srv)
	}

	assert.NilError(t, cloudrunner.RunWithGracefulShutdown(serve))
}

func Test_RunWithGracefulShutdown_helloWorld_ctx_cancel_should_before_clean_up_function_call(t *testing.T) {
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ContinueOnError)
if err := cloudrunner.RunWithGracefulShutdown(func(ctx context.Context, shutdown *cloudrunner.Shutdown) error {
wg := sync.WaitGroup{}
wg.Add(1)
cleanup := func() {
var isRootContextDone bool
select {
case <-ctx.Done():
isRootContextDone = true
default:
isRootContextDone = false
}
assert.Equal(t, isRootContextDone, false)
wg.Done()
}

shutdown.RegisterCancelFunc(cleanup)
cloudrunner.Logger(ctx).Info("hello world")

go func() {
// Simulate sending a SIGTERM to this process.
time.Sleep(time.Second)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
}()

wg.Wait()
return nil
}); err != nil {
log.Fatal(err)
}
Expand Down