Skip to content

Commit

Permalink
Use exit signal handler to terminate the process cleanly
Browse files Browse the repository at this point in the history
Signed-off-by: Md Soharab Ansari <[email protected]>
  • Loading branch information
soharab-ic committed Sep 25, 2024
1 parent 1735cb6 commit badedbf
Showing 1 changed file with 14 additions and 4 deletions.
18 changes: 14 additions & 4 deletions redis-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"log"
"net/http"
"os"
"os/signal"
"syscall"

"github.com/go-redis/redis/v8"
"go.uber.org/zap"
Expand All @@ -19,7 +21,7 @@ type redisConnector struct {
logger *zap.Logger
}

func (conn redisConnector) consumeMessage(ctx context.Context) {
func (conn redisConnector) consumeMessage(sigterm chan os.Signal) {

headers := http.Header{
"KEDA-Topic": {conn.connectordata.Topic},
Expand All @@ -29,14 +31,16 @@ func (conn redisConnector) consumeMessage(ctx context.Context) {
"KEDA-Source-Name": {conn.connectordata.SourceName},
}

var ctx = context.Background()
forever := make(chan bool)

go func() {
for {
// BLPop will block and wait for a new message if the list is empty
msg, err := conn.rdbConnection.BLPop(ctx, 0, conn.connectordata.Topic).Result()
if err != nil {
conn.logger.Fatal("Error in consuming queue: ", zap.Error((err)))
conn.logger.Error("Error in consuming queue: ", zap.Error((err)))
forever <- false
}

if len(msg) > 1 {
Expand All @@ -57,13 +61,15 @@ func (conn redisConnector) consumeMessage(ctx context.Context) {
err = response.Body.Close()
if err != nil {
conn.logger.Error(err.Error())
forever <- false
}
}
}
}
}()
conn.logger.Info("Redis consumer up and running!")
<-forever
sigterm <- syscall.SIGTERM
}

func (conn redisConnector) errorHandler(ctx context.Context, err error) {
Expand Down Expand Up @@ -118,16 +124,20 @@ func main() {
}
password := os.Getenv("PASSWORD_FROM_ENV")

var ctx = context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: address,
Password: password,
})

sigterm := make(chan os.Signal, 1)
conn := redisConnector{
rdbConnection: rdb,
connectordata: connectordata,
logger: logger,
}
conn.consumeMessage(ctx)
conn.consumeMessage(sigterm)

signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
<-sigterm
logger.Info("Terminating: Redis consumer")
}

0 comments on commit badedbf

Please sign in to comment.