Boost logo

Boost-Build :

Subject: [Boost-build] ASIO How to convert to a multi-threaded server ?
From: Avi Bahra (avibahra_at_[hidden])
Date: 2012-10-31 12:36:57


I have a single-threaded ASIO server based on the Serialization example,
and it worked well apart from some performance issues.

However, so far my attempts to make it multi-threaded seem to
have caused an infinite loop around the handle_read() function.
I am using Boost version 1.47.

OK, I must have misunderstood something. I was trying to
merge the HTTP Server 3 example with the Serialization example.
For brevity some classes have been left out.
The connection class is modelled the same way as in the
Serialization example.

Here is the cut-down server code.
Can anyone help?

//--------------------------------------------------------

server::server( ServerEnvironment& serverEnv ) :
   io_service_(),
   signals_(io_service_),
   acceptor_(io_service_),
   strand_(io_service_),
   traverser_ (this, io_service_, serverEnv ),
   checkPtSaver_(this, io_service_, &serverEnv ),
    serverState_(SState::HALTED),
    serverEnv_(serverEnv)
{
   if (serverEnv_.debug())
      cout << "-->server::server starting server on port "
           << serverEnv.port() << endl;

   // Register to handle the signals.
   // Support for emergency check pointing during system session.
   signals_.add(SIGTERM);
   signals_.async_wait(boost::bind(&server::sigterm_signal_handler, this));

   boost::asio::ip::tcp::endpoint endpoint(serverEnv.tcp_protocol(),
                                           serverEnv.port());
   acceptor_.open(endpoint.protocol());
   acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
   acceptor_.bind(endpoint);
   acceptor_.listen();

   // Start an accept operation for a new connection.
   acceptNewConnection();
}

/// Logs the exit (debug mode only) and releases this server's hold on defs_.
server::~server()
{
   if (serverEnv_.debug()) {
      cout << "<--server::~server exiting server on port "
             << serverEnv_.port() << endl;
   }

   // Release our reference and check that defs_ is now empty.
   defs_.reset();
   assert(defs_.use_count() == 0);
}

void server::acceptNewConnection()
{
   if (serverEnv_.debug())
      cout << boost::this_thread::get_id()
           << " server::acceptNewConnection() "
           << endl;

   connection_ptr new_conn( new connection( io_service_ ) );
   acceptor_.async_accept( new_conn->socket(),
                           strand_.wrap(boost::bind(
                                 &server::handle_accept, this,
                                 boost::asio::placeholders::error,
                                 new_conn ) ));
}

void server::run()
{
  // -1 since we we use main thread
  size_t thread_pool_size_ = boost::thread::hardware_concurrency() - 1;

  // Create a pool of threads to run all of the io_services.
  std::vector<boost::shared_ptr<boost::thread> > threads;
  for (std::size_t i = 0; i < thread_pool_size_; ++i)
  {
    boost::shared_ptr<boost::thread> thread(new boost::thread(
          boost::bind(&boost::asio::io_service::run, &io_service_)));
    threads.push_back(thread);
  }

  io_service_.run(); // the main thread

  // Wait for all threads in the pool to exit.
  for (std::size_t i = 0; i < threads.size(); ++i)
    threads[i]->join();
}

void server::terminate()
{
   if (serverEnv_.debug())
      cout << " server::terminate()\n";
   io_service_.post(boost::bind(&server::handle_terminate, this));
}

void server::handle_terminate()
{
   if (serverEnv_.debug())
      cout << boost::this_thread::get_id()
           << " server::handle_terminate()\n";

   // Cancel signal
   signals_.clear();
   signals_.cancel();

   // Cancel async timers for check pointing and traversal
   traverser_.terminate();
   checkPtSaver_.terminate();

   acceptor_.close();
   io_service_.stop();
}

void server::handle_accept( const boost::system::error_code& e,
                            connection_ptr conn )
{
   if (!acceptor_.is_open()) {
      if (serverEnv_.debug())
         cout << boost::this_thread::get_id()
              << " server::handle_accept: acceptor is closed\n";
      return;
   }

   if ( !e ) {
      // Read and interpret message from the client
      if (serverEnv_.debug())
         cout << boost::this_thread::get_id()
              << " server::handle_accept\n";

      conn->async_read( inbound_request_,
                     strand_.wrap(
                        boost::bind(
                           &server::handle_read, this,
                           boost::asio::placeholders::error,conn)));
   }
   else {
      if (serverEnv_.debug())
         cout << boost::this_thread::get_id()
              << " server::handle_accept "
              << e.message() << "\n";
      if (e != boost::asio::error::operation_aborted) {
         LogToCout toCoutAsWell;
         LOG(Log::ERR, " server::handle_accept error occurred " <<
e.message());
      }
   }
   acceptNewConnection();
}

void server::handle_read(const boost::system::error_code& e,
                         connection_ptr conn )
{
   /// Handle completion of a write operation.
   if ( !e ) {

      // See what kind of message we got from the client
      if (serverEnv_.debug())
         std::cout << boost::this_thread::get_id()
                   << " server::handle_read : client request "
                   << inbound_request_ << "\n";
      try {
         // Service the in bound request,
         // handling the request will populate the outbound_response_
         outbound_response_.set_cmd(
            inbound_request_.handleRequest( this ) );
      }
      catch (exception& e) {
         outbound_response_.set_cmd(
            PreAllocatedReply::error_cmd( e.what() ));
      }

      // *Reply* back to the client:
      conn->async_write( outbound_response_,
                         strand_.wrap(boost::bind(&server::handle_write,
                                    this,
                                    boost::asio::placeholders::error,
                                    conn ) ));
   }
   else {
      LogToCout toCoutAsWell;
      LOG(Log::ERR, "server::handle_read error occurred : " << e.message());
   }
}

void server::handle_write( const boost::system::error_code& e,
                           connection_ptr conn )
{
    if (serverEnv_.debug())
      cout << boost::this_thread::get_id()
           << " server::handle_write: client request "
           << inbound_request_
           << " replying with "
           << outbound_response_ << "\n";

   conn->socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both);
   conn->socket().close();

   if (inbound_request_.terminateRequest() &&
       outbound_response_.get_cmd()->isOkCmd()) {
      terminate();
   }
}

Ta,
  Avi


Boost-Build list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk