diff --git a/internal/pkg/rabbitmq/consumer.go b/internal/pkg/rabbitmq/consumer.go index 2c0b3ba..018548d 100644 --- a/internal/pkg/rabbitmq/consumer.go +++ b/internal/pkg/rabbitmq/consumer.go @@ -3,6 +3,11 @@ package rabbitmq import ( "context" "fmt" + "reflect" + "runtime" + "strings" + "time" + "github.com/ahmetb/go-linq/v3" "github.com/iancoleman/strcase" jsoniter "github.com/json-iterator/go" @@ -11,10 +16,6 @@ import ( "github.com/streadway/amqp" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" - "reflect" - "runtime" - "strings" - "time" ) //go:generate mockery --name IConsumer @@ -104,60 +105,61 @@ func (c Consumer[T]) ConsumeMessage(msg interface{}, dependencies T) error { } go func() { - - select { - case <-c.ctx.Done(): - defer func(ch *amqp.Channel) { - err := ch.Close() - if err != nil { - c.log.Errorf("failed to close channel closed for for queue: %s", q.Name) - } - }(ch) - c.log.Infof("channel closed for for queue: %s", q.Name) - return - - case delivery, ok := <-deliveries: - { - if !ok { - c.log.Errorf("NOT OK deliveries channel closed for queue: %s", q.Name) - return - } - - // Extract headers - c.ctx = otel.ExtractAMQPHeaders(c.ctx, delivery.Headers) - - err := c.handler(q.Name, delivery, dependencies) - if err != nil { - c.log.Error(err.Error()) - } - - consumedMessages = append(consumedMessages, snakeTypeName) - - _, span := c.jaegerTracer.Start(c.ctx, consumerHandlerName) - - h, err := jsoniter.Marshal(delivery.Headers) - - if err != nil { - c.log.Errorf("Error in marshalling headers in consumer: %v", string(h)) - } - - span.SetAttributes(attribute.Key("message-id").String(delivery.MessageId)) - span.SetAttributes(attribute.Key("correlation-id").String(delivery.CorrelationId)) - span.SetAttributes(attribute.Key("queue").String(q.Name)) - span.SetAttributes(attribute.Key("exchange").String(delivery.Exchange)) - span.SetAttributes(attribute.Key("routing-key").String(delivery.RoutingKey)) - span.SetAttributes(attribute.Key("ack").Bool(true)) - span.SetAttributes(attribute.Key("timestamp").String(delivery.Timestamp.String())) - span.SetAttributes(attribute.Key("body").String(string(delivery.Body))) - span.SetAttributes(attribute.Key("headers").String(string(h))) - - // Cannot use defer inside a for loop - time.Sleep(1 * time.Millisecond) - span.End() - - err = delivery.Ack(false) - if err != nil { - c.log.Errorf("We didn't get a ack for delivery: %v", string(delivery.Body)) + for { + select { + case <-c.ctx.Done(): + defer func(ch *amqp.Channel) { + err := ch.Close() + if err != nil { + c.log.Errorf("failed to close channel closed for for queue: %s", q.Name) + } + }(ch) + c.log.Infof("channel closed for for queue: %s", q.Name) + return + + case delivery, ok := <-deliveries: + { + if !ok { + c.log.Errorf("NOT OK deliveries channel closed for queue: %s", q.Name) + return + } + + // Extract headers + c.ctx = otel.ExtractAMQPHeaders(c.ctx, delivery.Headers) + + err := c.handler(q.Name, delivery, dependencies) + if err != nil { + c.log.Error(err.Error()) + } + + consumedMessages = append(consumedMessages, snakeTypeName) + + _, span := c.jaegerTracer.Start(c.ctx, consumerHandlerName) + + h, err := jsoniter.Marshal(delivery.Headers) + + if err != nil { + c.log.Errorf("Error in marshalling headers in consumer: %v", string(h)) + } + + span.SetAttributes(attribute.Key("message-id").String(delivery.MessageId)) + span.SetAttributes(attribute.Key("correlation-id").String(delivery.CorrelationId)) + span.SetAttributes(attribute.Key("queue").String(q.Name)) + span.SetAttributes(attribute.Key("exchange").String(delivery.Exchange)) + span.SetAttributes(attribute.Key("routing-key").String(delivery.RoutingKey)) + span.SetAttributes(attribute.Key("ack").Bool(true)) + span.SetAttributes(attribute.Key("timestamp").String(delivery.Timestamp.String())) + span.SetAttributes(attribute.Key("body").String(string(delivery.Body))) + span.SetAttributes(attribute.Key("headers").String(string(h))) + + // Cannot use defer inside a for loop + time.Sleep(1 * time.Millisecond) + span.End() + + err = delivery.Ack(false) + if err != nil { + c.log.Errorf("We didn't get a ack for delivery: %v", string(delivery.Body)) + } } } }