Multiple outstanding sent messages with QoS? #165
Hi @AndrewHeim, would it be a breach of protocol to send a packet before the previous message has been acknowledged? I'll have to dive a bit into some parts of the specification that I have yet to implement (resend with DUP flag). That being said, I think I see your point: you would like to be able to enqueue the next message to the server while the server processes the packet and sends the ack back to the publisher. The message ordering could still be maintained, but you would essentially want to be able to use the full time slots asynchronously. At this time, you are correct that the Client public method does not allow this publication pattern, but the underlying architecture is certainly compatible with it. Follow the red and blue lines in the diagram... you'll see that only the Client API (green box) is blocking you.
Thank you for the very thorough answer. As I think about it more, my request is a bit of a silly one once you take the API design into account. In short, I'm not sure that much better could be done than exactly what you describe. I remember that when I went looking through the source of a C-based MQTT library, I was satisfied when I saw something like

How do you design an API that both lets you easily send multiple messages AND lets you choose to retry if there's a failure (while knowing what data went with what message)? At first blush, it's easy to have the caller subscribe to a callback in case of failure, but then how do you decide when to expire your data? You'd have to have an event for success too. By the time you have all those events flying around in LabVIEW (and the potential for bottlenecking they might introduce), along with the storage space for sent-and-possibly-to-be-retried data, it gets complicated. It is far simpler to call Client Publish in parallel for as many writes as you want your pipe to be wide. That is certainly less elegant (and less adaptive to network conditions), but possibly far easier to implement.

In summary: after navel-gazing about my request, I think being able to call Send in parallel is as good, per unit of development time, as any other API. Unless you have a better idea? I'm now wondering whether ANY library may give me what I want without modification. I need to do some digging to make sure MQTT as a protocol will support the data rates I need.
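The "call Client Publish in parallel" idea above can be sketched outside LabVIEW as follows. This is only an illustration in Python, not this library's API: `publish` stands for any hypothetical blocking call that returns once the message is acknowledged and raises on failure, and `width` is the assumed number of parallel calls. The result pairs each message with its outcome, so the caller knows which data to retry.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional, Tuple

Message = Tuple[str, bytes]  # (topic, payload)


def publish_in_parallel(
    messages: Iterable[Message],
    publish: Callable[[str, bytes], None],  # hypothetical blocking QoS publish
    width: int,
) -> List[Tuple[Message, Optional[Exception]]]:
    """Run up to `width` blocking publishes at once.

    Returns one (message, error) pair per input message, in input order;
    error is None when the publish call returned normally.
    """

    def attempt(msg: Message) -> Tuple[Message, Optional[Exception]]:
        try:
            publish(*msg)
            return msg, None
        except Exception as exc:  # keep the data together with its failure
            return msg, exc

    with ThreadPoolExecutor(max_workers=width) as pool:
        return list(pool.map(attempt, messages))


def failed_messages(results: List[Tuple[Message, Optional[Exception]]]) -> List[Message]:
    """Messages whose publish raised and that the caller may want to retry."""
    return [msg for msg, err in results if err is not None]
```

Note that parallel calls like these give no guarantee about the order in which messages reach the wire, so this pattern only fits when ordering between the parallel messages does not matter.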
I'll keep this open, as it is a distinct possibility that one could provide two different ways to use the API: one that is synchronous (waits for the ACK) and one that queues messages up. This extension would have no effect whatsoever on the underlying architecture, so I will consider implementing it in the future. Thanks for the suggestion! Note to self:
Hello Francois,

You may recognize our CS++ add-on libraries for the NI Actor Framework. A good starting point is https://git.gsi.de/EE-LV/CSPP/cspp-template, branch develop. It uses MQTT by default. I adopted your libraries to implement derived classes that support MQTT in CS++. You can find them in the submodule https://git.gsi.de/EE-LV/CSPP/CSPP_MQTT.

With QoS=0, in principle everything works fine. Setting QoS=1 leads to dramatic performance issues, resulting in the error "Not enough memory to complete this operation." in applications with many topics. This seems to be related to the synchronous message handling for QoS>0. You noted the problem yourself above. It occurs with the G Open Source Project MQTT Broker as well as with Mosquitto, so the problem seems to exist in the client library too, not only in the broker. Furthermore, the Mosquitto broker reports "protocol error" in its log file. My software tries to reconnect, and that works a few times; then Mosquitto refuses the connection because of too many reconnects. It assumes a bad client.

So I would really appreciate it if you could implement asynchronous message handling. If you are looking for a real-world project with ~13500 topics, refer to https://git.gsi.de/EE-LV/CSPP/GSI_VAC/VacuumHeating, branch: feature/MQTT. If you are willing to use that project as a test example, I am of course willing to help set it up on your computer.

It would be very nice if we could continue with our existing projects in distributed heterogeneous environments, e.g. Python- or EPICS-based control systems, while slowly migrating.

Best,
Holger
Hi Holger @HB-GSI, I've been totally swamped with work and have not had much time to devote to this library apart from very low-hanging-fruit bug fixes. To be completely honest, I don't anticipate being able to work on and test such a change any time soon.

On the broker side, there might be easier optimizations in the subscription engine that could increase throughput to the point where this limit is pushed a bit further. It would not be a truly scalable long-term solution, but it might give some breathing room until an asynchronous solution can be implemented.

I think the QoS latency is purely a Client API issue caused by the synchronous wait; the broker does not wait at all for QoS=1 and emits the PUBACK response as soon as it can. The "not enough memory" error is likely either 1) a protocol violation in the incoming-packet handling of the broker's core (some infinite loop due to a bug?), or 2) something related to the subscription engine, which makes loads of copies.

One thing you could explore, to find out which of these hits your projects hardest, is to override the MQTT Tracer object in the MQTT Broker library and monitor the incoming and outgoing messages. This way, you can measure the latency between matching PUB and PUBACK IDs. I have wanted to produce such a debugging tool for a while, but I have nothing to show for it yet. The hooks are in place in the broker lib. Noteworthy to know:
If you want to tackle the asynchronous client ACK handling, I welcome pull requests!
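The latency measurement suggested above (matching PUB and PUBACK IDs) could look roughly like the following Python sketch. It does not use the library's actual Tracer interface: `on_publish` and `on_puback` are hypothetical hooks that a tracer override would call with the packet identifier of each observed packet.

```python
import time
from typing import Callable, Dict, List, Optional


class AckLatencyTracker:
    """Pairs each PUBLISH with its PUBACK by packet ID and records the delay."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._sent: Dict[int, float] = {}  # packet ID -> time the PUBLISH was seen
        self.latencies: List[float] = []   # seconds, in order of acknowledgement

    def on_publish(self, packet_id: int) -> None:
        """Hypothetical hook: call when a QoS 1 PUBLISH is observed."""
        self._sent[packet_id] = self._clock()

    def on_puback(self, packet_id: int) -> Optional[float]:
        """Hypothetical hook: call when a PUBACK is observed.

        Returns the latency in seconds, or None for an unknown packet ID.
        """
        started = self._sent.pop(packet_id, None)
        if started is None:
            return None
        latency = self._clock() - started
        self.latencies.append(latency)
        return latency

    def outstanding(self) -> int:
        """Number of PUBLISH packets still waiting for their PUBACK."""
        return len(self._sent)
```

A steadily growing `outstanding()` count or rising latencies would point at the side that is falling behind.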
Hi Francois
Does this library support multiple outstanding sent messages with QoS?
A bit more background: MQTT has a limit on the maximum message size. The protocol itself allows packets up to roughly 256 MB, but the practical limit seems to vary from platform to platform, and my conclusion is that in practice an MQTT message is within an order of magnitude of a UDP datagram (65k bytes). I forget at the moment whether it's closer to half or double.
When you have a high-latency link, like say an IoT device, the number of outstanding messages (with QoS) can strongly influence the total effective bandwidth. If each message sent needs to receive acknowledgement before the next one is sent, with 100+ms between each, that puts a damper on the data you can send.
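To put rough numbers on this, here is a small back-of-the-envelope sketch in Python. It assumes the only limit is the acknowledgement round trip (link bandwidth and protocol overhead are ignored), and the function names are made up for illustration.

```python
def messages_per_second(rtt_s: float, window: int = 1) -> float:
    """Upper bound on acknowledged messages per second.

    rtt_s:  round-trip time from sending a message to receiving its ack
    window: how many messages may be outstanding (unacknowledged) at once
    """
    if rtt_s <= 0 or window < 1:
        raise ValueError("rtt_s must be positive and window at least 1")
    return window / rtt_s


def payload_bytes_per_second(msg_bytes: int, rtt_s: float, window: int = 1) -> float:
    """Upper bound on payload throughput for fixed-size messages."""
    return msg_bytes * messages_per_second(rtt_s, window)


if __name__ == "__main__":
    for window in (1, 4, 16):
        rate = messages_per_second(0.1, window)
        bandwidth = payload_bytes_per_second(64 * 1024, 0.1, window)
        print(f"window={window:2d}: {rate:.0f} msg/s, {bandwidth / 1e6:.2f} MB/s")
```

With a 100 ms round trip and one message outstanding at a time, the ceiling is about 10 messages per second regardless of link speed; the bound grows linearly with the number of outstanding messages.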
After looking through the code, I got the impression that it only has one packet outstanding at a time. It seems to send a message and, if QoS is enabled, wait there until it receives the acknowledgement before returning (and sending the next message). This is as opposed to a scheme where multiple messages can be sent and remain outstanding, pending acknowledgement. But I could be wrong in my conclusion, depending on how those VIs are called - the architecture is well abstracted and appears very extensible.
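For contrast with the one-at-a-time behaviour, the bookkeeping for a multiple-outstanding scheme can be sketched as below. This is a generic Python illustration, not code from this library: the class and method names are hypothetical, and it covers only the tracking side (which payload belongs to which packet identifier), not the network I/O or retransmission. It relies on the fact that MQTT packet identifiers are non-zero 16-bit values.

```python
from typing import Dict, Optional

MAX_PACKET_ID = 65535  # MQTT packet identifiers are 16-bit and non-zero


class InFlightWindow:
    """Tracks QoS 1 messages that were sent but not yet acknowledged."""

    def __init__(self, max_in_flight: int) -> None:
        if not 1 <= max_in_flight <= MAX_PACKET_ID:
            raise ValueError("max_in_flight must be between 1 and 65535")
        self.max_in_flight = max_in_flight
        self._next_id = 1
        self._pending: Dict[int, bytes] = {}  # packet ID -> payload

    def can_send(self) -> bool:
        """True while another message may be sent without waiting for an ack."""
        return len(self._pending) < self.max_in_flight

    def register(self, payload: bytes) -> int:
        """Reserve a free packet ID for a payload that is about to be sent."""
        if not self.can_send():
            raise RuntimeError("window full: wait for an acknowledgement")
        while self._next_id in self._pending:  # skip IDs still in flight
            self._next_id = self._next_id % MAX_PACKET_ID + 1
        packet_id = self._next_id
        self._next_id = packet_id % MAX_PACKET_ID + 1
        self._pending[packet_id] = payload
        return packet_id

    def acknowledge(self, packet_id: int) -> Optional[bytes]:
        """Release a slot on PUBACK; returns the payload, or None if unknown."""
        return self._pending.pop(packet_id, None)

    def unacknowledged(self) -> Dict[int, bytes]:
        """Snapshot of what would need resending, e.g. after a reconnect."""
        return dict(self._pending)
```

Setting `max_in_flight` to 1 reproduces the send-then-wait behaviour described above, while larger values let the sender keep the link busy during the round trip.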
Did I understand that right? I'm looking for a library that supports multiple messages outstanding, so I can get a bit higher throughput. Not massive throughput of course, since MQTT isn't the MOST efficient thing out there, but a bit higher than 3-4 messages per second at that size. (It's important to use such a messaging layer with scenarios where a TCP session might break, because those buffers don't tell you exactly what data made it and what didn't - MQTT or such a layer does.)