Boost logo

Boost Users :

From: Willem Mitchell (willem.mitchell_at_[hidden])
Date: 2024-04-06 00:01:31


I'm trying to write a simple ASIO sender/receiver that uses message queueing. The goal is to have a number of virtual streams over a single TCP connection. This is pretty simple using callbacks, but I can't figure out the coroutine-based alternative. Here is my simplified sample code. Note that I'm using asio::experimental::promise the same way I would use std::promise, which obviously doesn't work, but it illustrates what I'm trying to do. What is the simplest solution to my problem?

#pragma once
#include "asio.hpp"
#include "asio/co_spawn.hpp"
#include "asio/detached.hpp"
#include "asio/experimental/co_composed.hpp"
#include "asio/experimental/promise.hpp"
#include "asio/use_awaitable.hpp"
#include <coroutine>
#include <cstdint>
#include <map>
#include <memory>
#include <queue>
// A unit of traffic exchanged over a NetworkConnection.
// All fields default to zero so that a default-initialized `Message m;` never
// carries indeterminate values. The struct remains an aggregate, so
// `Message{id, streamId, priority}` still works.
struct Message {
  uint64_t id = 0;       // Key under which a pending send is tracked.
  uint64_t streamId = 0; // Logical stream the message belongs to.
  uint64_t priority = 0; // Not read by any code in this file.
};

class NetworkConnection {
public:
  NetworkConnection(asio::io_context &ioContext)
      : ioContext_(ioContext), sendTimer_(ioContext_),
        receiveTimer_(ioContext_){};

  asio::awaitable<std::unique_ptr<Message>> receive(uint64_t streamId) {
    auto promise = std::make_shared<
        asio::experimental::promise<std::unique_ptr<Message>>>();
    auto future = promise->get_future();
    receives_[streamId] = promise;
    if (!receiving_) {
      receiving_ = true;
      co_spawn(ioContext_, startReceiving(), asio::detached);
    }
    co_return co_await future;
  }

  asio::awaitable<void> send(std::unique_ptr<Message> message) {
    auto promise = std::make_shared<asio::experimental::promise<void>>();
    auto future = promise->get_future();
    sends_[message->id] = promise;
    if (!sending_) {
      sending_ = true;
      co_spawn(ioContext_, startSending(), asio::detached);
    }
    co_await future;
    co_return;
  }

private:
  asio::io_context &ioContext_;
  asio::steady_timer receiveTimer_;
  asio::steady_timer sendTimer_;
  std::map<uint64_t, std::shared_ptr<
                         asio::experimental::promise<std::unique_ptr<Message>>>>
      receives_;
  std::map<uint64_t, std::shared_ptr<asio::experimental::promise<void>>> sends_;
  std::queue<std::unique_ptr<Message>> sendQueue_;
  bool receiving_ = false;
  bool sending_ = false;

  void completeSend(uint64_t messageId) {
    auto it = sends_.find(messageId);
    if (it != sends_.end()) {
      it->second->set_value(); // This resumes the send operation
      sends_.erase(it); // Clean up
    }
  }

  void completeReceive(std::unique_ptr<Message> message) {
    auto it = receives_.find(message->streamId);
    if (it != receives_.end()) {
      it->second->set_value(
          std::move(message)); // This resumes the send operation
      sends_.erase(it); // Clean up
    }
  }

  asio::awaitable<void> startReceiving() {
    while (!receives_.empty()) {
      // Simulate receiving a message
      sendTimer_.expires_after(std::chrono::seconds(1));
      co_await sendTimer_.async_wait(asio::use_awaitable);
      auto message = std::make_unique<Message>();
      message->id = 1;
      message->streamId = 1;
      message->priority = 1;
      completeReceive(std::move(message));
    }
    receiving_ = false;
    co_return;
  }

  asio::awaitable<void> startSending() {
    while (!sends_.empty()) {
      sendTimer_.expires_after(std::chrono::seconds(1));
      auto message = std::move(sendQueue_.front());
      sendQueue_.pop();
      co_await sendTimer_.async_wait(asio::use_awaitable);
      completeSend(message->id);
    }
    sending_ = false;
    co_return;
  }
};

class Stream {
public:
  Stream(std::shared_ptr<NetworkConnection> connection)
      : connection_(std::move(connection)) {}
  asio::awaitable<std::unique_ptr<Message>> receive() {
    co_return co_await connection_->receive(id_);
  }
  asio::awaitable<void> send(std::unique_ptr<Message> message) {
    message->streamId = id_;
    co_return co_await connection_->send(std::move(message));
  }

private:
  uint64_t id_;
  std::shared_ptr<NetworkConnection> connection_;
};



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net