Review amqp.py to consider publish_confirms, consumers, etc... #457
Side effect of #407.
I think it means putSetup launching a thread, and putCleanup likely needs to reap it?
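This is a minimal sketch of that pairing, assuming putSetup does start a worker thread. The class, the _run placeholder loop and the stop event are hypothetical and not taken from amqp.py. Only the putSetup/putCleanup names come from the discussion. The point is that putCleanup signals the thread and then joins it, so repeated setup/cleanup cycles don't leak threads.

```python
import threading


class PublisherWithWorker:
    """Hypothetical sketch: putSetup() starts a background thread and
    putCleanup() stops and joins ("reaps") it."""

    def __init__(self):
        self._stop = threading.Event()
        self._thread = None

    def _run(self):
        # Placeholder for background work (e.g. servicing the connection).
        # Event.wait() returns True as soon as a stop has been requested.
        while not self._stop.wait(timeout=0.05):
            pass

    def putSetup(self):
        if self._thread is not None and self._thread.is_alive():
            return  # already running: don't start a second thread
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def putCleanup(self):
        # Ask the thread to stop, then wait for it to exit.
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout=5)
            self._thread = None
```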
While reading the mqtt.py code, I noticed a possible bug on line 379 (sarracenia/sarracenia/moth/mqtt.py, lines 377 to 379 in bee974e).
Yeah... err should be ex, I guess.
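The referenced lines were not captured here, so this hypothetical function only illustrates the class of bug. Inside the except block the exception is bound to ex, but the log call uses err, so the handler itself raises NameError and hides the original error.

```python
import logging

logger = logging.getLogger(__name__)


def send_and_log_buggy(send):
    try:
        send()
    except Exception as ex:
        # Bug: the exception is bound to "ex" but "err" is referenced,
        # so building the message raises NameError inside the handler.
        logger.error(f"send failed: {err}")  # noqa: F821
        return False
    return True


def send_and_log_fixed(send):
    try:
        send()
    except Exception as ex:
        logger.error(f"send failed: {ex}")
        return False
    return True
```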
Based on @reidsunderland's work on #407, this does not look like a priority for now. If we come back to this, we should also study whether using consumers (basic_consume) would help in some way. Note that on the brokers we never register as "consumers". It is unclear what effect this lack of consumer registration has on message flow.
I tried using basic_consume instead of basic_get. It seems to work, and the flow tests pass. The code is on branch issue_457_amqp_consumer.
Very cool! I guess you put the flag in default.conf when you ran the flow tests, and it's OK? Yeah, the lack of registered consumers has always bothered me (the C implementation uses a consumer, by the way). I don't know why you need the findAllSubclasses() entry point; it feels like it was fine before (it would have found this new class, no?). This is a super safe, easy way to introduce the code without changing behaviour, so we can kick the tires for a while. Later we could just drop the flag (or set it to always true) or merge the classes, or something. I'm struggling to see why we would keep both once we are confident that the basic_consume approach works.
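Whether the new class would have been found anyway depends on whether discovery recurses. cls.__subclasses__() only returns direct subclasses, so a class derived from another subclass is missed unless the search walks the whole tree. This is a generic sketch, assuming the new consumer class inherits from the existing AMQP class. The class names are stand-ins, and this is not sarracenia's actual findAllSubclasses().

```python
def find_all_subclasses(cls):
    """Return every direct and indirect subclass of cls, depth-first."""
    found = []
    for sub in cls.__subclasses__():
        found.append(sub)
        found.extend(find_all_subclasses(sub))
    return found


class Moth:  # stand-in for the base messaging class
    pass


class AMQP(Moth):  # stand-in for the existing basic_get implementation
    pass


class AMQPConsumer(AMQP):  # hypothetical new basic_consume implementation
    pass
```

With this layout, Moth.__subclasses__() sees only AMQP, while the recursive walk also finds AMQPConsumer.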
Yeah, I ran the flow tests with …
One of the indirect benefits of using consumers is that it's much easier to find out which IP addresses are consuming from a queue: they show up directly in the management GUI or CLI. That will be nice if we need to track down where a queue is being consumed from.
I was thinking eventually the …
More testing is needed, such as having multiple nodes (IPs) consume from the same queue. It should work fine, but I haven't tested that. I also want to see whether consumers bring any performance improvement. In theory consuming should be faster, but I don't know whether this implementation is. I'll remove the forced debug log level from the code, try to get it working in GitHub Actions, and then create a PR?
Yeah, that sounds great!
Having this code in getSetup:

```python
# docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.basic_consume
self.channel.basic_consume(self.o['queueName'], no_ack=False, callback=self.__get_on_message)
self.__stop_thread()  # make sure it's not already running
self._consumer_thread = threading.Thread(target=self.__drain_events)
self._consumer_thread.start()
```

causes it to execute even when getSetup is only doing things like declaring queues or exchanges. I moved it into its own function, which is called by getNewMessage (the same place getSetup is called from). Testing unscientifically on my computer, the static_flow test with the consumer version (4 min 55 s) seems to be about 10 seconds faster than with the basic_get version (5 min 5 s).
FWIW, I did a few test runs. I'm actually surprised at how consistent they are: I don't see any performance difference. I didn't monitor load, though.
The later runs are slower, but I think we need more samples, and I don't think that difference is real yet. Using consumers is still great for many other reasons, but I'm not sure we can claim performance improvements yet.
The dynamic flow test has started consistently crashing or hanging at the start, with the initial shovel dying (or possibly hung) with "malloc(): unsorted double linked list corrupted".
No idea what happened, but I can't get it to complete any more.
I am just guessing, but I think it might be related to the changes introduced by adding the new queue declares for metrics (#818). Two threads might be trying to do different things with the connection at the same time?
Yup. I commented out the periodic queue declares, and it no longer gets stuck.
From a bit of reading, connections are supposed to be thread safe, and we're using two different channels, which seems to be the right way to do this. But drain_events is called on the connection, not the channel, and something is crashing. That pattern came from these examples: https://github.com/celery/py-amqp#quick-overview
I tried removing the periodic queue declares (leaving the initial one), and that is broken also. If someone instead turns the existing declare_queue setting off, then it should work, but all the flow tests require declaring queues, so that's a bit of engineering: set it up so sr3 declare does the right thing, then add something to turn the option off before any configs are started. It's a bit convoluted.
I found the answer: connections are not thread safe. I don't know why I thought they were. So it's almost more surprising that it seemed to work okay before.
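If a thread is kept, one way to respect this is to route every use of the connection, including its channels, through a single lock. This is a hypothetical wrapper, not something py-amqp provides. One caveat of the design is that a thread blocked in drain_events() while holding the lock stalls every other thread, so drain timeouts have to stay short.

```python
import threading


class SerializedConnection:
    """Hypothetical wrapper: the underlying AMQP connection is not thread
    safe, so all access to it is serialized through one lock."""

    def __init__(self, connection):
        self._conn = connection
        self._lock = threading.Lock()

    def call(self, fn, *args, **kwargs):
        # fn receives the raw connection while the lock is held, so no two
        # threads ever touch the connection (or its channels) at once.
        with self._lock:
            return fn(self._conn, *args, **kwargs)
```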
Do we need a thread? Maybe a single flow of control is fine?
We could probably call drain_events in getNewMessage and not use a thread, but if there are no events to drain, we'd have to wait for it to time out.
drain_events has a timeout; just set it to something like a microsecond. That's what I did on the C side. With a 2 GHz clock, a microsecond is 2,000 cycles, but that still allows a million waits per second, so it's likely fine for all purposes. In the C version I started doing exponential backoff: wait 1 microsecond at first, then double the wait each time there is nothing, up to some upper bound, like a second, I guess.
The only point of the exponential backoff is to reduce CPU load.
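Here is a Python sketch of the same backoff, assuming a py-amqp connection whose drain_events(timeout=...) raises socket.timeout when nothing arrives. The BackoffPoller name and its parameters are hypothetical. The 1 µs start and 1 s cap are the values mentioned above.

```python
import socket


class BackoffPoller:
    """Hypothetical sketch: poll drain_events() with a timeout that starts
    tiny, doubles after each empty poll up to a cap, and resets as soon
    as an event is processed."""

    def __init__(self, connection, initial=1e-6, maximum=1.0):
        self.connection = connection
        self.initial = initial
        self.maximum = maximum
        self.timeout = initial

    def poll_once(self):
        """Return True if an event was processed, False on timeout."""
        try:
            self.connection.drain_events(timeout=self.timeout)
        except socket.timeout:
            # Nothing arrived: wait twice as long next time, up to the cap.
            self.timeout = min(self.timeout * 2, self.maximum)
            return False
        self.timeout = self.initial  # activity: go back to fast polling
        return True
```

Starting at 1 µs, twenty consecutive empty polls reach the 1 s cap (2^20 µs is about 1.05 s). A single received event drops the wait back to 1 µs.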
Perhaps try it without threads? I don't think we need them, and the KISS principle applies.
We can set a consumer tag when we register a consumer. It might be useful to have these things in the tag:
Yes, I think that's great. We already have the IP address in the RabbitMQ management GUI, so the hostname is useful additional information. We discussed privacy concerns: perhaps someone would object to divulging their hostname to the broker. Hmm... We could use the HOSTNAME environment variable, which the user can override in their config file if they object. Is that a reasonable mechanism? It gives people an out if they object, while the most useful setting remains the default. Oh... I just noticed that using HOSTNAME to override self.hostname is not explicitly implemented in sr3, so we might have to check that it actually works (#967).
I'd like to add an option to let the user define the tag, as we have for queueName, and make it work with all the variables that are supported for queueName.
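This sketch expands a user-defined tag the same way as other ${...} variables. It assumes the template syntax matches the one used for queueName, which is unverified, and the variable names PROGRAM and PID are illustrative only. HOSTNAME falls back to socket.gethostname() unless the environment (or an explicit value from the config) overrides it. The result would be passed to py-amqp's basic_consume(..., consumer_tag=tag).

```python
import os
import socket
from string import Template


def build_consumer_tag(template, **values):
    """Hypothetical helper: expand a consumer-tag template.

    HOSTNAME defaults to the environment variable if set, else to
    socket.gethostname(); explicit keyword values override both.
    """
    variables = {"HOSTNAME": os.environ.get("HOSTNAME", socket.gethostname())}
    variables.update(values)
    # safe_substitute leaves unknown ${...} references untouched
    # instead of raising KeyError.
    return Template(template).safe_substitute(variables)
```

AMQP consumer tags are short strings (at most 255 bytes), so long templates may need truncating before use.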
The flakey_broker test is consistently failing with the AMQP consumer. It seems to be a real problem with the code, not a test issue. I'll try to figure out the problem when I have time.
We are seeing message loss in flakey_broker (from sessions looking at logs with @reidsunderland), and we think one way to address it is to add publisher confirms to the amqp.py module.
Looking at: https://docs.celeryproject.org/projects/amqp/en/latest/index.html
We need to add a callback to amqp.py and have the channel refer to it.
The ack will remove messages from the ack_wait worklist (basic_publish would need to put messages on the "pending_messages" wait list).
Channel.events['basic_ack'].append(my_callback) <-- search for that...
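This sketch of the bookkeeping assumes the channel has been put into confirm mode (py-amqp's Channel.confirm_select()). In that mode the broker numbers publishes on a channel 1, 2, 3, ..., and each ack or nack carries one of those numbers plus a multiple flag meaning "everything up to and including this tag". The PendingConfirms class is hypothetical. Its on_ack/on_nack methods are written for callbacks invoked as callback(delivery_tag, multiple). That is my understanding of how py-amqp dispatches its basic_ack/basic_nack events, but it should be checked against the library source. The pending dict plays the role of the pending_messages wait list.

```python
class PendingConfirms:
    """Hypothetical publisher-confirm tracker for one channel."""

    def __init__(self):
        self.next_seq = 1
        self.pending = {}    # sequence number -> message awaiting confirm
        self.to_retry = []   # messages the broker nacked

    def published(self, message):
        # Call right after each basic_publish on the confirm-mode channel.
        seq = self.next_seq
        self.pending[seq] = message
        self.next_seq += 1
        return seq

    def _settle(self, delivery_tag, multiple):
        if multiple:
            tags = [t for t in self.pending if t <= delivery_tag]
        else:
            tags = [delivery_tag] if delivery_tag in self.pending else []
        return [self.pending.pop(t) for t in tags]

    def on_ack(self, delivery_tag, multiple):
        # Confirmed by the broker: these messages can be forgotten.
        self._settle(delivery_tag, multiple)

    def on_nack(self, delivery_tag, multiple):
        # Rejected by the broker: keep them so they can be republished.
        self.to_retry.extend(self._settle(delivery_tag, multiple))
```

Sequence numbers are per channel and start again at 1 after confirm_select on a new channel, so the tracker would need resetting (and its pending messages republished) after a reconnect.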
So far, the way the amqp library is used is completely synchronous: we perform an operation, wait for it, then do the next operation. I think this requires having the library do some asynchronous work. The documentation mentions that Connection.heartbeat_tick(rate=2) must be called at regular intervals (half of the heartbeat value if rate is 2).
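This sketch calls heartbeat_tick() at regular intervals from the existing loop rather than from a separate thread. The scheduler is hypothetical. It assumes connection.heartbeat_tick(rate=...) as quoted above and takes an injectable clock so it can be tested. With heartbeat=60 and rate=2 it ticks every 30 seconds.

```python
import time


class HeartbeatScheduler:
    """Hypothetical helper: call connection.heartbeat_tick() once at least
    heartbeat / rate seconds have passed since the last tick."""

    def __init__(self, connection, heartbeat, rate=2, clock=time.monotonic):
        self.connection = connection
        self.rate = rate
        self.interval = heartbeat / rate
        self.clock = clock
        self.last = clock()

    def maybe_tick(self):
        # Intended to be called on every pass of the main loop,
        # e.g. alongside a short drain_events() in getNewMessage.
        now = self.clock()
        if now - self.last >= self.interval:
            self.connection.heartbeat_tick(rate=self.rate)
            self.last = now
            return True
        return False
```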
If you look at mqtt.py, it is already implemented that way, so we have a model.