Review amqp.py to consider publish_confirms, consumers, etc... #457

Open
petersilva opened this issue Feb 24, 2022 · 27 comments
Labels
Developer not a problem, more of a note to self for devs about work to do. Refactor change implementation of existing functionality. wishlist would be nice, not pressing for any particular client. work-around a work-around is provided, mitigating the issue.

Comments

@petersilva
Contributor

We are seeing message loss in flakey_broker (in sessions looking at logs with @reidsunderland), and we think one way to address it is to add publisher_confirms to the amqp.py module.

Looking at: https://docs.celeryproject.org/projects/amqp/en/latest/index.html

We need to add a callback to amqp.py and have the channel refer to it.
The ack will remove messages from the ack_wait worklist...
(basic_publish would need to put messages on the "pending_messages" waitlist.)
Channel.events['basic_ack'].append(my_callback) <-- search for that...
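The ack bookkeeping described above might look roughly like this. It is a broker-free sketch assuming RabbitMQ's publisher-confirm semantics: once confirm mode is on, each publish on a channel gets the next delivery tag (1, 2, 3, ...), and each basic.ack or basic.nack carries (delivery_tag, multiple), where multiple=True settles every tag up to and including delivery_tag. The class and method names (PendingConfirms, on_publish, on_ack, on_nack) are hypothetical and not part of amqp.py or py-amqp; on_ack uses the (delivery_tag, multiple) signature the py-amqp docs give for basic_ack callbacks, but how it gets registered is left out.

```python
from collections import OrderedDict


class PendingConfirms:
    """Hypothetical tracker for messages awaiting publisher confirms.

    Assumes one instance per channel and that confirm mode was enabled
    before the first publish, so delivery tags start at 1 and go up by 1.
    """

    def __init__(self):
        self._next_tag = 1
        self._pending = OrderedDict()  # delivery_tag -> message, oldest first

    def on_publish(self, message):
        """Record a message just handed to basic_publish; return its tag."""
        tag = self._next_tag
        self._pending[tag] = message
        self._next_tag += 1
        return tag

    def _settle(self, delivery_tag, multiple):
        """Remove and return the messages covered by one ack or nack."""
        if multiple:
            tags = [t for t in self._pending if t <= delivery_tag]
        else:
            tags = [delivery_tag] if delivery_tag in self._pending else []
        return [self._pending.pop(t) for t in tags]

    def on_ack(self, delivery_tag, multiple=False):
        """Broker accepted the message(s): forget them."""
        self._settle(delivery_tag, multiple)

    def on_nack(self, delivery_tag, multiple=False):
        """Broker rejected the message(s): hand them back for a retry."""
        return self._settle(delivery_tag, multiple)

    def unconfirmed(self):
        """Messages still waiting for a confirm, oldest first."""
        return list(self._pending.values())
```

Whatever is still in unconfirmed() when a connection drops is the set of messages that may not have reached the broker; republishing them trades possible duplicates for no loss, which is the flakey_broker concern.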

So far, the way the amqp library is used is completely synchronous: we perform an operation, we wait for it, then the next operation... I think this requires having the library do some async stuff... There is mention of: Connection.heartbeat_tick(rate=2) must be called at regular intervals (half of the heartbeat value if rate is 2).

If you look at mqtt.py, it is already implemented that way, so we have a model.

@petersilva
Contributor Author

side effect of #407

@petersilva
Contributor Author

I think it means putSetup launching a thread.... and putCleanup likely needs to reap it?
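If putSetup does end up starting a thread, the usual pattern is to pair it with a stop flag, so putCleanup can signal the thread and then join (reap) it. A minimal standard-library sketch; BackgroundWorker and the work callable are hypothetical and do not reflect how amqp.py is structured. Whatever work() does must not share a connection with the main thread without some coordination.

```python
import threading


class BackgroundWorker:
    """Hypothetical helper: started from putSetup, reaped from putCleanup."""

    def __init__(self, work, interval=0.1):
        self._work = work              # callable run repeatedly in the thread
        self._interval = interval      # seconds to wait between calls
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        if self._thread is not None and self._thread.is_alive():
            return                     # already running; don't start a second one
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait() doubles as an interruptible sleep between calls.
        while not self._stop.is_set():
            self._work()
            self._stop.wait(self._interval)

    def stop(self, timeout=5.0):
        """Signal the thread, then wait up to `timeout` seconds for it to exit."""
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout)
            self._thread = None
```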

@reidsunderland
Member

While reading the mqtt.py code, I noticed a possible bug on line 379?

except Exception as ex:
    logger.error( "ignored malformed message: %s" % mqttMessage.payload )
    logger.error("decode error" % err)

@petersilva
Contributor Author

yeah... err should be ex, I guess.
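A corrected version of that handler might look like the sketch below. Besides using the name actually bound by `except ... as ex`, the second call also needs a `%s` placeholder: `"decode error" % ex` would itself raise `TypeError` (not all arguments converted during string formatting). Passing the arguments to `logger.error` instead of pre-formatting with `%` avoids both problems. The surrounding function and the use of `json.loads` are assumptions standing in for the code around line 379, not a copy of it.

```python
import json
import logging

logger = logging.getLogger(__name__)


def decode_payload(payload):
    """Hypothetical stand-in for the decode step in mqtt.py."""
    try:
        return json.loads(payload)
    except Exception as ex:
        # Refer to `ex` (the name bound above) and let logging do the
        # formatting, so the format string always has a placeholder.
        logger.error("ignored malformed message: %s", payload)
        logger.error("decode error: %s", ex)
        return None
```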

@petersilva
Contributor Author

petersilva commented Feb 26, 2022

based on @reidsunderland's work on #407, this does not look like a priority for now.

If we come back to this, we should also study whether using consumers (basic_consume) would be helpful in some way. Note that we never register with brokers as "consumers." It is unclear what effect the lack of registration as consumers has on message flow.

@petersilva petersilva added Developer not a problem, more of a note to self for devs about work to do. wishlist would be nice, not pressing for any particular client. work-around a work-around is provided, mitigating the issue. labels Feb 26, 2022
@petersilva
Contributor Author

@petersilva petersilva changed the title shift amqp.py to use publish_confirms... Review amqp.py to consier publish_confirms, consumers, etc... Feb 26, 2022
@petersilva petersilva changed the title Review amqp.py to consier publish_confirms, consumers, etc... Review amqp.py to consider publish_confirms, consumers, etc... Feb 26, 2022
@petersilva petersilva added the Refactor change implementation of existing functionality. label Oct 21, 2022
@reidsunderland
Member

I made an attempt at using basic_consume instead of basic_get. It seems to work, and the flow tests pass. It's on branch issue_457_amqp_consumer.

@petersilva
Contributor Author

very cool!
Using "amqp_consumer" as a flag to select that AMQP implementation instead of the standard one means we can do A/B testing that way... That makes it super low risk to merge and try out in various situations, to be more certain about it.

I guess you put the flag in default.conf when you run the flow tests, and it's OK?

Yeah, the lack of registered consumers has always bothered me (the C one uses a consumer, btw). I don't know why you need the findAllSubclasses() entry point; it feels like it was fine before (it would have found this new one, no?)

This is a super safe, easy way to introduce the code without changing behaviour, so we can kick the tires for a while. After a while we could just drop the flag (or set it to always true) or merge the classes... or something. I'm struggling to see why we would keep both once we are confident that the basic_consume code works.

@reidsunderland
Member

Yeah, I ran the flow tests with amqp_consumer True in the default.conf.

One of the indirect benefits of using consumers is that it's much easier to find out which IP addresses are consuming from a queue, it shows up directly in the GUI or CLI. That will be nice if we need to track down where a queue is coming from.

__subclasses__() was only finding the classes that have Moth as the parent. AMQPConsumer is a subclass of AMQP and it wasn't in the list of Moth subclasses, so I had to make it recursive to find the subclasses' subclasses.
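The underlying behaviour is that `type.__subclasses__()` returns only *direct* subclasses, so a class two levels below Moth is invisible unless the search recurses. A minimal sketch; find_all_subclasses is a hypothetical name for what findAllSubclasses() needs to do, and the stub classes only mirror the names in this thread, not the real sarracenia classes.

```python
def find_all_subclasses(cls):
    """Return every class that inherits from cls, at any depth."""
    found = []
    for sub in cls.__subclasses__():            # direct children only
        found.append(sub)
        found.extend(find_all_subclasses(sub))  # then their descendants
    return found


# Stub hierarchy mirroring the class names in this thread (not the real classes).
class Moth:
    pass


class AMQP(Moth):
    pass


class MQTT(Moth):
    pass


class AMQPConsumer(AMQP):
    pass


print(Moth.__subclasses__())      # AMQP and MQTT only
print(find_all_subclasses(Moth))  # also includes AMQPConsumer
```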

I was thinking eventually the amqp_consumer default could be changed to True or just merge the classes like you said. I'm also not sure if there's any benefit to keeping basic_get around if this is working.

More testing is needed, like having multiple nodes (IPs) consume from the same queue. It should work fine but I haven't tested that. I also want to see if there's any performance improvement with consumers. In theory consuming should be faster, but I don't know if this implementation is or not.

I'll clean up the forced log level debug in the code, and try to get it working in GitHub Actions, then create a PR?

@petersilva
Contributor Author

yeah, that sounds great!

@reidsunderland
Member

Having this code

# docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.basic_consume
self.channel.basic_consume(self.o['queueName'], no_ack=False, callback=self.__get_on_message)
self.__stop_thread() # make sure it's not already running
self._consumer_thread = threading.Thread(target=self.__drain_events)
self._consumer_thread.start()

in getSetup causes it to execute even when it's just doing things like declaring queues or exchanges. I moved it into its own function that gets called by getNewMessage (same place getSetup is called).
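One way to express "start consuming only when a message is actually requested" is an idempotent helper that getNewMessage calls before fetching. A minimal sketch; LazyConsumer, `_ensure_consuming`, the `_consuming` flag and the in-memory buffer are hypothetical names, and the channel is assumed to behave like a py-amqp Channel whose basic_consume accepts queue, no_ack and callback as in the snippet above. Getting frames delivered to the callback (drain_events) is left out.

```python
from collections import deque


class LazyConsumer:
    """Hypothetical sketch: register the consumer on first use, not in getSetup."""

    def __init__(self, channel, queue_name):
        self.channel = channel
        self.queue_name = queue_name
        self._consuming = False
        self._received = deque()  # filled by the consumer callback

    def _on_message(self, message):
        self._received.append(message)

    def _ensure_consuming(self):
        # Safe to call on every getNewMessage(): only the first call registers.
        if not self._consuming:
            self.channel.basic_consume(self.queue_name, no_ack=False,
                                       callback=self._on_message)
            self._consuming = True

    def getNewMessage(self):
        self._ensure_consuming()
        return self._received.popleft() if self._received else None
```

Setup code that only declares queues or exchanges never touches `_ensure_consuming`, so no consumer gets registered for it.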

In unscientific testing on my computer, the static_flow test using the consumer version (4 min 55 sec) seems to be about 10 seconds faster than the basic_get version (5 min 5 sec).

@petersilva
Contributor Author

FWIW, I did a few test runs... I'm actually surprised at how consistent it is. I don't see any performance difference. I didn't monitor load, though...
This is on my home PC.

basic_get (baseline)

/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 08:21:33 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 08:24:36 PM EST       3:03
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 08:26:17 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 08:29:20 PM EST       3:03
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 08:30:00 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 08:32:14 PM EST       2:14
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 08:34:27 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Tue 14 Nov 2023 08:44:39 PM EST    10:12
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 08:46:42 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Tue 14 Nov 2023 08:56:19 PM EST     9:37
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 08:57:57 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Tue 14 Nov 2023 09:07:18 PM EST     9:21
/home/peter/Sarracenia/insects/dynamic_flow start Tue 14 Nov 2023 10:56:37 PM EST
/home/peter/Sarracenia/insects/dynamic_flow end Tue 14 Nov 2023 11:20:58 PM EST     24:21
/home/peter/Sarracenia/insects/no_mirror start Tue 14 Nov 2023 11:25:55 PM EST
/home/peter/Sarracenia/insects/no_mirror end Tue 14 Nov 2023 11:31:42 PM EST         5:47

amqp_consumer


/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 11:34:37 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 11:37:40 PM EST      3:03
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 11:41:50 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 11:44:54 PM EST      3:04
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 11:46:18 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 11:49:11 PM EST      2:53
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 11:51:03 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Wed 15 Nov 2023 12:01:10 AM EST   10:07
/home/peter/Sarracenia/insects/flakey_broker start Wed 15 Nov 2023 12:03:09 AM EST
/home/peter/Sarracenia/insects/flakey_broker end Wed 15 Nov 2023 12:12:30 AM EST    9:21
/home/peter/Sarracenia/insects/flakey_broker start Wed 15 Nov 2023 12:15:18 AM EST
/home/peter/Sarracenia/insects/flakey_broker end Wed 15 Nov 2023 12:24:55 AM EST    9:37
/home/peter/Sarracenia/insects/dynamic_flow start Wed 15 Nov 2023 12:26:57 AM EST
/home/peter/Sarracenia/insects/dynamic_flow end Wed 15 Nov 2023 12:52:21 AM EST    25:24
/home/peter/Sarracenia/insects/no_mirror start Wed 15 Nov 2023 12:54:47 AM EST
/home/peter/Sarracenia/insects/no_mirror end Wed 15 Nov 2023 01:00:36 AM EST        5:49

The later runs are slower, but I think we need more samples, and I don't think that's real yet. Using a consumer is still great for many other reasons... but I'm not sure we can claim performance improvements yet.
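To compare runs without relying on the trailing column, the elapsed times can be derived from the start/end stamps themselves, which also handles runs that cross midnight. A minimal sketch, assuming every line has the shape `<path> start|end <weekday> <day> <month> <year> <hh:mm:ss> <AM|PM> <zone> [duration]` with no spaces in the path, that start and end share a time zone (the zone token is ignored), and an English/C locale for the %a, %b and %p directives; `elapsed` and `mmss` are hypothetical helper names.

```python
from datetime import datetime

# e.g. "Tue 14 Nov 2023 08:21:33 PM"
STAMP_FORMAT = "%a %d %b %Y %I:%M:%S %p"


def _stamp(line):
    """Parse the timestamp of one 'start' or 'end' log line."""
    words = line.split()
    # words[0] is the path, words[1] is start/end, words[2:8] is the stamp;
    # the zone token and any trailing duration are ignored.
    return datetime.strptime(" ".join(words[2:8]), STAMP_FORMAT)


def elapsed(start_line, end_line):
    """Elapsed time between matching start and end lines (handles midnight)."""
    return _stamp(end_line) - _stamp(start_line)


def mmss(delta):
    """Format a timedelta as M:SS, like the trailing column of the log."""
    minutes, seconds = divmod(int(delta.total_seconds()), 60)
    return f"{minutes}:{seconds:02d}"
```

Feeding each start/end pair through `mmss(elapsed(start, end))` gives one consistently computed duration per run, which can then be averaged per test.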

@petersilva
Contributor Author

The dynamic flow has started consistently crashing/hanging at the start, with the initial shovel dying (not sure if it's hung) with "malloc(): unsorted double linked list corrupted"

2023-11-16 08:37:09,339 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,339 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,340 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,340 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,340 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
malloc(): unsorted double linked list corrupted
2023-11-16 08:37:09,457 177976 [CRITICAL] root run_command subprocess.run failed err=Command '['/usr/bin/python3', '/home/peter/Sarracenia/sr3/sarracenia/instance.py', '--no', '0', 'foreground', 'shovel/t_dd1_f00']' died with <Signals.SIGABRT: 6>.

No idea what happened, but I can't get it to complete any more.
If I turn amqp_consumer off it becomes functional again.

@reidsunderland
Member

I'm just guessing, but I think it might be related to the changes introduced by adding the new queue declares for metrics: #818

Two threads might be trying to do different things with the connection at the same time?

@petersilva
Contributor Author

yup. I commented out the periodic queue declares, and it does not get stuck anymore.
hmm... how to fix?

@reidsunderland
Member

From a bit of reading, connections are supposed to be thread safe, and we're using two different channels. That seems to be the right way to do this... But the drain_events is called on the connection, not the channel, and something is crashing.

That came from these examples: https://github.com/celery/py-amqp#quick-overview

@petersilva
Contributor Author

petersilva commented Nov 20, 2023

I tried removing the periodic queue declares (leaving the initial one), and that is broken also. If someone instead turns the existing declare_queue setting off, then it should work... but all the flow tests require declaring queues, so that's a bit of engineering (set it up so sr3 declare does the right thing, then add something to turn the option off before any configs are started). It's a bit convoluted.

@reidsunderland
Member

reidsunderland commented Nov 21, 2023

I found the answer: connections are not thread-safe. I don't know why I thought they were. So it's almost more surprising that it seemed to work okay before.

celery/py-amqp#267 (comment)

@petersilva
Contributor Author

do we need a thread? maybe a single flow is fine?

@reidsunderland
Member

We could probably call drain_events in getNewMessage and not use a thread, but if there are no events to drain, we'd have to wait for it to time out.

@petersilva
Contributor Author

drain_events has a timeout... just set it to something like a microsecond... that's what I did on the C side. With a 2 GHz clock, that means it waits 2 thousand cycles, but there are still a million waits in a second... so it's likely fine for all purposes. In the C version I started doing exponential backoff: wait 1 microsecond at first, then double each time there is nothing... up to some upper bound, like a second, I guess.
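The single-threaded version of that idea could be a poll method called from getNewMessage: call drain_events with a short timeout, double the timeout each time nothing arrives, reset it as soon as something does, and cap it. A minimal sketch, assuming a py-amqp-style connection whose `drain_events(timeout=...)` raises `socket.timeout` when no frame arrives in time and whose `heartbeat_tick()` should be called regularly; BackoffPoller and on_message are hypothetical names, the buffer is assumed to be filled by a basic_consume callback, and the 1 µs floor and 1 s cap come from the comment above.

```python
import socket
from collections import deque


class BackoffPoller:
    """Hypothetical single-threaded poller: no consumer thread needed."""

    def __init__(self, connection, min_timeout=1e-6, max_timeout=1.0):
        self.connection = connection      # assumed py-amqp-style Connection
        self.min_timeout = min_timeout    # 1 microsecond, as on the C side
        self.max_timeout = max_timeout    # upper bound; keep below heartbeat / 2
        self.timeout = min_timeout
        self.received = deque()           # filled by the basic_consume callback

    def on_message(self, message):
        self.received.append(message)

    def poll(self):
        """Called from getNewMessage(); returns a message or None."""
        if not self.received:
            # Lets the library send/check heartbeats on this same thread.
            self.connection.heartbeat_tick()
            try:
                # Dispatches any waiting frames to callbacks such as on_message.
                self.connection.drain_events(timeout=self.timeout)
            except socket.timeout:
                pass                      # nothing arrived within self.timeout
        if self.received:
            self.timeout = self.min_timeout   # traffic: go back to fast polling
            return self.received.popleft()
        # Idle: back off exponentially to save CPU, up to the cap.
        self.timeout = min(self.timeout * 2, self.max_timeout)
        return None
```

Doubling from 1 µs reaches the 1 s cap after about 20 empty polls (2^20 µs is roughly 1.05 s). Since py-amqp asks for heartbeat_tick(rate=2) to be called at least every half heartbeat interval, the cap should stay well below that.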

@petersilva
Contributor Author

The only point of the exponential backoff is to reduce CPU load.

@petersilva
Contributor Author

Perhaps try it without threads? I don't think we need it, and the KISS principle applies.

@reidsunderland
Member

We have the ability to set a consumer tag with a consumer. It might be useful to have these things in the tag:

  • PID
  • Instance #
  • Hostname

@petersilva
Contributor Author

petersilva commented Mar 11, 2024

Yes, I think that's great. We already have the IP address in the RabbitMQ management GUI, so the hostname is useful information. We discussed privacy concerns... perhaps someone would object to divulging their hostname to the broker... hmm... We could use the HOSTNAME environment variable, which the user can override in their config file if they object. Is that a reasonable mechanism? It gives people an out if they object, but the most useful setting is the default.

oh... just noticed that HOSTNAME to override self.hostname is not explicitly implemented in sr3... might have to check that that actually works. ( #967 )
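A tag along those lines could be assembled as below. A minimal sketch: resolve_hostname, consumer_tag, the field order and the separators are hypothetical. The hostname comes from the HOSTNAME environment variable when it is set (the override suggested above) and otherwise from `socket.gethostname()`. One thing to check: bash, for example, sets HOSTNAME but does not export it by default, so the process may not see it unless it is exported or set explicitly.

```python
import os
import socket


def resolve_hostname():
    """HOSTNAME from the environment if set (user override), else the system name."""
    return os.environ.get("HOSTNAME") or socket.gethostname()


def consumer_tag(instance):
    """Hypothetical consumer tag combining hostname, instance number and PID."""
    return f"{resolve_hostname()}_i{instance:02d}_p{os.getpid()}"


# py-amqp's Channel.basic_consume takes the tag as a keyword, e.g.:
# channel.basic_consume(queue_name, consumer_tag=consumer_tag(1), callback=...)
```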

@reidsunderland
Member

I'd like to add an option to let the user define the tag like we have for queueName, and make it work with all variables that are supported for queueName
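A user-defined tag could reuse a queueName-style `${VAR}` template. A minimal sketch using Python's `string.Template`; expand_consumer_tag and the variable names in the usage below are placeholders, not the actual set of variables sr3 supports for queueName. Because an AMQP 0-9-1 consumer tag is a short string (at most 255 bytes), the result is truncated to fit.

```python
from string import Template

AMQP_SHORTSTR_MAX = 255  # consumer tags are AMQP 0-9-1 short strings


def expand_consumer_tag(template, values):
    """Expand a ${VAR} template into a consumer tag.

    Unknown ${...} references are left as-is (safe_substitute) rather than
    raising, and the result is cut to 255 bytes of UTF-8.
    """
    tag = Template(template).safe_substitute(values)
    return tag.encode("utf-8")[:AMQP_SHORTSTR_MAX].decode("utf-8", errors="ignore")
```

For example, `expand_consumer_tag("${HOSTNAME}_i${INSTANCE}_p${PID}", {"HOSTNAME": "h1", "INSTANCE": "02", "PID": "1234"})` gives `h1_i02_p1234`.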

@reidsunderland
Member

The flakey broker test is consistently failing with the AMQP consumer, it seems to be a real problem with the code, not a test issue. I'll try to figure out the problem when I have time.


2 participants