I'm trying to write a simple ASIO sender/receiver that uses message queueing. The goal is to have a number of virtual streams over a single TCP connection. This is pretty simple using callbacks, but I can't figure out the coroutine alternative. Here is my simplified
sample code. Note that I'm using asio::experimental::promise the same way I would use std::promise, which obviously doesn't work, but it illustrates what I'm trying to do. What is the simplest solution to my problem?
#pragma once
#include "asio.hpp"
#include "asio/co_spawn.hpp"
#include "asio/detached.hpp"
#include "asio/experimental/channel.hpp"
#include "asio/experimental/co_composed.hpp"
#include "asio/experimental/promise.hpp"
#include "asio/use_awaitable.hpp"
#include <chrono>
#include <coroutine>
#include <cstdint>
#include <map>
#include <memory>
#include <queue>
#include <stdexcept>
#include <utility>
// A single message exchanged over a connection.
//
// Every field is zero-initialized by default so that a default-constructed
// Message never exposes indeterminate values. Aggregate initialization
// (e.g. Message{1, 2, 3}) keeps working.
struct Message {
    uint64_t id{0};        // Identifier of this message.
    uint64_t streamId{0};  // Identifier of the stream the message belongs to.
    uint64_t priority{0};  // Priority value carried with the message.
};
class
NetworkConnection {
public:
NetworkConnection(asio::io_context
&ioContext)
: ioContext_(ioContext),
sendTimer_(ioContext_),
receiveTimer_(ioContext_){};
asio::awaitable<std::unique_ptr<Message>>
receive(uint64_t
streamId) {
auto
promise
=
std::make_shared<
asio::experimental::promise<std::unique_ptr<Message>>>();
auto
future
=
promise->get_future();
receives_[streamId]
=
promise;
if (!receiving_)
{
receiving_
=
true;
co_spawn(ioContext_,
startReceiving(),
asio::detached);
}
co_return
co_await
future;
}
asio::awaitable<void>
send(std::unique_ptr<Message>
message) {
auto
promise
=
std::make_shared<asio::experimental::promise<void>>();
auto
future
=
promise->get_future();
sends_[message->id]
=
promise;
if (!sending_)
{
sending_
=
true;
co_spawn(ioContext_,
startSending(),
asio::detached);
}
co_await
future;
co_return;
}
private:
asio::io_context
&ioContext_;
asio::steady_timer
receiveTimer_;
asio::steady_timer
sendTimer_;
std::map<uint64_t,
std::shared_ptr<
asio::experimental::promise<std::unique_ptr<Message>>>>
receives_;
std::map<uint64_t,
std::shared_ptr<asio::experimental::promise<void>>>
sends_;
std::queue<std::unique_ptr<Message>>
sendQueue_;
bool
receiving_
=
false;
bool
sending_
=
false;
void
completeSend(uint64_t
messageId) {
auto
it
=
sends_.find(messageId);
if (it
!=
sends_.end()) {
it->second->set_value();
// This resumes the send operation
sends_.erase(it);
// Clean up
}
}
void
completeReceive(std::unique_ptr<Message>
message) {
auto
it
=
receives_.find(message->streamId);
if (it
!=
receives_.end()) {
it->second->set_value(
std::move(message));
// This resumes the send operation
sends_.erase(it);
// Clean up
}
}
asio::awaitable<void>
startReceiving() {
while (!receives_.empty())
{
// Simulate receiving a message
sendTimer_.expires_after(std::chrono::seconds(1));
co_await
sendTimer_.async_wait(asio::use_awaitable);
auto
message
=
std::make_unique<Message>();
message->id
=
1;
message->streamId
=
1;
message->priority
=
1;
completeReceive(std::move(message));
}
receiving_
=
false;
co_return;
}
asio::awaitable<void>
startSending() {
while (!sends_.empty())
{
sendTimer_.expires_after(std::chrono::seconds(1));
auto
message
=
std::move(sendQueue_.front());
sendQueue_.pop();
co_await
sendTimer_.async_wait(asio::use_awaitable);
completeSend(message->id);
}
sending_
=
false;
co_return;
}
};
// One virtual stream layered on a shared NetworkConnection.
class Stream {
public:
    // `connection` is the shared transport. `id` is the stream identifier
    // passed to NetworkConnection::receive() and stamped on outgoing
    // messages; it defaults to 0 so the member is never read uninitialized
    // and existing single-argument construction keeps compiling.
    Stream(std::shared_ptr<NetworkConnection> connection, uint64_t id = 0)
        : id_(id), connection_(std::move(connection)) {}

    // Waits for and returns the next message for this stream.
    asio::awaitable<std::unique_ptr<Message>> receive() {
        co_return co_await connection_->receive(id_);
    }

    // Tags `message` with this stream's id and sends it over the connection.
    // Throws std::invalid_argument (surfaced at the caller's co_await) when
    // `message` is null.
    asio::awaitable<void> send(std::unique_ptr<Message> message) {
        if (!message) {
            throw std::invalid_argument("Stream::send: null message");
        }
        message->streamId = id_;
        co_await connection_->send(std::move(message));
    }

private:
    uint64_t id_;
    std::shared_ptr<NetworkConnection> connection_;
};