
Boost Users :

From: Mohammad Nejati [ashtum] (ashtumashtum_at_[hidden])
Date: 2023-12-15 19:26:36


What you want to achieve seems doable with experimental::basic_channel:
https://www.boost.org/doc/libs/1_84_0/doc/html/boost_asio/reference/experimental__basic_channel.html
There is an example of it in the Asio repository as well.
If you are using a multi-threaded executor, consider the concurrent
version (experimental::basic_concurrent_channel) instead.

On Fri, Dec 15, 2023, 7:12 PM accelerator0099 via Boost-users <
boost-users_at_[hidden]> wrote:

> Consider the producer-consumer model:
>
> producer -+- consumer1
>           +- consumer2
>           +- consumer3
>           +- consumer4
>           +- consumer5
>
> The producer periodically pushes items into a queue, and the consumers
> wait on that queue for items.
>
> It's easy to implement in a synchronous, multi-threaded environment: the
> producer just locks the queue, pushes into it, and notifies one consumer
> through a condition variable. Consumers waiting on that condition
> variable are woken one by one, popping items and consuming them.
>
> How can this be done in an asynchronous environment?
>
> Consumers call async_pop() on that queue, posting their completion
> tokens into the executor. When items are available, the executor
> notifies one consumer by calling its completion token.
>
> Is that right?
>
> How does the producer tell the executor "The asynchronous operation is
> done, you should call that completion token"?
>
> My implementation of a possible async queue (based on timers):
>
> #include <boost/asio.hpp>
> #include <chrono>
> #include <cstddef>
> #include <deque>
> #include <iostream>
> #include <utility>
>
> using namespace std::literals;
> namespace io = boost::asio;
> using io::awaitable;
> auto& uawait = io::use_awaitable;
>
> // Suspend the current coroutine for the given duration.
> template<class D>
> awaitable<void> delay(const D& dur) {
>     io::steady_timer tm{ co_await io::this_coro::executor };
>     tm.expires_after(dur);
>     co_await tm.async_wait(uawait);
> }
>
> template<class T>
> class aqueue {
>     std::deque<T> mq;
> public:
>     using reference = T&;
>     using const_reference = const T&;
>     std::size_t size() const noexcept { return mq.size(); }
>     bool empty() const noexcept { return mq.empty(); }
>
>     // Polls the queue every 1ms until an item is available.
>     awaitable<T> async_pop() {
>         while (empty()) {
>             co_await delay(1ms);
>         }
>         auto t = std::move(mq.front());
>         mq.pop_front();
>         co_return t;
>     }
>
>     awaitable<void> async_push(T t) {
>         mq.push_back(std::move(t));
>         co_return;
>     }
> };
>
> class consumer {
> public:
>     const int id;
>     aqueue<int>& queue;
>
>     consumer(int _i, aqueue<int>& _q) : id{ _i }, queue{ _q } {}
>     awaitable<void> operator()() {
>         std::cout << "Consumer " << id << " started\n";
>         auto i = co_await queue.async_pop();
>         std::cout << "Consumer " << id << " got " << i << '\n';
>     }
> };
>
> int main() {
>     io::io_context ctx;
>     aqueue<int> aq;
>     // Pushes one item per second, five items in total.
>     auto producer = [&]() -> awaitable<void> {
>         std::cout << "Producer started\n";
>         for (int i{}; i != 5; ++i) {
>             co_await delay(1s);
>             co_await aq.async_push(i);
>         }
>     };
>     io::co_spawn(ctx, producer, io::detached);
>     for (int i{}; i != 5; ++i)
>         io::co_spawn(ctx, consumer{ i, aq }, io::detached);
>     std::cout << "RUN\n";
>     ctx.run();
>     return 0;
> }
>
> Is using timers a proper way to do this? I think there should be a
> better way.
>
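
For comparison, the synchronous scheme described in the quoted message
(lock the queue, push, notify one consumer through a condition variable)
could look roughly like the sketch below. It uses only the standard
library. The names blocking_queue and run_blocking_demo, and the close()
member used to let the consumers exit, are assumptions added for the
example and do not appear in the original post:

```cpp
#include <algorithm>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>
#include <thread>
#include <utility>
#include <vector>

template <class T>
class blocking_queue {
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> q_;
    bool closed_ = false;

public:
    // Producer side: lock, push, then wake exactly one waiting consumer.
    void push(T v) {
        {
            std::lock_guard<std::mutex> lk(m_);
            q_.push_back(std::move(v));
        }
        cv_.notify_one();
    }

    // Hypothetical addition: mark the queue finished and wake everyone.
    void close() {
        {
            std::lock_guard<std::mutex> lk(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

    // Consumer side: block until an item arrives or the queue is closed.
    // Returns std::nullopt once the queue is closed and drained.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lk(m_);
        cv_.wait(lk, [this] { return !q_.empty() || closed_; });
        if (q_.empty())
            return std::nullopt;
        T v = std::move(q_.front());
        q_.pop_front();
        return v;
    }
};

// Hypothetical driver: one producer (the calling thread) and `consumers`
// consumer threads. Returns every consumed item, sorted.
std::vector<int> run_blocking_demo(int items, int consumers) {
    blocking_queue<int> q;
    std::mutex out_m;
    std::vector<int> out;
    std::vector<std::thread> threads;
    for (int c = 0; c != consumers; ++c)
        threads.emplace_back([&] {
            while (auto v = q.pop()) {
                std::lock_guard<std::mutex> lk(out_m);
                out.push_back(*v);
            }
        });
    for (int i = 0; i != items; ++i)
        q.push(i);
    q.close();
    for (auto& t : threads)
        t.join();
    std::sort(out.begin(), out.end());
    return out;
}
```

The asynchronous question is then how to get the same "wake one waiter"
behaviour without blocking a thread in pop().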


