Boost logo

Boost :

From: Ruben Perez (rubenperez038_at_[hidden])
Date: 2024-10-19 13:08:19


Hi Ivica,

Thanks for your detailed answer. I understand and agree with most of
your points. I'd like to discuss some of them further:

> 3. Quoting the executors section
> (https://spacetime.mireo.com/async-mqtt5/async_mqtt5/asio_compliance/executors.html):
> "The same executor must execute mqtt_client::async_run and all the
> subsequent async_xxx operations". This sounds like an unusual
> restriction. What happens if it is violated? Wouldn't it make more
> sense to use the executor associated with async_run for intermediate
> handlers, and then the async_xxx associated executor for the final
> one?
>
>
> The `async_run` function starts the internal stream read loop. The executor used for the read operation is either the one associated with the `async_run` completion handler or the default executor provided to the `mqtt_client` constructor.
>
> Asynchronous operations like `async_publish` are composed operations that involve both writing to stream and reading messages. These operations must be synchronized with the read loop. If we were to use a different executor for `async_publish` than the one used for the internal read loop, we would need to synchronize access to shared data (like read buffers) and, more importantly, to the underlying stream’s `async_read` and `async_write` operations. This is why the same executor must be used for both `async_run` and other asynchronous operations.

Now that I've looked at the implementation, it seems like the stream's
async_write will use the executor bound to the first message found in
the queue. This can yield surprising results. For instance:

cli.async_publish(..., asio::bind_executor(ex1, tok1));
cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1

I think having this limitation is dangerous, as it's not present in
any other Asio-based library, and it's not enforced anywhere (as far
as I've read).

I think that this restriction can be lifted by making the stream's
async_write a child agent of async_run. That would imply changing
async_sender::async_send
(https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c076cd6/include/async_mqtt5/impl/async_sender.hpp#L156)
to push the message to the queue (as is today), then notifying a
writer task (e.g. using a channel or a timer), then waiting for the
writer task to complete the write (again using a channel or a timer).
If I'm not mistaken, this is how Boost.Redis works.

Some follow-up questions:

9. I've written two small clients
(https://github.com/anarthal/async-mqtt5-test). One simulates reading
a couple of sensors, at different frequencies, and publishes measures
to a topic. The other subscribes to the topics and prints the
measures. In the sender, I've written code like this:

// Wait loop
auto next_tp = std::chrono::steady_clock::now();
while (true)
{
    // Read the sensor
    double measure = sensor_to_read.read();

    // Publish the measure. Don't co_await it, because we want to
    // keep reading the sensor even if the network is unavailable
    cli.async_publish<mqtt::qos_e::at_most_once>(
        std::string(sensor_name),
        std::to_string(measure),
        mqtt::retain_e::yes,
        mqtt::publish_props{},
        [](error_code ec) {
            if (ec)
                std::cout << "Error during publish: " << ec << std::endl;
        }
    );

    // Use a timer to wait until the next measure is due (omitted for brevity)
}

There are two things in my code that make me feel uncomfortable:
a. I'd like to somehow detect and break from the loop if async_publish
had an immediate error. I could use a channel to solve that.
b. If network problems persist, resource consumption is going to
increase until the system collapses. A ring-buffer strategy would be
the most common solution here, but I don't see how to do it easily. I
could use a channel as a counter semaphore to prevent more than N
measurements from being in flight for each sensor.

How would you write this use case? Are there alternative ways I
haven't considered?

10. I find the broker function signature
(https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/brokers.html)
surprising. Is there a reason for that particular signature, instead
of a vector<string>, for example?

11. Per-operation cancellation in async_publish may yield surprising
results. For instance:

cli.async_publish(..., asio::cancel_after(5s));

Will end up tearing down the entire client. I'm not saying this is
wrong (it actually makes sense), but it can be surprising.

As an alternative, if you end up implementing my suggestion for (3),
you can make async_publish support total cancellation (as writing is
now a child of async_run). I'd also consider stopping the potential
retransmission of the PUBLISH packet for QOS0 (and maybe QOS1)
messages for which a cancellation has been requested (as you do in
async_subscribe, for instance).

12. I've noticed that the implementation doesn't use
asio::async_compose, but has its own version. Is there a reason for
it? Also, note that cancellable_handler is implemented as always
having a bound executor, regardless of what the underlying handler has
(note that initiating an operation with bind_executor(ex_default, tok)
and with tok is not strictly equivalent, even if
get_associated_executor(tok, ex_default) yields the same value).

13. I've noticed the usage of std::vector (as opposed to
boost::core::span) in many APIs (e.g.
https://spacetime.mireo.com/async-mqtt5/async_mqtt5/ref/mqtt_client/async_subscribe.html).
Is it for safety during deferred initiation?

14. I've measured build times in the linked repo. Using clang-18 with
C++23 on my machine, it takes 22s to build the sender in debug mode,
and 30s in release mode. This is a little bit high IMO. Some
observations here:
- The client is unconditionally including Boost.Beast websocket, which
causes a considerable overhead. I'd consider a way to forward-declare
it or make it part of a separate header.
- Instantiations of async functions are pretty heavyweight.
asio::prepend is used a lot, which is not free. Did you experiment
with/without it and measure the benefit?
- The spirit headers/functions also appear in the flame graph
(although I thought they would be much higher).

Have you considered optimizations of compile times?

Kind regards,
Ruben.


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk