Subject: [boost] [asio] Order of asynchronous messages
From: Florian Lindner (mailinglists_at_[hidden])
Date: 2018-10-01 15:23:19


Hello,

in a distributed application (local area network only) we are experiencing issues which *may* stem from the order of asynchronous messages sent over sockets.

We create a send buffer:

    auto buffer = std::make_shared<std::vector<double>>();

and fill it: buffer->push_back(some_data).

This buffer is then given to our communication routine

    request = aSend(*buffer, recipient);

To keep the send buffer alive, we store it in a class attribute:

    bufferedRequests.emplace_back(request, buffer);

The aSend method from above works like this (asio is boost asio, of course):

PtrRequest aSend(std::vector<double> const & itemsToSend, int recipient)
{
  PtrRequest request(new SocketRequest);

  asio::async_write(*_sockets[recipient],
                    asio::buffer(itemsToSend),
                    [request](boost::system::error_code const &, std::size_t) {
                      std::static_pointer_cast<SocketRequest>(request)->complete();
                    });
  return request;
}

This way, we can check for completion using request->test(), whose result is set when the callback invokes request->complete().
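For reference, a minimal sketch of the kind of request type this relies on. The real SocketRequest is not shown in this mail, so the Request base class and the atomic flag are assumptions; the flag is atomic on the assumption that the completion handler may run on a different thread than the one calling test().

```cpp
#include <atomic>
#include <memory>

// Hypothetical base class; the real interface is not shown in this mail.
class Request {
public:
  virtual ~Request() = default;
  virtual bool test() = 0; // non-blocking completion check
};

// Sketch of a socket request whose state is flipped by the asio callback.
class SocketRequest : public Request {
public:
  // Called from the async_write completion handler.
  void complete() { _complete.store(true, std::memory_order_release); }

  // Called from the polling loop; never blocks.
  bool test() override { return _complete.load(std::memory_order_acquire); }

private:
  std::atomic<bool> _complete{false};
};

using PtrRequest = std::shared_ptr<Request>;
```

With a plain bool instead of the atomic, a polling thread would have no guarantee of ever observing the write made by the handler thread.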

To check for completed requests, we have a loop which is called regularly:

void checkBufferedRequests(bool blocking)
{
  do {
    for (auto it = bufferedRequests.begin(); it != bufferedRequests.end();) {
      if (it->first->test())
        it = bufferedRequests.erase(it);
      else
        ++it;
    }
    if (bufferedRequests.empty())
      return;
    if (blocking)
      std::this_thread::yield(); // give up our time slice, so communication may work
  } while (blocking);
}

It operates on bufferedRequests, which is a list<pair<PtrRequest, PtrBuffer>>.

We do this so that the sending peer is not blocked by a slow receiving peer.

The sockets are created like this:

    tcp::acceptor acceptor(*_ioService);

    {
      tcp::endpoint endpoint(tcp::v4(), _portNumber);

      acceptor.open(endpoint.protocol());
      acceptor.set_option(tcp::acceptor::reuse_address(_reuseAddress));
      acceptor.bind(endpoint);
      acceptor.listen();
    }

We are now seeing mangled data that we cannot explain.

- Is the order of requests guaranteed to be preserved? I think so, given that asio works on top of TCP.

- What happens if another read or write is performed on the socket? I think that there is no way to guarantee that this does not interfere with the order of the asynchronous send or queued requests.
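To make the second question concrete: as far as I understand the asio documentation, asio::async_write is a composed operation built from several async_write_some calls, and no other write may be started on the same stream until it completes. Below is a minimal sketch of how writes could be serialized to respect that, keeping at most one write in flight per socket. The names WriteQueue and StartWrite are hypothetical; StartWrite stands in for the actual asio::async_write call so that the sketch has no asio dependency.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

using PtrBuffer = std::shared_ptr<std::vector<double>>;

// Hypothetical stand-in for asio::async_write on one socket: it starts
// sending `buffer` and must invoke `onDone` exactly once when finished.
using StartWrite =
    std::function<void(PtrBuffer buffer, std::function<void()> onDone)>;

// Keeps at most one write in flight; later buffers wait in FIFO order.
// Not thread-safe: assumes send() and all completions run on one thread
// (or one strand). The queue must outlive any write it has started.
class WriteQueue {
public:
  explicit WriteQueue(StartWrite startWrite)
      : _startWrite(std::move(startWrite)) {}

  void send(PtrBuffer buffer) {
    _pending.push_back(std::move(buffer));
    if (!_writing)
      startNext();
  }

  // Number of buffers queued or currently being written.
  std::size_t pending() const { return _pending.size(); }

private:
  void startNext() {
    _writing = true;
    // The buffer stays at the front of the queue, and therefore alive,
    // until its write has completed.
    _startWrite(_pending.front(), [this] {
      _pending.pop_front();
      _writing = false;
      if (!_pending.empty())
        startNext();
    });
  }

  StartWrite _startWrite;
  std::deque<PtrBuffer> _pending;
  bool _writing = false;
};
```

In the code above, aSend would then call something like queue.send(buffer) instead of asio::async_write directly, and the queue would take over keeping the buffer alive.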

Last but not least: Do you see any potential problems with that code?

Thanks a lot!
Florian

