Skip to content

Commit

Permalink
fix:consumer processes deliveries only once
Browse files Browse the repository at this point in the history
  • Loading branch information
akolpakov-somehash committed Dec 25, 2024
1 parent e708a3c commit a034ad1
Showing 1 changed file with 60 additions and 58 deletions.
118 changes: 60 additions & 58 deletions internal/pkg/rabbitmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package rabbitmq
import (
"context"
"fmt"
"reflect"
"runtime"
"strings"
"time"

"github.com/ahmetb/go-linq/v3"
"github.com/iancoleman/strcase"
jsoniter "github.com/json-iterator/go"
Expand All @@ -11,10 +16,6 @@ import (
"github.com/streadway/amqp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"reflect"
"runtime"
"strings"
"time"
)

//go:generate mockery --name IConsumer
Expand Down Expand Up @@ -104,60 +105,61 @@ func (c Consumer[T]) ConsumeMessage(msg interface{}, dependencies T) error {
}

go func() {

select {
case <-c.ctx.Done():
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
c.log.Errorf("failed to close channel closed for for queue: %s", q.Name)
}
}(ch)
c.log.Infof("channel closed for for queue: %s", q.Name)
return

case delivery, ok := <-deliveries:
{
if !ok {
c.log.Errorf("NOT OK deliveries channel closed for queue: %s", q.Name)
return
}

// Extract headers
c.ctx = otel.ExtractAMQPHeaders(c.ctx, delivery.Headers)

err := c.handler(q.Name, delivery, dependencies)
if err != nil {
c.log.Error(err.Error())
}

consumedMessages = append(consumedMessages, snakeTypeName)

_, span := c.jaegerTracer.Start(c.ctx, consumerHandlerName)

h, err := jsoniter.Marshal(delivery.Headers)

if err != nil {
c.log.Errorf("Error in marshalling headers in consumer: %v", string(h))
}

span.SetAttributes(attribute.Key("message-id").String(delivery.MessageId))
span.SetAttributes(attribute.Key("correlation-id").String(delivery.CorrelationId))
span.SetAttributes(attribute.Key("queue").String(q.Name))
span.SetAttributes(attribute.Key("exchange").String(delivery.Exchange))
span.SetAttributes(attribute.Key("routing-key").String(delivery.RoutingKey))
span.SetAttributes(attribute.Key("ack").Bool(true))
span.SetAttributes(attribute.Key("timestamp").String(delivery.Timestamp.String()))
span.SetAttributes(attribute.Key("body").String(string(delivery.Body)))
span.SetAttributes(attribute.Key("headers").String(string(h)))

// Cannot use defer inside a for loop
time.Sleep(1 * time.Millisecond)
span.End()

err = delivery.Ack(false)
if err != nil {
c.log.Errorf("We didn't get a ack for delivery: %v", string(delivery.Body))
for {
select {
case <-c.ctx.Done():
defer func(ch *amqp.Channel) {
err := ch.Close()
if err != nil {
c.log.Errorf("failed to close channel closed for for queue: %s", q.Name)
}
}(ch)
c.log.Infof("channel closed for for queue: %s", q.Name)
return

case delivery, ok := <-deliveries:
{
if !ok {
c.log.Errorf("NOT OK deliveries channel closed for queue: %s", q.Name)
return
}

// Extract headers
c.ctx = otel.ExtractAMQPHeaders(c.ctx, delivery.Headers)

err := c.handler(q.Name, delivery, dependencies)
if err != nil {
c.log.Error(err.Error())
}

consumedMessages = append(consumedMessages, snakeTypeName)

_, span := c.jaegerTracer.Start(c.ctx, consumerHandlerName)

h, err := jsoniter.Marshal(delivery.Headers)

if err != nil {
c.log.Errorf("Error in marshalling headers in consumer: %v", string(h))
}

span.SetAttributes(attribute.Key("message-id").String(delivery.MessageId))
span.SetAttributes(attribute.Key("correlation-id").String(delivery.CorrelationId))
span.SetAttributes(attribute.Key("queue").String(q.Name))
span.SetAttributes(attribute.Key("exchange").String(delivery.Exchange))
span.SetAttributes(attribute.Key("routing-key").String(delivery.RoutingKey))
span.SetAttributes(attribute.Key("ack").Bool(true))
span.SetAttributes(attribute.Key("timestamp").String(delivery.Timestamp.String()))
span.SetAttributes(attribute.Key("body").String(string(delivery.Body)))
span.SetAttributes(attribute.Key("headers").String(string(h)))

// Cannot use defer inside a for loop
time.Sleep(1 * time.Millisecond)
span.End()

err = delivery.Ack(false)
if err != nil {
c.log.Errorf("We didn't get a ack for delivery: %v", string(delivery.Body))
}
}
}
}
Expand Down

0 comments on commit a034ad1

Please sign in to comment.