I encountered this problem too.

Chris recommended using a steady_timer to construct an async semaphore. I wasn't happy with that, so I wrote an async_semaphore type.
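For context, the steady_timer trick looks roughly like this: waiters wait on a timer that never expires, and notifying cancels the waits. A minimal sketch of the idea (my reconstruction, not Chris's actual code):

#include <boost/asio.hpp>
#include <utility>

namespace asio = boost::asio;

// Waiters async_wait on a timer whose expiry is never reached; notify_all()
// cancels the waits, which then complete with asio::error::operation_aborted.
struct timer_cv {
  explicit timer_cv(asio::io_context &ioc)
      : timer_(ioc, asio::steady_timer::time_point::max()) {}

  template <typename Token>
  auto async_wait(Token &&token) {
    return timer_.async_wait(std::forward<Token>(token));
  }

  void notify_all() { timer_.cancel(); }

  asio::steady_timer timer_;
};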

Klemens then picked up on that and refined it.

https://github.com/klemens-morgenstern/sam


On Tue, 23 May 2023 at 00:44, Suraaj K S via Boost-users <boost-users@lists.boost.org> wrote:
Hi all,

In Boost.Asio, I realized that there is no easy way to get something that resembles a condition variable.

However, I realized that I could get something very similar using stackful coroutines (https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/overview/core/spawn.html), while using an io_context as a 'queue' for completion tokens. Below is my approach:

#include "asio_types.hpp"
#include <atomic>
#include <iostream>

// Uses an internal io_context as a queue of parked completion handlers;
// try_run_one() releases them one at a time.
class async_pending_queue {
public:
  async_pending_queue()
      : pending_handlers_(0), strand_(pending_queue_),
        wg_(asio::make_work_guard(pending_queue_)) {}

  // Posts the caller's completion handler onto the internal strand, then
  // runs atomic_action while still inside the initiation function.
  // NB: atomic_action is captured by reference; that is fine for
  // yield_context, where the initiation runs synchronously.
  template <typename CompletionToken>
  auto async_submit(
      CompletionToken &&token,
      std::function<void(void)> atomic_action = [] {}) {
    auto init = [this, &atomic_action](auto completion_handler) {
      auto posted_lambda = [handler = std::move(completion_handler),
                            this]() mutable {
        pending_handlers_--;
        asio_sys_err ec;
        handler(ec);
      };

      post(strand_, std::move(posted_lambda));

      pending_handlers_++;
      atomic_action();
    };

    return asio::async_initiate<CompletionToken, void(asio_sys_err)>(init,
                                                                     token);
  }

  int pending_count() { return pending_handlers_.load(); }

  // May run zero handlers (and return false) rather than exactly one.
  bool try_run_one() {
    auto cnt = pending_queue_.poll_one();
    std::cout << "completion token result: " << cnt << std::endl;
    return cnt == 1;
  }

private:
  std::atomic<unsigned int> pending_handlers_;
  asio_ioctx pending_queue_;
  asio_ioctx::strand strand_;
  decltype(asio::make_work_guard(pending_queue_)) wg_;
};

Here, from a stackful coroutine, one simply calls `my_async_pending_queue.async_submit(yield)`. The coroutine can then be continued by calling `my_async_pending_queue.try_run_one()`.
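To illustrate the intended usage, here is a minimal sketch (it assumes the aliases and headers from the snippets in this mail; whether the coroutine resumes inside try_run_one() itself or on a later run of the spawning io_context depends on the handler's associated executor, which is part of what I am unsure about):

int main() {
  asio_ioctx ioctx;
  async_pending_queue apq;

  asio::spawn(ioctx, [&](yield_context yield) {
    apq.async_submit(yield);  // parks this coroutine in the pending queue
    std::cout << "resumed" << std::endl;
  });

  ioctx.poll();       // run until the coroutine suspends in async_submit
  apq.try_run_one();  // invoke the parked completion handler
  ioctx.poll();       // drain any work the resumption re-posted
}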

Using this, I wanted to build a 'memory checker'. It has two functions: `request_space` and `free_space`. A coroutine calls `request_space`, which may block (yield) if there is no space left. Meanwhile, another thread or coroutine can call `free_space`, which resumes blocked coroutines when possible.

I built a toy memory checker wrapper as follows:
#ifndef MEM_CHECK_HPP
#define MEM_CHECK_HPP

#include <cstddef>
#include <mutex>
#include <queue>

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

namespace asio = boost::asio;
using asio_ioctx = asio::io_context;
using asio_sys_err = boost::system::error_code;
using asio::yield_context;

#include "async_pending_queue.hpp"

class MemoryChecker {
public:
  using bytes_cnt = size_t;

  MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem = 1024, bytes_cnt initial_fill = 0);

  // Requests space; may yield this coroutine until enough space is free.
  void request_space(bytes_cnt cnt, yield_context yield);
  void free_space(bytes_cnt cnt);

private:
  bytes_cnt get_available_mem();

  const bytes_cnt total_mem_;
  bytes_cnt mem_used_;
  std::queue<bytes_cnt> request_queue_;
  async_pending_queue request_routines_;
  std::mutex lock_;
  asio_ioctx::strand fifo_strand_;
  asio_ioctx &completion_ioctx_;
};



inline MemoryChecker::MemoryChecker(asio_ioctx &ioctx, bytes_cnt total_mem,
                                    bytes_cnt initial_fill)
    : total_mem_(total_mem), mem_used_(initial_fill), request_queue_{},
      request_routines_{}, lock_{}, fifo_strand_(ioctx),
      completion_ioctx_(ioctx) {}

inline MemoryChecker::bytes_cnt MemoryChecker::get_available_mem() {
  assert(total_mem_ >= mem_used_);
  return total_mem_ - mem_used_;
}

inline void MemoryChecker::request_space(bytes_cnt cnt, yield_context yield) {
  // if (cnt > total_mem_) throw logic_error
  lock_.lock(); // released on the fast path below, or by async_submit's atomic_action
  assert(cnt <= total_mem_);
  assert(cnt > 0);

  if (request_queue_.empty()) {
    assert(request_routines_.pending_count() == 0);
    if (get_available_mem() >= cnt) {
      // Fast path: enough memory is free, so we bypass the pending queue.
      mem_used_ += cnt;
      lock_.unlock();
      return;
    }
  }

  assert(request_queue_.size() == request_routines_.pending_count());

  std::cout << "Pushing " << cnt << std::endl;
  request_queue_.push(cnt);

  // Keep the completion context alive while this coroutine is suspended.
  auto wg = asio::make_work_guard(completion_ioctx_);
  request_routines_.async_submit(yield, [this] { lock_.unlock(); });

  // We resume here once our completion handler has been invoked.
  auto oldest_req{request_queue_.front()};
  assert(cnt == oldest_req);
  request_queue_.pop();

  mem_used_ += cnt;
  assert(request_queue_.size() == request_routines_.pending_count());
  // Hop through fifo_strand_ before returning, to keep resumption in FIFO order.
  asio::post(fifo_strand_, yield);
}

inline void MemoryChecker::free_space(bytes_cnt cnt) {
  std::lock_guard<std::mutex> lg{lock_};
  mem_used_ -= cnt;
  // Here, we own the lock, and resume as many coroutines as we can.
  while (true) {
    if (request_queue_.empty()) {
      std::cout << "No pending requests. Bailing" << std::endl;
      break;
    }

    assert(request_queue_.size() == request_routines_.pending_count());

    auto oldest_req{request_queue_.front()};
    auto available_mem{get_available_mem()};

    if (available_mem < oldest_req) {
      std::cout << "Oldest request is larger than available_mem. Bailing"
                << std::endl;
      break;
    }

    // Keep the side effect out of assert(): it would vanish under NDEBUG.
    bool ran = request_routines_.try_run_one();
    assert(ran);
    (void)ran;
  }
}

#endif /* MEM_CHECK_HPP */



Here is a test program that exercises it:

#include "mem_check.hpp"
#include <thread>
#include <unistd.h>

constexpr size_t mc_size{4};
asio_ioctx ioctx;
size_t total{0}; // note: modified from two threads without synchronization (toy code only)
MemoryChecker mc{ioctx, mc_size};

void requestor_coroutine(size_t rq, yield_context yield) {
  while (true) {
    total += rq;
    mc.request_space(rq, yield);
    std::cout << "Got requested space" << std::endl;
  }
}

int main() {
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(1, yield); });
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(2, yield); });
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(3, yield); });
  asio::spawn(ioctx, [](yield_context yield) { requestor_coroutine(4, yield); });

  std::thread t([] { ioctx.run(); });

  while (true) {
    getchar();
    std::cout << total << std::endl;
    if (total > 0) {
      std::cout << "freeing" << std::endl;
      mc.free_space(1);
      total -= 1;
    }
  }
  t.join();
}


Finally, the problem we face is as follows. When we run the program, the assertion `total_mem_ >= mem_used_` fails. On further investigation, I realized that our completion handler was being called even when we do not call `try_run_one`, which was very surprising.

Even more surprisingly, if I replace `post(strand_, std::move(posted_lambda));` with `post(pending_queue_, std::move(posted_lambda));`, things seem to work. However, the Asio documentation says that only strands guarantee a FIFO execution order, so I am not sure that using a plain `io_context` will work as a FIFO queue (even though it seems to in these examples).

Any input would be helpful. I am happy to hear what the problem with this implementation is, as well as about other implementations (for example, using a std::queue as a proper queue instead of this io_context hack; a rough sketch of what I mean follows).
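For completeness, the std::queue shape I have in mind would store the handlers directly, roughly like this (an untested sketch; note that it invokes the handler inline in try_run_one, whereas a real version should probably dispatch through the handler's associated executor):

#include <boost/asio.hpp>
#include <deque>
#include <memory>
#include <mutex>

// Untested sketch: keep type-erased completion handlers in a real queue
// instead of round-tripping them through an io_context.
class handler_queue {
  struct op_base {
    virtual ~op_base() = default;
    virtual void complete() = 0;
  };
  template <typename Handler>
  struct op : op_base {
    explicit op(Handler h) : handler_(std::move(h)) {}
    void complete() override {
      std::move(handler_)(boost::system::error_code{});
    }
    Handler handler_;
  };

public:
  template <typename CompletionToken>
  auto async_submit(CompletionToken &&token) {
    return boost::asio::async_initiate<CompletionToken,
                                       void(boost::system::error_code)>(
        [this](auto handler) {
          std::lock_guard<std::mutex> lg{mutex_};
          handlers_.push_back(
              std::make_unique<op<decltype(handler)>>(std::move(handler)));
        },
        token);
  }

  bool try_run_one() {
    std::unique_ptr<op_base> h;
    {
      std::lock_guard<std::mutex> lg{mutex_};
      if (handlers_.empty())
        return false;
      h = std::move(handlers_.front());
      handlers_.pop_front();
    }
    h->complete(); // runs the handler inline on the calling thread
    return true;
  }

private:
  std::mutex mutex_;
  std::deque<std::unique_ptr<op_base>> handlers_;
};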

This question has also been posted here: https://stackoverflow.com/questions/76310252/c-boost-asio-building-a-conditional-variable-using-io-context-strand.

Thanks,
Suraaj
_______________________________________________
Boost-users mailing list
Boost-users@lists.boost.org
https://lists.boost.org/mailman/listinfo.cgi/boost-users