Boost logo

Boost :

From: Ruben Perez (rubenperez038_at_[hidden])
Date: 2024-10-20 13:03:02


> Now that I've looked at the implementation, it seems like the stream's
> async_write will use the executor bound to the first message found in
> the queue. This can yield surprising results. For instance:
>
> cli.async_publish(..., asio::bind_executor(ex1, tok1));
> cli.async_publish(..., asio::bind_executor(ex2, tok2)); // might complete on ex1
>
> I think having this limitation is dangerous, as it's not present in
> any other Asio-based library, and it's not enforced anywhere (as far
> as I've read).
>
> I think that this restriction can be lifted by making the stream's
> async_write a child agent of async_run. That would imply changing
> async_sender::async_send
> (https://github.com/mireo/async-mqtt5/blob/0b935bd1a2ba65427d1fb491aa1133b07c076cd6/include/async_mqtt5/impl/async_sender.hpp#L156)
> to push the message to the queue (as is today), then notifying a
> writer task (e.g. using a channel or a timer), then waiting for the
> writer task to complete the write (again using a channel or a timer).
> If I'm not mistaken, this is how Boost.Redis works.
>
>
> You are correct that the second handler might complete on ex1, and from that perspective, the code doesn't fully adhere to typical Asio standards. The issue can indeed be resolved as you suggested, either by using a channel or a timer, or possibly even more efficiently by dispatching the message to the writer queue using the stream's executor via asio::dispatch.
>
> However, conceptually, the right solution is still unclear. Technically, we understand how it can be done, but the question is more about the intended design. I recall Richard Hodges once explaining the role of a bound executor to a handler, stating that "the final handler, as well as all intermediate handlers of the async operation, will be invoked using the bound executor." While the initiation code of the async function may not use the bound executor, once the internal code calls another internal async function, every subsequent async function, including the final handler, will be executed using the originally bound executor.

I think the point here is the parent-child relationship of the
involved async agents. Read operations are children of async_run. When
an async operation involving only a read, like async_receive, is
invoked, it waits on its channel until the required message arrives.
It is relying on an internal async operation, but it does not
propagate its bound properties to it, because there's no parent-child
relationship between them. The channel serves as an isolator. I fully
agree on this being a good design.

I'd advise you to apply this same logic to writing. At the moment,
writing a message does not have a clear parent. If 2 messages are
being written concurrently, the parent is chosen at random. In my
opinion, writing should be symmetrical to reading: a child of
async_run. As you do with reading, writing should never be invoked
directly by async_publish, but maintained by async_run. This avoids
spuriously propagating properties.

> async_publish actually supports total cancellation. So, if you write
>
> async_publish(..., asio::cancel_after(5s, cancellation_type::total));
>
> it will work as you expect. Support for total cancellation type is quite complicated for PUBLISH messages and I've described the reasons on the Slack channel earlier. For the reference, I'll put the explanation here as well:

Oh, you're right. I debugged and got the (false) impression that total
signals were being blocked by a filter. Thanks for pointing that out.

>
> 12. I've noticed that the implementation doesn't use
> asio::async_compose, but has its own version. Is there a reason for
> it? Also, note that cancellable_handler is implemented as always
> having a bound executor, regardless of what the underlying handler has
> (note that initiating an operation with bind_executor(ex_default, tok)
> and with tok is not strictly equivalent, even if
> get_associated_executor(tok, ex_default) yields the same value).
>
>
> I wrote a rather lengthy explanation fo the reasons why we don't use async_compose in an PDF document shared by Klemens on the mailing list. Here's the link to it in case you've missed it:
>
> https://www.mireo.com/cdn/Async.MQTT5.pdf

I completely did, thanks for pointing it out. Note however that
async_compose is not tied to "coroutine like" behavior - you can
implement the same overloading scheme you currently have with
async_compose, too. I agree that it's under-documented, and that doing
custom cancellation handling is non-trivial, so you probably made the
right choice.

>
> cancellable_handler uses our internal tracking_executor type which is the result of the operation
> asio::prefer(
> asio::get_associated_executor(handler, ex),
> asio::execution::outstanding_work.tracked
> )
>
> It's here to prevent handler's execution context from running out of job. If handler doesn't have an associated executor it will be dispatched using default mqtt_client executor and, to my understanding, the above tracking executor will prevent client's execution context to run out of job (although this is not strictly necessary).

Yes, I understand that, and yes, it's required for correctness. You
might consider using asio::executor_work_guard, too, which is
compatible with old-style executors.

My point here is that, if you want cancellable_handler to propagate
the original handler's executor, immediate executor and allocator, the
usual practice is to specialize asio::associated_executor,
asio::associated_immediate_executor, and asio::associated_allocator.
For instance, and supposing that cancellable_handler::handler is the
original completion handler passed by the user:

template <class Handler, typename DefaultCandidate>
struct associated_executor<cancellable_handler<Handler>, DefaultCandidate>
    : associated_executor<Handler, DefaultCandidate>
{
    static typename associated_executor<Handler, DefaultCandidate>::type get(
        const cancellable_handler<Handler>& h
    ) noexcept
    {
        return associated_executor<Handler, DefaultCandidate>::get(h.handler);
    }

    static auto get(const cancellable_handler<Handler>& h, const
DefaultCandidate& c)
    {
        return associated_executor<Handler,
DefaultCandidate>::get(h.handler, c);
    }
};

Inheriting from associated_executor<Handler, DefaultCandidate> has the
effect that if the original handler didn't have an associated
executor, cancellable_handler won't have an associated executor,
either. There are some if constexpr's in Asio taking advantage of this
to make small optimizations. It probably doesn't make a difference in
your case though, so feel free to ignore my comment if you think it's
not worth it.

>
> 14. I've measured build times in the linked repo. Using clang-18 with
> C++23 on my machine, it takes 22s to build the sender in debug mode,
> and 30s in release mode. This is a little bit high IMO. Some
> observations here:
> - The client is unconditionally including Boost.Beast websocket, which
> causes a considerable overhead. I'd consider a way to forward-declare
> it or make it part of a separate header.
> - Instantiations of async functions are pretty heavyweight.
> asio::prepend is used a lot, which is not free. Did you experiment
> with/without it and measure the benefit?
> - The spirit headers/functions also appear in the flame graph
> (although I thought they would be much higher).
>
> Have you considered optimizations of compile times?
>
>
> We've already tried to minimize the inclusion of Boost.Beast stream (websocket) headers, but forward declarations didn't work well in this case. The code in connect_op uses if constexpr to generate the necessary boilerplate when the stream is a websocket, and unfortunately, we couldn't find a reasonable solution that avoids including the Boost.Beast stream headers.

Oh. I think this might be indicating that connect_op is too strongly
coupled to Boost.Beast websocket. I know that some guys in Boost are
developing another HTTP/websocket library, so you might end up with
messy code if this library ever gets traction.

I'd advise to split these chunks of code into a customizable trait.
You might choose two traits (tls_handshake_traits and
ws_connect_traits) or a single one (connect_traits). For instance:

// Trait declaration
template <class Stream>
struct ws_connect_traits : detail::no_ws_connect_traits {};

// Trait definition for Beast
template <class Stream>
struct ws_connect_traits<boost::beast::websocket::stream<Stream>> {
    template <class CompletionToken>
    auto async_handshake(boost::beast::websocket::stream<Stream>& ws,
const authority_path& ap, CompletionToken&&) {
          // what you currently have in do_ws_handshake
    }
};

You can now split the trait specialization to a separate, optional
websocket.hpp header, thus making Beast an optional peer dependency
(instead of a hard one). In this header, you can provide helper
typedefs to make things less verbose. E.g. using ws_client =
mqtt_client<boost::beast::websocket::stream<boost::asio::ip::tcp>>;

If you combine this with my comment on placing TLS hooks in optional
headers, you'll end up with 4 optional headers, with a typedef each
(i.e. tcp_client, tcp_tls_client, ws_client, ws_tls_client). I think
this is great for usability.

>
> You are correct that the use of asio::prepend, consign, and append is the main contributor to the longer-than-desired compilation times. These constructs are fundamental to our code since we don't use async_compose, and we're unsure how to avoid them in these scenarios.

I'd advise to experiment with a callback wrapper that propagates
associated properties with asio::associator. For example:

// The type of handler to be passed to async operations. This replaces
asio::append and asio::prepend
template <class Op, class... Args>
struct intermediate_handler
{
    // The operation object, whose associated properties will be propagated
    Op op;

    // The bound arguments. This works like std::bind_front, so bound
arguments should come first
    std::tuple<Args...> args;

    // Helper to invoke the handler
    template <class... SuppliedArgs, std::size_t... I>
    void invoke_impl(std::index_sequence<I...>, SuppliedArgs&&... supplied_args)
    {
        std::move(op)(std::get<I>(std::move(args))...,
std::forward<SuppliedArgs>(supplied_args)...);
    }

    // Invoke the handler
    template <class... SuppliedArgs>
    void operator()(SuppliedArgs&&... supplied_args)
    {
        invoke_impl(
            std::make_index_sequence<sizeof...(Args)>{},
            std::forward<SuppliedArgs>(supplied_args)...
        );
    }
};

// Factory function
template <class Op, class... Args>
auto make_intermediate_handler(Op&& op, Args&&... args)
{
    return intermediate_handler<std::decay_t<Op>, std::decay_t<Args>...>{
        std::move(op),
        {std::forward<Args>(args)...}
    };
}

// Ensure that our handler propagates associated properties (executor,
allocator...)
template <template <typename, typename> class Associator, class Op,
class... Args, class DefaultCandidate>
struct boost::asio::associator<Associator, intermediate_handler<Op,
Args...>, DefaultCandidate>
    : Associator<Op, DefaultCandidate>
{
    static typename Associator<Op, DefaultCandidate>::type get(const
intermediate_handler<Op, Args...>& h
    ) noexcept
    {
        return Associator<Op, DefaultCandidate>::get(h.op);
    }

    static auto get(const intermediate_handler<Op, Args...>& h, const
DefaultCandidate& c) noexcept
    {
        return Associator<Op, DefaultCandidate>::get(h.op, c);
    }
};

// Taking connect_op::on_connect as an example, this:
lowest_layer(_stream).async_connect(
    *std::begin(eps),
    asio::append(
        asio::prepend(std::move(*this), on_connect {}),
        *std::begin(eps), std::move(ap)
    )
);

// Can be replaced by
lowest_layer(_stream).async_connect(
    *std::begin(eps),
    make_intermediate_handler(
        std::move(*this), on_connect {}, *std::begin(eps), std::move(ap)
    )
);

It probably requires you to re-arrange callback args in some functions, though.

I don't know whether this provides substantial gains. But may be worth trying.

Regards,
Ruben.


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk