Boost logo

Boost Users :

From: Dmitry (dbnikiforov_at_[hidden])
Date: 2023-12-16 05:52:05


If the number of consumers equals to the number of threads in executor, you
can just post tasks to executor, with one item in one task.

Dmitry

чт, 14 дек. 2023 г., 12:44 accelerator0099 via Boost-users <
boost-users_at_[hidden]>:

> Consider the producer-consumer model:
>
> producer -+- consumer1
> +- consumer2
> +- consumer3
> +- consumer4
> +- consumer5
>
> The producer periodly push items into a queue, and those consumers wait
> on that queue for items
>
> It's easy to implement in a synchronous, multi-threaded environment: the
> producer just locks the queue and push into it, and notify one consumer
> through a condition variable. Consumers waiting on that condition
> variable will be woken one by one, popping items and consume them
>
> How to do those in a asynchronous environment?
>
> Consumers async_pop() on that queue, posting their completion tokens
> into the executor. When there are items available, the executor notify
> one consumer by calling its completion token
>
> Is it right?
>
> How do the producer tell the executor "The asynchronous operation is
> done, You should call that completion token"?
>
> My implemention of a possible async queue (based on timers):
>
> #include <boost/asio.hpp>
> #include <deque>
> #include <iostream>
>
> using namespace std::literals;
> namespace io = boost::asio;
> using io::awaitable;
> auto& uawait = io::use_awaitable;
>
> template<class D>
> awaitable<void> delay(const D& dur) {
> io::steady_timer tm{ co_await io::this_coro::executor };
> tm.expires_after(dur);
> co_await tm.async_wait(uawait);
> }
>
> template<class T>
> class aqueue {
> std::deque<T> mq;
> public:
> using reference = T&;
> using const_reference = const T&;
> size_t size() const noexcept { return mq.size(); }
> bool empty() const noexcept { return mq.empty(); }
>
> awaitable<T> async_pop() {
> while (empty()) {
> co_await delay(1ms);
> }
> auto t = std::move(mq.front());
> mq.pop_front();
> co_return t;
> }
>
> awaitable<void> async_push(T t) {
> mq.push_back(std::move(t));
> co_return;
> }
> };
>
> class consumer {
> public:
> const int id;
> aqueue<int>& queue;
>
> consumer(int _i, aqueue<int>& _q) : id{ _i }, queue{ _q } {}
> awaitable<void> operator()() {
> std::cout << "Consumer " << id << " started\n";
> auto i = co_await queue.async_pop();
> std::cout << "Consumer " << id << " got " << i << '\n';
> }
> };
>
> int main() {
> io::io_context ctx;
> aqueue<int> aq;
> auto producer = [&]() -> awaitable<void> {
> std::cout << "Producer started\n";
> for (int i{}; i != 5; ++i) {
> co_await delay(1s);
> co_await aq.async_push(i);
> }
> };
> io::co_spawn(ctx, producer, io::detached);
> for (int i{}; i != 5; ++i)
> io::co_spawn(ctx, consumer{ i, aq }, io::detached);
> std::cout << "RUN\n";
> ctx.run();
> return 0;
> }
>
> Is it a proper way to do so using timers? I think there should be a
> better way
>
>
> _______________________________________________
> Boost-users mailing list
> Boost-users_at_[hidden]
> https://lists.boost.org/mailman/listinfo.cgi/boost-users
>



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net