From: Ivica Siladic (ivica.siladic_at_[hidden])
Date: 2024-10-19 17:35:23


Hi Ruben,

thanks a lot again for your comments and questions. Our comments are inlined:

> On 19.10.2024., at 15:08, Ruben Perez <rubenperez038_at_[hidden]> wrote:
>
> Hi Ivica,
>
> Thanks for your detailed answer. I understand and agree with most of
> your points. I'd like to discuss some of them further:
>
>> 3. Quoting the executors section
>> (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/executors.html):
>> "The same executor must execute mqtt_client::async_run and all the
>> subsequent async_xxx operations". This sounds like an unusual
>> restriction. What happens if it is violated? Wouldn't it make more
>> sense to use the executor associated with async_run for intermediate
>> handlers, and then the async_xxx associated executor for the final
>> one?
>>
>>
>> The `async_run` function starts the internal stream read loop. The executor used for the read operation is either the one associated with the `async_run` completion handler or the default executor provided to the `mqtt_client` constructor.
>>
>> Asynchronous operations like `async_publish` are composed operations that involve both writing to stream and reading messages. These operations must be synchronized with the read loop. If we were to use a different executor for `async_publish` than the one used for the internal read loop, we would need to synchronize access to shared data (like read buffers) and, more importantly, to the underlying stream’s `async_read` and `async_write` operations. This is why the same executor must be used for both `async_run` and other asynchronous operations.
>
> Now that I've looked at the implementation, it seems like the stream's
> async_write will use the executor bound to the first message found in
> the queue. This can yield surprising results. For instance:
>
> cli.async_publish(..., asio::bind_executor(ex1, tok1));
> cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1
>
> I think having this limitation is dangerous, as it's not present in
> any other Asio-based library, and it's not enforced anywhere (as far
> as I've read).
>
> I think that this restriction can be lifted by making the stream's
> async_write a child agent of async_run. That would imply changing
> async_sender::async_send
> (https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c076cd6/include/async_mqtt5/impl/async_sender.hpp#L156)
> to push the message to the queue (as is today), then notifying a
> writer task (e.g. using a channel or a timer), then waiting for the
> writer task to complete the write (again using a channel or a timer).
> If I'm not mistaken, this is how Boost.Redis works.
>

You are correct that the second handler might complete on ex1, and from that perspective, the code doesn't fully adhere to typical Asio standards. The issue can indeed be resolved as you suggested, either by using a channel or a timer, or possibly even more efficiently by dispatching the message to the writer queue using the stream's executor via asio::dispatch.

However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.

This leads to a classic problem with asynchronous "services" that need to execute internal tasks using a strand-like executor, ensuring serialized, lock-free behavior. In such cases, it's necessary to call asio::post or asio::dispatch from the initiation function to the inner code, perform the work within the strand, and finally invoke the handler using asio::dispatch. However, Asio operates under the assumption that single-threaded code shouldn't pay the performance cost of multithreaded code. In other words, if we call a series of async functions on Asio objects from the same thread, we are guaranteed to avoid any executor switches during execution. If, however, we intend to use multithreaded code with Asio async objects, it is up to us, the users, to explicitly dispatch the async function calls within a strand, whether implicit or explicit.
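
The post/work/dispatch pattern described above can be sketched without Asio. This is a toy model only: toy_executor, toy_service and async_increment are hypothetical names, and a plain task queue stands in for a real executor. It is meant to show the two executor hops that a strand-protected service pays per operation, not how async-mqtt5 or Asio is implemented.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for an executor: a named FIFO of pending tasks.
// This is NOT an Asio executor; it only models "where" code runs.
struct toy_executor {
    std::string name;
    std::deque<std::function<void()>> tasks;

    void post(std::function<void()> f) { tasks.push_back(std::move(f)); }

    // Run everything queued, including tasks queued while running.
    void run() {
        while (!tasks.empty()) {
            auto f = std::move(tasks.front());
            tasks.pop_front();
            f();
        }
    }
};

// Hypothetical service whose state may only be touched from `strand`.
struct toy_service {
    toy_executor& strand;
    std::vector<std::string>& log;
    int counter = 0;

    // Initiation: hop to the strand, do the work there,
    // then hop to the handler's executor for the final handler.
    void async_increment(toy_executor& handler_ex,
                         std::function<void(int)> handler) {
        log.push_back("initiated");
        strand.post([this, &handler_ex, handler = std::move(handler)]() mutable {
            log.push_back("work on " + strand.name);
            int result = ++counter;
            handler_ex.post([this, &handler_ex, result,
                             handler = std::move(handler)] {
                log.push_back("handler on " + handler_ex.name);
                handler(result);
            });
        });
    }
};

// Drives one operation and returns the order in which the steps ran.
std::vector<std::string> run_demo() {
    std::vector<std::string> log;
    toy_executor strand{"strand", {}};
    toy_executor user{"user", {}};
    toy_service svc{strand, log};

    int seen = 0;
    svc.async_increment(user, [&seen](int v) { seen = v; });
    strand.run();  // first hop: the internal work
    user.run();    // second hop: the final handler
    log.push_back("result " + std::to_string(seen));
    return log;
}
```

When the caller already runs on the same executor as the service, both hops are pure overhead, which is the cost the single-executor requirement avoids.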

In our design, mqtt_client essentially operates within an implicit strand. Regarding your original question about why "The same executor must execute mqtt_client::async_run and all subsequent async operations": this requirement is what allows us to take the first executor from the queue. In fact, all executors of the handlers in the queue are the same, including the stream's executor. This would explain why your example would be considered ill-formed based on the documentation.

> Some follow-up questions:
>
> 9. I've written two small clients
> (https://github.com/anarthal/async-mqtt5-test). One simulates reading
> a couple of sensors, at different frequencies, and publishes measures
> to a topic. The other subscribes to the topics and prints the
> measures. In the sender, I've written code like this:
>
> // Wait loop
> auto next_tp = std::chrono::steady_clock::now();
> while (true)
> {
> // Read the sensor
> double measure = sensor_to_read.read();
>
> // Publish the measure. Don't co_await it, because we want to
> // keep reading the sensor even if the network is unavailable
> cli.async_publish<mqtt::qos_e::at_most_once>(
> std::string(sensor_name),
> std::to_string(measure),
> mqtt::retain_e::yes,
> mqtt::publish_props{},
> [](error_code ec) {
> if (ec)
> std::cout << "Error during publish: " << ec << std::endl;
> }
> );
>
> // Use a timer to wait until the next measure is due (omitted for brevity)
> }
>
> There are two things in my code that make me feel uncomfortable:
> a. I'd like to somehow detect and break from the loop if async_publish
> had an immediate error. I could use a channel to solve that.
> b. If network problems persist, resource consumption is going to
> increase until the system collapses. A ring-buffer strategy would be
> the most common solution here, but I don't see how to do it easily. I
> could use a channel as a counter semaphore to prevent more than N
> measurements from being in flight for each sensor.
>
> How would you write this use case? Are there alternative ways I
> haven't considered?
>

a) A quick and dirty solution could look like this:

bool should_continue = true;
while (should_continue)
{
    // Read the sensor
    double measure = sensor_to_read.read();

    // Publish the measurement. Don't co_await the result, as we want
    // to continue reading the sensor even if the network is unavailable.
    cli.async_publish<mqtt::qos_e::at_most_once>(
        std::string(sensor_name),
        std::to_string(measure),
        mqtt::retain_e::yes,
        mqtt::publish_props{},
        [&should_continue, &timer](error_code ec) {
            if (ec) {
                should_continue = false;
                timer.cancel();
                std::cout << "Error during publish: " << ec << std::endl;
            }
        }
    );

    // Use a timer to wait until the next measurement is due (omitted for brevity)
}

However, in a more realistic scenario, you likely wouldn't use a while (true) loop, since it would prevent clean program exit. A better approach would be to run a "recursive" timer, where async_publish is called from within the timer's completion handler. In the event of an error, the completion handler for async_publish could call cancel() on the timer if needed.

b) Resource consumption will not grow indefinitely. The client will attempt to send up to 65,535 packets, after which async_publish will immediately return an error code (pid_overrun), as it will run out of Packet Identifiers.
If you want to impose stricter limits, you can implement a more complex solution on the user's side. For example, you could track the number of bytes (or messages) sent by incrementing a counter each time you call async_publish and decrementing it in the completion handler of async_publish. Before attempting to publish, you should check whether the number of in-flight messages has reached your limit, and if so, simply skip publishing until some messages are acknowledged.
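
The counter-based limit described above could look roughly like the following. This is a minimal sketch, not part of async-mqtt5: inflight_limiter and its member names are hypothetical, and it assumes every call happens on the client's executor, so no locking is used.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: caps the number of publishes awaiting completion.
// Not thread-safe; assumes all calls happen on the client's executor.
class inflight_limiter {
public:
    explicit inflight_limiter(std::size_t max_in_flight)
        : max_(max_in_flight) {}

    // Call before async_publish; false means "skip this sample".
    bool try_acquire() {
        if (in_flight_ >= max_) {
            ++dropped_;
            return false;
        }
        ++in_flight_;
        return true;
    }

    // Call from the async_publish completion handler, on success or error.
    void release() {
        if (in_flight_ > 0) --in_flight_;
    }

    std::size_t in_flight() const { return in_flight_; }
    std::size_t dropped() const { return dropped_; }

private:
    std::size_t max_;
    std::size_t in_flight_ = 0;
    std::size_t dropped_ = 0;
};
```

In the sensor loop, the call to async_publish would be guarded by try_acquire(), and the completion handler would call release() before inspecting the error code. Counting bytes instead of messages only changes what is added and subtracted.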

> 10. I find the broker function signature
> (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/brokers.html)
> surprising. Is there a reason for that particular signature, instead
> of a vector<string>, for example?
>

In the vast majority of cases, there will be only one broker, so creating a std::vector<std::string> with a single element would introduce unnecessary overhead. If multiple brokers are involved, they are either configured in a .conf file or hard-coded. In the case of a configuration file, it's simpler to pass the entire broker string directly to the mqtt_client rather than parsing the string, splitting it into substrings, and filling a vector. If the brokers are hard-coded, then this is irrelevant anyway.
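
For comparison, this is roughly the user-side work that a vector-based signature would require when the brokers come from a configuration value. It is only a sketch: split_brokers is a hypothetical helper, and the comma-separated "host:port" format is assumed here for illustration; the grammar actually accepted by mqtt_client::brokers is defined in the library's reference.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Splits "a:1883, b:8883" into {"a:1883", "b:8883"}, trimming spaces/tabs.
// Assumed format for illustration; the library's grammar may differ.
std::vector<std::string> split_brokers(const std::string& hosts) {
    std::vector<std::string> out;
    std::size_t pos = 0;
    while (pos <= hosts.size()) {
        // End of the current entry: next comma or end of string.
        std::size_t comma = hosts.find(',', pos);
        if (comma == std::string::npos) comma = hosts.size();

        // Skip entries that are empty or whitespace only.
        std::size_t first = hosts.find_first_not_of(" \t", pos);
        if (first != std::string::npos && first < comma) {
            std::size_t last = hosts.find_last_not_of(" \t", comma - 1);
            out.push_back(hosts.substr(first, last - first + 1));
        }
        pos = comma + 1;
    }
    return out;
}
```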

> 11. Per-operation cancellation in async_publish may yield surprising
> results. For instance:
>
> cli.async_publish(..., asio::cancel_after(5s));
>
> Will end up tearing down the entire client. I'm not saying this is
> wrong (it actually makes sense), but it can be surprising.
>
> As an alternative, if you end up implementing my suggestion for (3),
> you can make async_publish support total cancellation (as writing is
> now a child of async_run). I'd also consider stopping the potential
> retransmission of the PUBLISH packet for QOS0 (and maybe QOS1)
> messages for which a cancellation has been requested (as you do in
> async_subscribe, for instance).
>

async_publish actually supports total cancellation. So, if you write

async_publish(..., asio::cancel_after(5s, cancellation_type::total));

it will work as you expect. Supporting the total cancellation type is quite complicated for PUBLISH messages, and I described the reasons on the Slack channel earlier. For reference, I'll put the explanation here as well:

In MQTT, the implementation of Quality of Service (QoS), which can be 0, 1, or 2, introduces additional considerations. For QoS 0, no acknowledgment from the broker is expected. However, with QoS 1, the broker must send a PUBACK packet. For QoS 2, the client sends a PUBLISH packet, waits for the broker’s PUBREC, then sends PUBREL, and finally waits for the broker’s PUBCOMP packet.

Now, if the client sends a PUBLISH packet with QoS 2, it must respond with PUBREL after receiving PUBREC from the broker. If the client fails to send PUBREL, the broker will consider the connection corrupted and will close the TCP link.

This means that even if a single async_publish operation is canceled with the total cancellation type, the client must continue to communicate with the broker according to MQTT protocol requirements. Otherwise, the connection could close, causing all other messages to be dropped and requiring them to be resent.

A similar issue occurs with Boost.Beast’s WebSocket connections: to support total cancellation of an asynchronous WebSocket operation, specific bytes may need to be sent even after cancellation.

This behavior necessitates a different approach to cancellation. The outer handler's cancellation slot (if any) cannot be directly propagated to the stream's async_write. Instead, stream cancellation is managed independently of the outer handler and only supports the terminal cancellation type. It is triggered, for example, when the client is stopped with cancel() or when mqtt_client's destructor is called.

When you cancel an async_publish handler with the total cancellation type, the operation intercepts the cancellation, marks itself as "canceled," returns an error code of operation_aborted to the outer handler, and continues to exchange messages with the broker as required by the protocol.
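
The behavior described above can be modeled as a small state machine. The following is a toy sketch rather than the library's code: qos2_publish_op and its members are hypothetical, packets are represented by plain strings, and std::errc::operation_canceled stands in for Asio's operation_aborted.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <system_error>
#include <utility>

// Toy model of one QoS 2 publish after the PUBLISH packet was sent.
class qos2_publish_op {
public:
    using handler_t = std::function<void(std::error_code)>;

    explicit qos2_publish_op(handler_t h) : handler_(std::move(h)) {}

    // Total cancellation: complete the handler now, keep protocol state.
    void cancel_total() {
        // Stand-in for asio::error::operation_aborted.
        complete(std::make_error_code(std::errc::operation_canceled));
    }

    // Broker sent PUBREC: reply with PUBREL, canceled or not.
    // Returns the packet to send, or an empty string if none is due.
    std::string on_pubrec() {
        if (state_ != state::wait_pubrec) return std::string();
        state_ = state::wait_pubcomp;
        return "PUBREL";
    }

    // Broker sent PUBCOMP: the exchange is over. Success is reported
    // only if the handler was not already completed by a cancellation.
    void on_pubcomp() {
        if (state_ != state::wait_pubcomp) return;
        state_ = state::done;
        complete(std::error_code{});
    }

    bool finished() const { return state_ == state::done; }

private:
    enum class state { wait_pubrec, wait_pubcomp, done };

    // Invokes the handler at most once.
    void complete(std::error_code ec) {
        if (!handler_) return;
        auto h = std::move(handler_);
        handler_ = nullptr;
        h(ec);
    }

    handler_t handler_;
    state state_ = state::wait_pubrec;
};
```

Canceling first and then feeding PUBREC and PUBCOMP into the object shows the point of the design: the handler sees the cancellation error exactly once, while the PUBREL reply is still produced so the broker does not drop the connection.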

> 12. I've noticed that the implementation doesn't use
> asio::async_compose, but has its own version. Is there a reason for
> it? Also, note that cancellable_handler is implemented as always
> having a bound executor, regardless of what the underlying handler has
> (note that initiating an operation with bind_executor(ex_default, tok)
> and with tok is not strictly equivalent, even if
> get_associated_executor(tok, ex_default) yields the same value).
>

I wrote a rather lengthy explanation of the reasons why we don't use async_compose in a PDF document shared by Klemens on the mailing list. Here's the link to it in case you've missed it:

https://www.mireo.com/cdn/Async.MQTT5.pdf

cancellable_handler uses our internal tracking_executor type which is the result of the operation
asio::prefer(
        asio::get_associated_executor(handler, ex),
        asio::execution::outstanding_work.tracked
)

It's here to prevent the handler's execution context from running out of work. If the handler doesn't have an associated executor, it will be dispatched using the default mqtt_client executor and, to my understanding, the above tracking executor will prevent the client's execution context from running out of work (although this is not strictly necessary).

> 13. I've noticed the usage of std::vector (as opposed to
> boost::core::span) in many APIs (e.g.
> https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/async_subscribe.html).
> Is it for safety during deferred initiation?
>

To be honest, we completely overlooked the existence of boost::core::span. Since std::span is part of C++20 and we wanted to maintain support for C++17, we opted to use const std::vector&. However, it should be perfectly safe to replace all instances of const std::vector& with boost::core::span.

> 14. I've measured build times in the linked repo. Using clang-18 with
> C++23 on my machine, it takes 22s to build the sender in debug mode,
> and 30s in release mode. This is a little bit high IMO. Some
> observations here:
> - The client is unconditionally including Boost.Beast websocket, which
> causes a considerable overhead. I'd consider a way to forward-declare
> it or make it part of a separate header.
> - Instantiations of async functions are pretty heavyweight.
> asio::prepend is used a lot, which is not free. Did you experiment
> with/without it and measure the benefit?
> - The spirit headers/functions also appear in the flame graph
> (although I thought they would be much higher).
>
> Have you considered optimizations of compile times?
>

We've already tried to minimize the inclusion of Boost.Beast stream (websocket) headers, but forward declarations didn't work well in this case. The code in connect_op uses if constexpr to generate the necessary boilerplate when the stream is a websocket, and unfortunately, we couldn't find a reasonable solution that avoids including the Boost.Beast stream headers.

You are correct that the use of asio::prepend, consign, and append is the main contributor to the longer-than-desired compilation times. These constructs are fundamental to our code since we don't use async_compose, and we're unsure how to avoid them in these scenarios.

As you pointed out, Boost.Spirit doesn't significantly impact compilation times. This is because we use it in a very "lightweight" manner that avoids generating too many expression template instantiations.

We’ve already done everything we could think of to reduce compilation time. There’s certainly some room for further improvement, but at the moment we don’t know how to achieve it.

> Kind regards,
> Ruben.

With regards,

Ivica Siladic

