Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

If processor timeouts, the API call on check triggers another process of the queue #16

Open
floriankammermann opened this issue Jun 1, 2016 · 1 comment

Comments

@floriankammermann
Copy link

floriankammermann commented Jun 1, 2016

The following scenario:

  1. Normal enqueue
  2. The message is sent to the processor over the bus
  3. The timer is started to notify the queue, if the processor does not answer (standard is 4min)
  4. The processor doesn't send a response
  5. The check is triggered, after the consumer registration is expired in redis
  6. The check refresh the registration of the queue
  7. The queue sends the message to the processor
    --> Now the message is sent twice without waiting for the timeout. There is also no synchronisation between the timer and the check.

I think this is wrong, the check should ensure, that the message is not sent again, when the timer is not timeout yet.

@lbovet
Copy link
Member

lbovet commented Jun 2, 2016

5 . The check is triggered, after the consumer registration is expired in redis

Actually, the registration should not expire while a consumer is in CONSUMING state. The following code should ensure that registrations are kept during the processing:

       // Periodic refresh of my registrations on active queues.
        vertx.setPeriodic(refreshPeriod * 1000, event -> {
            // Check if I am still the registered consumer
            myQueues.entrySet().stream().filter(entry -> entry.getValue() == QueueState.CONSUMING).forEach(entry -> {
                final String queue = entry.getKey();
                // Check if I am still the registered consumer
                String consumerKey = redisPrefix + consumersPrefix + queue;
                if (log.isTraceEnabled()) {
                    log.trace("RedisQues refresh queues get: " + consumerKey);
                }
                redisClient.get(consumerKey, event1 -> {
                    String consumer = event1.result();
                    if (uid.equals(consumer)) {
                        log.debug("RedisQues Periodic consumer refresh for active queue " + queue);
                        refreshRegistration(queue, null);
                        updateTimestamp(queue, null);
                    } else {
                        log.debug("RedisQues Removing queue " + queue + " from the list");
                        myQueues.remove(queue);
                    }
                });
            });
        });

Is this code no more working correctly?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants