
From: Ivica Siladic (ivica.siladic_at_[hidden])
Date: 2024-10-21 14:17:04


Hi Vinnie,

> On 20.10.2024., at 23:09, Vinnie Falco <vinnie.falco_at_[hidden]> wrote:
>
> On Sun, Oct 20, 2024 at 1:31 PM Ivica Siladic <ivica.siladic_at_[hidden] <mailto:ivica.siladic_at_[hidden]>> wrote:
>> TlsContext is indeed not a concept
>
> I think that the treatment of TlsContext in the public API doesn't rise to the same level of quality as the rest of the library. I'm not sure what the best way to improve it is, though. One thing I am certain of, is that it should not be presented as if it is a concept. It should be designed as the least-powerful construct which permits the desired functionality. A concept is too broad. A "bool" seems right, either TLS is on or off. Ultimately it is up to you to decide how much to invest here (or not to).
>

You’re probably right that we could improve how we handle TlsContext. The current solution is the most straightforward approach we could devise. The core issue is that we create, configure, and manage the SSL socket entirely within the mqtt_client code. These operations require access to the appropriate SSL context, which depends on the specific stream implementation. This is true for both Asio SSL (OpenSSL) and Botan SSL streams, and it would likely apply to any other Asio-compatible SSL streams. Since the context needs to be created based on various user-defined policies, it must be supplied by the user.

One alternative could have been to expose a function-based customization point that creates the context when needed. We could call this “factory” function from within the mqtt_client code, obtain the context, and proceed as we do now. However, this approach doesn't work well if the user wants to reuse the same context for other SSL streams within the same program that are unrelated to mqtt_client (which is something we’ve encountered in practice). Providing the SSL context directly through the constructor seems like a more straightforward and flexible solution in this regard.

Since TlsContext cannot be type-erased (we neither know nor need to know its interface), we ended up making TlsContext a template parameter of mqtt_client.

As mentioned earlier, this is the best solution we could come up with so far.
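
To illustrate, a typical setup looks roughly like this (a simplified sketch: the exact constructor signature may differ slightly from the shipped headers, and the additional TLS-related customization points are omitted):

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>

#include <async_mqtt5.hpp>

namespace asio = boost::asio;

int main() {
    asio::io_context ioc;

    // The user creates and configures the context according to their own
    // policies (verification mode, CA paths, client certificates, ...).
    asio::ssl::context tls(asio::ssl::context::tls_client);
    tls.set_default_verify_paths();

    // The context is handed to the client, which then creates and manages
    // the SSL stream internally.
    using stream_type = asio::ssl::stream<asio::ip::tcp::socket>;
    async_mqtt5::mqtt_client<stream_type, asio::ssl::context> client(
        ioc, std::move(tls)
    );

    // ... set credentials/broker and call async_run as usual ...
    (void)client;
}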

>> Literally all Asio async objects have constructors that accept an executor or execution context. I believe this is a bit of a historical relic, but we included it to maintain consistency with Asio standards.
>
> Yeah...one school of thought is to only include functions which are used and have a demonstrable use case. There is an argument to be made for removing this. Asio includes it for backward compatibility, yet this rationale doesn't apply to your library which is new. Up to you to decide what to do.
>
>> This is precisely how, for example, the Asio basic_deadline_timer works.
>
> Okay, then it's fine I suppose.
>
>> ...the conceptual usage of the MQTT Will is likely beyond the scope of the Async.MQTT5 documentation.
>
> I agree but you can at least have one or two sentences which explain that it is an MQTT thing, and you can include a hyperlink to an external site which explains it more deeply. For example:
>
> /**
> * \brief Assign a \ref will Message.
> *
> * \details The \ref will Message that the Broker should publish
> * after the Network Connection is closed and it is not
> * closed normally.
> *
> * \attention This function takes action when the client is in a non-operational state,
> * meaning the \ref async_run function has not been invoked.
> * Furthermore, you can use this function after the \ref cancel function has been called,
> * before the \ref async_run function is invoked again.
> *
> * \par References
> *
> * \see https://www.hivemq.com/blog/mqtt-essentials-part-9-last-will-and-testament/

Thanks, it’s definitely better that way.

> */
>
>> In the vast majority of cases, there will be only one broker
>
> Fine.
>
>> Resource consumption will not grow indefinitely. The client will attempt to send up to 65,535 packets, after which async_publish will immediately return an error code (pid_overrun), as it will have run out of Packet Identifiers.
>
> 65,535 sounds like quite a lot. The reason that I ask about buffering, is because I am trying to determine if the mqtt_client interface interferes with the correct operation of application-level TCP flow control. This is best accomplished by designing a system with back-pressure, which automatically limits the application's ability to generate new work to match the peer's ability to consume it.
>
> For example, asio sockets can only have one pending read and one pending write operation. To write the next buffer you have to finish writing the current buffer. In other words, the application is blocked from writing as long as there is a write active. In this case, the delivery of the write completion is naturally delayed until the peer acknowledges the data.
>
> If the mqtt_client buffers up to 65,535 outgoing writes, then the calling application will perform much more work than it should. It will perform more work than the receiving end is able to process. The result is bursting of the network link and oversubscribing. When the peer is unable to keep up with incoming messages, it is better for the sending application to slow down so that it produces new work at the lower rate.
>
> Disclaimer: this is all based on theories and I have not performed any measurements.

In many aspects, the MQTT protocol treats the client and broker (server) symmetrically. For example, the broker sends PUBLISH messages in much the same way the client does. The key difference lies in the scale: a broker typically needs to manage connections to tens of thousands of clients, all while running on a single machine.

When we designed the behavior of the buffers in mqtt_client, we looked closely at how this is handled in the EMQX broker, one of the most popular brokers, also used by AWS. EMQX lets users configure the size of the outgoing queue, which defaults to 1,000 messages per connection. While this is significantly lower than the 64k limit we use, it’s important to remember that a broker must handle a large number of simultaneous connections. Our limit is roughly what a broker would devote to 64 such connections (64 × 1,000 ≈ 65,535), which seemed like a reasonable balance.

Further reasoning can be found in the response to the next question:

>
>> If you want to impose stricter limits, you can implement a more complex solution on the user's side. For example, you could track the number of bytes (or messages) sent by incrementing a counter each time you call async_publish and decrementing it in the completion handler of async_publish. Before attempting to publish, you should check whether the number of in-flight messages has reached your limit, and if so, simply skip publishing until some messages are acknowledged.
>
> Well, that's not a nice solution at all. A strength of the library is to take care of complicated things automatically. The library's design should naturally solve this problem without any action required by callers. To "skip publishing" may be difficult, or even impossible. For example, if the application has a condition that all messages must be received by the peer, then it cannot just throw out messages when a limit is reached. It would have to queue them up. Which defeats the purpose of adding an in-flight message limit feature :)
>

To clarify: when calling async_publish with QoS = 0, the operation completes (i.e., the completion handler is invoked) once the data is successfully written to the underlying stream. For QoS = 1, it completes when the broker acknowledges the packet. With QoS = 2, it completes after a two-way acknowledgment between the client and broker is finalized.

In all three cases, the typical way to send messages is by calling async_publish in a manner similar to how Asio handles async_write on streams—publishing a new message only when the previous one has completed. However, in scenarios where faster message delivery is needed, you can use Asio's parallel_group to initiate multiple simultaneous async_publish calls and wait for all of them to finish. The mqtt_client will optimize multiple publish operations by internally batching them, combining several packets into a single async_write call using scatter/gather techniques.
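
As a rough sketch of that pattern (the async_publish parameters below follow the QoS 1 overload from the documentation, but treat the exact names as illustrative):

#include <string>

#include <boost/asio/deferred.hpp>
#include <boost/asio/experimental/parallel_group.hpp>

#include <async_mqtt5.hpp>

namespace asio = boost::asio;

template <typename Client>
void publish_batch(Client& client) {
    auto make_op = [&client](std::string payload) {
        return client.template async_publish<async_mqtt5::qos_e::at_least_once>(
            "sensors/temperature", std::move(payload),
            async_mqtt5::retain_e::no, async_mqtt5::publish_props{},
            asio::deferred
        );
    };

    // Launch three publishes at once; the client coalesces the outgoing
    // packets into as few async_write calls as possible.
    asio::experimental::make_parallel_group(
        make_op("20.1"), make_op("20.2"), make_op("20.3")
    ).async_wait(
        asio::experimental::wait_for_all(),
        [](auto /* completion order */, auto&&... /* per-op results */) {
            // All three publishes have completed (i.e. been acknowledged) here.
        }
    );
}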

There’s another critical use case to consider. QoS > 0 means you really want the broker to guarantee receipt of your messages. In IoT environments, devices may restart, so relying solely on in-memory message queues is insufficient. A common approach is to first store the message in persistent storage (such as a file or an SQLite database), then send it with async_publish, and delete it from local storage only once the completion handler fires.
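
A minimal sketch of that pattern, where Store stands in for whatever durable storage the application uses:

#include <string>
#include <utility>

#include <boost/system/error_code.hpp>

#include <async_mqtt5.hpp>

// Store is a hypothetical durable-storage interface (file, SQLite, ...).
template <typename Client, typename Store>
void publish_durably(Client& client, Store& store,
                     std::string topic, std::string payload) {
    auto id = store.save(topic, payload); // persisted; survives a restart

    client.template async_publish<async_mqtt5::qos_e::at_least_once>(
        std::move(topic), std::move(payload),
        async_mqtt5::retain_e::no, async_mqtt5::publish_props{},
        [&store, id](boost::system::error_code ec, auto&&...) {
            if (!ec)
                store.remove(id); // acknowledged by the broker, safe to delete
        }
    );
}

Any message still present in the store on startup was never acknowledged and can simply be re-published.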

If the device loses network connectivity, messages will accumulate in memory, while your application likely continues to generate new messages to be published later. Once connectivity is restored, it's common to send the queued messages as quickly as possible. In such cases, you can use parallel_group to accelerate delivery and take advantage of the MQTT client’s built-in optimizations (e.g., batching multiple messages together).

In all these scenarios, the user should be mindful of the potential absence of network connectivity and structure their code accordingly. It is straightforward to keep a count of the messages (or bytes) handed to async_publish, incrementing it on each call and decrementing it when the corresponding completion handler runs.
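
For example, the bookkeeping could look like this (a sketch; whether to drop, queue, or retry when the limit is hit is up to the application):

#include <cstddef>
#include <string>
#include <utility>

#include <async_mqtt5.hpp>

// Caps the number of publishes in flight; all calls are assumed to run on the
// same executor/strand, so no additional synchronization is used.
template <typename Client>
class bounded_publisher {
    Client& client_;
    std::size_t limit_;
    std::size_t in_flight_ = 0;

public:
    bounded_publisher(Client& client, std::size_t limit)
        : client_(client), limit_(limit) {}

    // Returns false without publishing once the limit is reached; the caller
    // decides whether to drop, queue, or retry the message later.
    bool try_publish(std::string topic, std::string payload) {
        if (in_flight_ >= limit_)
            return false;
        ++in_flight_;
        client_.template async_publish<async_mqtt5::qos_e::at_least_once>(
            std::move(topic), std::move(payload),
            async_mqtt5::retain_e::no, async_mqtt5::publish_props{},
            [this](auto&&...) { --in_flight_; } // one fewer message in flight
        );
        return true;
    }
};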

> I am not completely sure since I have not deployed the library in production, but I think it would be helpful if async_publish would not invoke the completion handler until the outgoing queue is decongested. I'm not sure how congestion would be defined, but you as an expert in MQTT and with field experience could probably come up with the right definition.
>
> On the receiving side, the mqtt_client should not keep calling socket::async_read when there is a backlog of messages. Instead, it should wait until the caller has processed some or all of the backlog and only then resume calling async_read.
>
> This buffering of incoming and outgoing data was a huge problem in the websocketpp library which is one of the main motivations why I wrote beast::websocket.
>
>> Incoming messages are placed into an asio::basic_channel, with its limit set to std::numeric_limits<size_t>::max(), effectively making it unlimited. The program may crash if you subscribe to a topic and fail to consume the incoming messages. It’s important to note that the receiving side operates very differently from the publishing side, whose buffer depends solely on network bandwidth and availability. The receiving side only gets messages that have successfully traversed the network.
>

I realized I made an error in my previous explanations. In addition to the 64k limit on the sending queue, there is also a 64k limit on the receiving queue. However, this limit isn’t set directly on the receiving channel. Instead, it’s enforced by a custom, bounded internal queue that temporarily stores messages before they are delivered to the channel and eventually to the user. If the channel is full (i.e., no one is consuming messages), this bounded queue discards the oldest message when a new one arrives, providing overflow protection on the receiving side as well.
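
Conceptually, that internal queue behaves like the following (an illustration of the policy only, not the actual implementation):

#include <cstddef>
#include <deque>
#include <utility>

// A bounded FIFO that discards the oldest element when it is full.
template <typename T>
class bounded_queue {
    std::deque<T> items_;
    std::size_t capacity_;

public:
    explicit bounded_queue(std::size_t capacity) : capacity_(capacity) {}

    void push(T item) {
        if (items_.size() == capacity_)
            items_.pop_front(); // overflow protection: drop the oldest message
        items_.push_back(std::move(item));
    }

    bool empty() const { return items_.empty(); }

    T pop() {
        T front = std::move(items_.front());
        items_.pop_front();
        return front;
    }
};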

> I already covered most of this above, and it is worth repeating here. The problem is not so much the number of elements placed into the basic_channel container, but rather that there is no backpressure. If mqtt_client just reads quickly in a loop without waiting for the caller to consume some data, then the peer believes that the TCP window should be as wide as possible. Thus defeating application-level flow control.
>
> What you might do is keep track of the number of received messages which are buffered, and that the caller has not yet received, and if that number goes over a configured threshold then do not call async_read on the socket again until the number goes back down.
>
> Another disclaimer, all of the above is theoretical and it could be completely wrong :)
>
>> We set the limit to std::numeric_limits<size_t>::max() because we couldn't come up with a reasonable lower value. The channel buffers received messages because there may be lengthy operations occurring between consecutive async_receive calls, during which several messages might arrive. However, it is the user's responsibility to pop these messages in a timely manner.
>
> On the receiving side, you might actually want an API that gives the caller complete control over when the low-level async_read is resumed. Because the caller has more information than you do. The caller knows, for every message it receives, whether it will process it immediately or whether it will become a long-running task.
>
> This one issue of application-level flow control should not prevent the library from acceptance if it is to be accepted, as it is functional and useful the way it is now. However, I think that exploring whether or not the current design meshes with application-level flow control, and if not then how the API might be improved, would be useful.
>
> Thanks
>

Thanks,
Ivica Siladic

