From: Christopher Kohlhoff (chris_at_[hidden])
Date: 2005-12-17 06:56:14


Hi Arkadiy,

--- Arkadiy Vertleyb <vertleyb_at_[hidden]> wrote:
> "Christopher Kohlhoff" <chris_at_[hidden]> wrote
> > With asio, I was hoping to make asynchronous operations as
> > natural and easy to use as possible, compared to their
> > synchronous counterparts.
>
> I am afraid this might be impossible :-( People usually
> make/think about things sequentially.

I am more optimistic about people's abilities.

> "1 2 3 4" is much more intuitive than "1 2 (call 3 when
> finished (and 3 will schedule 4))".

What makes you think that the above approach is the only way to
use asynchronicity? :)

It has been stated elsewhere in this thread that applications
will generally either be synchronous or asynchronous. However, I
mentioned in an earlier email that asio's design lets you
develop mixed-mode programs.

Your entire program does not have to centre around a call to
demuxer::run() to benefit from the asynchronous operations. A
predominantly synchronous program can still make use of
asynchronous calls when concurrency is required.

I have written an example (attached) for a protocol I will call
AAoIP (ASCII Art over IP). This protocol uses a TCP control
connection for managing subscriptions, and UDP for delivering
the "frame" data.
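
To make the control message concrete, here is a minimal sketch of
the request encoding used by the attached protocol.hpp: two
4-character, space-padded hex fields holding the old and new
ports. The function names encode_ports and decode_port are
hypothetical, and the sketch uses <sstream> where the attachment
uses <strstream>.

```cpp
#include <cassert>
#include <iomanip>
#include <sstream>
#include <string>

// Each 16-bit port occupies a 4-character, space-padded hex field.
const int encoded_port_size = 4;

// Encode an (old_port, new_port) pair as 8 characters. A port of 0 means
// "none": start requests have old_port == 0, stop requests new_port == 0.
std::string encode_ports(unsigned short old_port, unsigned short new_port)
{
  std::ostringstream os;
  os << std::hex << std::setw(encoded_port_size) << old_port;
  os << std::setw(encoded_port_size) << new_port; // setw applies per field.
  std::string data = os.str();
  assert(data.size() == 2 * encoded_port_size);
  return data;
}

// Decode field 0 (old port) or field 1 (new port) from an encoded request.
unsigned short decode_port(const std::string& data, int field)
{
  assert(field == 0 || field == 1);
  std::istringstream is(
      data.substr(field * encoded_port_size, encoded_port_size));
  unsigned short port = 0;
  is >> std::hex >> port; // Leading padding spaces are skipped.
  return port;
}
```

For example, encode_ports(0, 0x1234) yields the 8 characters
"   01234", and decode_port on that string returns 0 for field 0
and 0x1234 for field 1.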

The client program is written synchronously and implements port
hopping (like Skype). To ensure uninterrupted rendering of the
ASCII art, the client uses asynchronous operations to continue
to receive the frames while the (potentially long running) port
renegotiation is in progress. The asynchronous operations are
only used for the port renegotiation. Overall, the program flow
is still synchronous.

Concurrency without threads is what it's about.
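
The shape of that renegotiation loop can be shown without any
sockets. The following is only a toy model, assuming C++11:
toy_demuxer, post_at and renegotiate are hypothetical names, and
virtual time stands in for real I/O. It is not asio's demuxer. It
models just "run handlers until none remain, or until
interrupted".

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an event demultiplexer (NOT asio's demuxer).
class toy_demuxer
{
public:
  toy_demuxer() : now_(0), interrupted_(false) {}

  // Schedule a handler to complete at the given virtual time.
  void post_at(int time, std::function<void()> handler)
  {
    assert(time >= now_);
    pending_.insert(std::make_pair(time, handler));
  }

  // Invoke handlers in time order until none remain or interrupt() is called.
  void run()
  {
    while (!interrupted_ && !pending_.empty())
    {
      std::multimap<int, std::function<void()> >::iterator first
        = pending_.begin();
      now_ = first->first;
      std::function<void()> handler = first->second;
      pending_.erase(first);
      handler();
    }
  }

  void interrupt() { interrupted_ = true; }
  void reset() { interrupted_ = false; }
  int now() const { return now_; }

private:
  std::multimap<int, std::function<void()> > pending_;
  int now_;
  bool interrupted_;
};

// Synchronous-looking function that keeps "receiving frames" while a slow
// operation is outstanding. Returns the events in the order they were seen.
std::vector<std::string> renegotiate(toy_demuxer& demuxer)
{
  std::vector<std::string> log;

  // Start the slow operation once. It completes at virtual time 25.
  bool switched = false;
  demuxer.post_at(25, [&switched] { switched = true; });

  bool done = false;
  while (!done)
  {
    done = true; // Optimistic: assume nothing interrupts this pass.
    if (!switched)
    {
      // Expect the next old-socket frame 10 ticks from now. If the switch
      // has happened by then, treat the receive as cancelled.
      demuxer.post_at(demuxer.now() + 10, [&] {
        if (!switched)
        {
          done = false;
          demuxer.interrupt();
        }
      });
    }

    // Block until everything has finished or a frame interrupts us.
    demuxer.reset();
    demuxer.run();

    if (!done)
      log.push_back("frame at t=" + std::to_string(demuxer.now()));
  }

  log.push_back("renegotiation complete");
  return log;
}
```

Calling renegotiate on a fresh toy_demuxer returns "frame at
t=10", "frame at t=20", "renegotiation complete": two frames are
handled in the ordinary program flow while the slow operation is
still pending, and no second thread is involved.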

Cheers,
Chris

#include <boost/asio.hpp>
#include <boost/lambda/lambda.hpp>
#include <boost/lambda/bind.hpp>
#include <boost/lambda/if.hpp>
#include <boost/shared_ptr.hpp>
#include <algorithm>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <string>
#include "protocol.hpp"

using namespace boost;

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 3)
    {
      std::cerr << "Usage: client <host> <port>\n";
      return 1;
    }
    using namespace std; // For atoi.
    std::string host_name = argv[1];
    unsigned short port = atoi(argv[2]);

    asio::demuxer demuxer;

    // Determine the location of the server.
    asio::ipv4::host_resolver host_resolver(demuxer);
    asio::ipv4::host host;
    host_resolver.get_host_by_name(host, host_name);
    asio::ipv4::tcp::endpoint remote_endpoint(port, host.address(0));

    // Establish the control connection to the server.
    asio::stream_socket control_socket(demuxer);
    control_socket.connect(remote_endpoint);

    // Create a datagram socket to receive data from the server.
    shared_ptr<asio::datagram_socket> data_socket(
        new asio::datagram_socket(demuxer, asio::ipv4::udp::endpoint(0)));

    // Determine what port we will receive data on.
    asio::ipv4::udp::endpoint data_endpoint;
    data_socket->get_local_endpoint(data_endpoint);

    // Ask the server to start sending us data.
    control_request start = control_request::start(data_endpoint.port());
    asio::write(control_socket, start.to_buffers());

    unsigned long last_frame_number = 0;
    for (;;)
    {
      // Receive 50 messages on the current data socket.
      for (int i = 0; i < 50; ++i)
      {
        // Receive a frame from the server.
        frame f;
        data_socket->receive(f.to_buffers(), 0);
        if (f.number() > last_frame_number)
        {
          last_frame_number = f.number();
          std::cout << "\n" << f.payload();
        }
      }

      // Time to switch to a new socket. To ensure seamless handover we will
      // continue to receive packets using the old socket until data arrives on
      // the new one.
      std::cout << " Starting renegotiation";

      // Create the new data socket.
      shared_ptr<asio::datagram_socket> new_data_socket(
          new asio::datagram_socket(demuxer, asio::ipv4::udp::endpoint(0)));

      // Determine the new port we will use to receive data.
      asio::ipv4::udp::endpoint new_data_endpoint;
      new_data_socket->get_local_endpoint(new_data_endpoint);

      // Ask the server to switch over to the new port.
      control_request change = control_request::change(
          data_endpoint.port(), new_data_endpoint.port());
      asio::error control_result;
      asio::async_write(control_socket, change.to_buffers(),
          lambda::var(control_result) = lambda::_1);

      // Try to receive a frame from the server on the new data socket. If we
      // successfully receive a frame on this new data socket we can consider
      // the renegotiation complete. In that case we will close the old data
      // socket, which will cause any outstanding receive operation on it to be
      // cancelled.
      frame f1;
      asio::error new_data_socket_result;
      new_data_socket->async_receive(f1.to_buffers(), 0,
          (
            // Note: lambda::_1 is the first argument to the callback handler,
            // which in this case is the error code for the operation.
            lambda::var(new_data_socket_result) = lambda::_1,
            lambda::if_(!lambda::_1)
            [
              // We have successfully received a frame on the new data socket,
              // so we can close the old data socket. This will cancel any
              // outstanding receive operation on the old data socket.
              lambda::var(data_socket) = shared_ptr<asio::datagram_socket>()
            ]
          ));

      // This loop will continue until we have successfully completed the
      // renegotiation (i.e. received a frame on the new data socket), or some
      // unrecoverable error occurs.
      bool done = false;
      while (!done)
      {
        // Even though we're performing a renegotiation, we want to continue
        // receiving data as smoothly as possible. Therefore we will continue to
        // try to receive a frame from the server on the old data socket. If we
        // receive a frame on this socket we will interrupt the demuxer,
        // print the frame, and resume waiting for the other operations to
        // complete.
        frame f2;
        done = true; // Let's be optimistic.
        if (data_socket) // Might have been closed by new_data_socket's handler.
        {
          data_socket->async_receive(f2.to_buffers(), 0,
              (
                lambda::if_(!lambda::_1)
                [
                  // We have successfully received a frame on the old data
                  // socket. Interrupt the demuxer so that we can print it.
                  lambda::bind(&asio::demuxer::interrupt, &demuxer),
                  lambda::var(done) = false
                ]
              ));
        }

        // Run the operations in parallel. This will block until all operations
        // have finished, or until the demuxer is interrupted. (No threads!)
        demuxer.reset();
        demuxer.run();

        // If the demuxer.run() was interrupted then we have received a frame on
        // the old data socket. We need to keep waiting for the renegotiation
        // operations to complete.
        if (!done)
        {
          if (f2.number() > last_frame_number)
          {
            last_frame_number = f2.number();
            std::cout << "\n" << f2.payload();
          }
        }
      }

      // Since the loop has finished, we have either successfully completed
      // the renegotiation, or an error has occurred. First we'll check for
      // errors.
      if (control_result != asio::error::success)
        throw control_result;
      if (new_data_socket_result != asio::error::success)
        throw new_data_socket_result;

      // If we get here it means we have successfully started receiving data on
      // the new data socket. This new data socket will be used from now on
      // (until the next time we renegotiate).
      std::cout << " Renegotiation complete";
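      data_socket = new_data_socket;
      // Remember the new port so that the next change request names the
      // correct old port.
      data_endpoint = new_data_endpoint;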
      if (f1.number() > last_frame_number)
      {
        last_frame_number = f1.number();
        std::cout << "\n" << f1.payload();
      }
    }
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << std::endl;
  }

  return 0;
}


#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <cmath>
#include <cstdlib>
#include <exception>
#include <iostream>
#include <set>
#include "protocol.hpp"

typedef boost::shared_ptr<boost::asio::stream_socket> stream_socket_ptr;
typedef boost::shared_ptr<boost::asio::deadline_timer> deadline_timer_ptr;
typedef boost::shared_ptr<control_request> control_request_ptr;

class server
{
public:
  // Construct the server to wait for incoming control connections.
  server(boost::asio::demuxer& demuxer, unsigned short port)
    : acceptor_(demuxer, boost::asio::ipv4::tcp::endpoint(port)),
      timer_(demuxer),
      datagram_socket_(demuxer, boost::asio::ipv4::udp::endpoint(0)),
      next_frame_number_(1)
  {
    // Start waiting for a new control connection.
    stream_socket_ptr new_socket(
        new boost::asio::stream_socket(acceptor_.demuxer()));
    acceptor_.async_accept(*new_socket,
        boost::bind(&server::handle_accept, this,
          boost::asio::placeholders::error, new_socket));

    // Start the timer used to generate outgoing frames.
    timer_.expires_from_now(boost::posix_time::milliseconds(100));
    timer_.async_wait(boost::bind(&server::handle_timer, this));
  }

  // Handle a new control connection.
  void handle_accept(const boost::asio::error& e, stream_socket_ptr socket)
  {
    if (!e)
    {
      // Start receiving control requests on the connection.
      control_request_ptr request(new control_request);
      boost::asio::async_read(*socket, request->to_buffers(),
          boost::bind(&server::handle_control_request, this,
            boost::asio::placeholders::error, socket, request));

      // Start waiting for a new control connection.
      stream_socket_ptr new_socket(
          new boost::asio::stream_socket(acceptor_.demuxer()));
      acceptor_.async_accept(*new_socket,
          boost::bind(&server::handle_accept, this,
            boost::asio::placeholders::error, new_socket));
    }
    else if (e == boost::asio::error::connection_aborted)
    {
      // Try again.
      acceptor_.async_accept(*socket,
          boost::bind(&server::handle_accept, this,
            boost::asio::placeholders::error, socket));
    }
  }

  // Handle a new control request.
  void handle_control_request(const boost::asio::error& e,
      stream_socket_ptr socket, control_request_ptr request)
  {
    if (!e)
    {
      // Delay handling of the control request to simulate latency on the
      // network.
      deadline_timer_ptr delay_timer(
          new boost::asio::deadline_timer(acceptor_.demuxer()));
      delay_timer->expires_from_now(boost::posix_time::seconds(2));
      delay_timer->async_wait(
          boost::bind(&server::handle_control_request_timer, this,
            socket, request, delay_timer));
    }
  }

  void handle_control_request_timer(stream_socket_ptr socket,
      control_request_ptr request, deadline_timer_ptr delay_timer)
  {
    // Determine what address this client is connected from, since
    // subscriptions must be stored on the server as a complete endpoint, not
    // just a port.
    boost::asio::ipv4::tcp::endpoint remote_endpoint;
    socket->get_remote_endpoint(remote_endpoint);

    // Remove old port subscription, if any.
    if (unsigned short old_port = request->old_port())
    {
      boost::asio::ipv4::udp::endpoint old_endpoint(
          old_port, remote_endpoint.address());
      subscribers_.erase(old_endpoint);
      std::cout << "Removing subscription " << old_endpoint << std::endl;
    }

    // Add new port subscription, if any.
    if (unsigned short new_port = request->new_port())
    {
      boost::asio::ipv4::udp::endpoint new_endpoint(
          new_port, remote_endpoint.address());
      subscribers_.insert(new_endpoint);
      std::cout << "Adding subscription " << new_endpoint << std::endl;
    }

    // Wait for next control request on this connection.
    boost::asio::async_read(*socket, request->to_buffers(),
        boost::bind(&server::handle_control_request, this,
          boost::asio::placeholders::error, socket, request));
  }

  // Every time the timer fires we will generate a new frame and send it to all
  // subscribers.
  void handle_timer()
  {
    // Generate payload.
    double x = next_frame_number_ * 0.2;
    double y = std::sin(x);
    int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
    std::string payload;
    for (int i = 0; i < frame::payload_size; ++i)
      payload += (i == char_index ? '*' : '.');

    // Create the frame to be sent to all subscribers.
    frame f(next_frame_number_++, payload);

    // Send frame to all subscribers. We can use synchronous calls here since
    // UDP send operations typically do not block.
    std::set<boost::asio::ipv4::udp::endpoint>::iterator j;
    for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
    {
      datagram_socket_.send_to(f.to_buffers(), 0, *j,
          boost::asio::ignore_error());
    }

    // Wait for next timeout.
    timer_.expires_from_now(boost::posix_time::milliseconds(100));
    timer_.async_wait(boost::bind(&server::handle_timer, this));
  }

private:
  // The acceptor used to accept incoming control connections.
  boost::asio::socket_acceptor acceptor_;

  // The timer used for generating data.
  boost::asio::deadline_timer timer_;

  // The socket used to send data to subscribers.
  boost::asio::datagram_socket datagram_socket_;

  // The next frame number.
  unsigned long next_frame_number_;

  // The set of endpoints that are subscribed.
  std::set<boost::asio::ipv4::udp::endpoint> subscribers_;
};

int main(int argc, char* argv[])
{
  try
  {
    if (argc != 2)
    {
      std::cerr << "Usage: server <port>\n";
      return 1;
    }

    boost::asio::demuxer d;

    using namespace std; // For atoi.
    server s(d, atoi(argv[1]));

    d.run();
  }
  catch (std::exception& e)
  {
    std::cerr << "Exception: " << e.what() << std::endl;
  }

  return 0;
}


#ifndef AAOIP_PROTOCOL_HPP
#define AAOIP_PROTOCOL_HPP

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <cstring>
#include <iomanip>
#include <string>
#include <strstream>

// This request is sent by the client to the server over a TCP connection.
// The client uses it to perform three functions:
// - To request that data start being sent to a given port.
// - To request that data is no longer sent to a given port.
// - To change the target port to another.
class control_request
{
public:
  // Construct an empty request. Used when receiving.
  control_request()
  {
  }

  // Create a request to start sending data to a given port.
  static const control_request start(unsigned short port)
  {
    return control_request(0, port);
  }

  // Create a request to stop sending data to a given port.
  static const control_request stop(unsigned short port)
  {
    return control_request(port, 0);
  }

  // Create a request to change the port that data is sent to.
  static const control_request change(
      unsigned short old_port, unsigned short new_port)
  {
    return control_request(old_port, new_port);
  }

  // Get the old port. Returns 0 for start requests.
  unsigned short old_port() const
  {
    std::istrstream is(data_, encoded_port_size);
    unsigned short port = 0;
    is >> std::setw(encoded_port_size) >> std::hex >> port;
    return port;
  }

  // Get the new port. Returns 0 for stop requests.
  unsigned short new_port() const
  {
    std::istrstream is(data_ + encoded_port_size, encoded_port_size);
    unsigned short port = 0;
    is >> std::setw(encoded_port_size) >> std::hex >> port;
    return port;
  }

  // Obtain buffers for reading from or writing to a socket.
  boost::array<boost::asio::mutable_buffer, 1> to_buffers()
  {
    boost::array<boost::asio::mutable_buffer, 1> buffers
      = { boost::asio::buffer(data_) };
    return buffers;
  }

private:
  // Construct with specified old and new ports.
  control_request(unsigned short old_port, unsigned short new_port)
  {
    std::ostrstream os(data_, control_request_size);
    os << std::setw(encoded_port_size) << std::hex << old_port;
    os << std::setw(encoded_port_size) << std::hex << new_port;
  }

  // The length in bytes of a control_request and its components.
  enum
  {
    encoded_port_size = 4, // 16-bit port in hex.
    control_request_size = encoded_port_size * 2
  };

  // The encoded request data.
  char data_[control_request_size];
};

// This frame is sent from the server to subscribed clients over UDP.
class frame
{
public:
  // The maximum allowable length of the payload.
  enum { payload_size = 32 };

  // Construct an empty frame. Used when receiving.
  frame()
  {
  }

  // Construct a frame with specified frame number and payload.
  frame(unsigned long number, const std::string& payload)
  {
    std::ostrstream os(data_, frame_size);
    os << std::setw(encoded_number_size) << std::hex << number;
    os << std::setw(payload_size)
      << std::setfill(' ') << payload.substr(0, payload_size);
  }

  // Get the frame number.
  unsigned long number() const
  {
    std::istrstream is(data_, encoded_number_size);
    unsigned long number = 0;
    is >> std::setw(encoded_number_size) >> std::hex >> number;
    return number;
  }

  // Get the payload data.
  const std::string payload() const
  {
    return std::string(data_ + encoded_number_size, payload_size);
  }

  // Obtain buffers for reading from or writing to a socket.
  boost::array<boost::asio::mutable_buffer, 1> to_buffers()
  {
    boost::array<boost::asio::mutable_buffer, 1> buffers
      = { boost::asio::buffer(data_) };
    return buffers;
  }

private:
  // The length in bytes of a frame and its components.
  enum
  {
    encoded_number_size = 8, // Frame number in hex.
    frame_size = encoded_number_size + payload_size
  };

  // The encoded frame data.
  char data_[frame_size];
};

#endif // AAOIP_PROTOCOL_HPP

