On Fri, May 6, 2011 at 9:57 PM, bill comment <billcomment@gmail.com> wrote:

I am having a problem writing a TCP socket message using the boost::asio async_* functions.

I am new to using Boost.Asio sockets. After reading what I could find (this list, samples, other forums), I have added boost::asio sockets to my application.

 

What works is: accept, receive, parse and reply to a message.

After my client sends a message and receives a response it remains connected and starts an async receive.

Several seconds later my server tries to send another message to the client (by calling server::write()). The code runs OK up to connection::handle_writeMessage(), where the error is: 2719 The file handle supplied is not valid.

 

I thought I read (links below) that using strand::wrap() to queue the async_* calls and io_service::post() to post the initial function call would allow a thread that didn't call io_service::run() to initiate the sending of a message.

http://stackoverflow.com/questions/4078484/using-boost-sockets-do-i-need-only-one-io-service

http://stackoverflow.com/questions/4090567/better-understanding-boosts-chat-client-example

http://groups.google.com/group/boost-list/browse_thread/thread/124a643a06d67fd9

 

Did I interpret this incorrectly? Did I code it wrong?

 

Thank you for your time and advice.

 

 

To develop my code I started with the HTTP server3 and chat server samples. Changes include:

- server::threads is a class member

- server::run() returns without waiting for threads to stop, so I can start other threads in my program

- added server::stop() to stop threads.

- server::write() is called from another thread.

 

- connection::do_writeMessage() and connection::handle_writeMessage() are from the chat server sample.

 

A summary of my server code is below. It's long, but should be familiar from the samples.

Server.hpp

  // Worker threads created by server::run(). Held as a class member (rather
  // than a local in run()) so that server::stop() can join them later.
  std::vector<boost::shared_ptr<boost::thread> > threads;

 

// Summary of the server constructor; the parameter list is not shown in this
// excerpt. It creates the first connection object and starts an asynchronous
// accept into that connection's socket, with server::handle_accept bound as
// the completion handler.
server::server

  : thread_pool_size_(2), // 1 thread for wait, read, reply. 1 for unsolicited writes.

    acceptor_(io_service_),

    new_connection_(new connection(io_service_, request_handler_)),

    request_handler_()

{

// NOTE(review): written unqualified here, whereas handle_accept() calls
// acceptor_.async_accept(...) -- presumably the same call; confirm.
async_accept(new_connection_->socket(),

      boost::bind(&server::handle_accept, this,

        boost::asio::placeholders::error));

}

 

void server::run()

{

   // 1 thread for wait, read, reply. 1 for unsolicited writes.

 

  // Create a pool of threads to run all of the io_services.

  //std::vector<boost::shared_ptr<boost::thread> > threads;

  for (std::size_t i = 0; i < thread_pool_size_; ++i)

  {

    boost::shared_ptr<boost::thread> thread(new boost::thread(

          boost::bind(&boost::asio::io_service::run, &io_service_)));

    threads.push_back(thread);

  }

}

 

 

void server::stop()

{

  io_service_.stop();

 

  // Wait for all threads in the pool to exit.

  for (std::size_t i = 0; i < threads.size(); ++i)

  {

    threads[i]->join();

  }

}

 

 

server::handle_accept (const boost::system::error_code& e)

{

  if (!e)

  {

     // read, parse and reply

    new_connection_->start();

 

    new_connection_.reset(new connection(io_service_, request_handler_));

 

    // ready to accept another connection

     cout << "waiting for new accept ..." << endl;

    acceptor_.async_accept(new_connection_->socket(),

        boost::bind(&server::handle_accept, this,

          boost::asio::placeholders::error));

 }

}

 

// public

// Called from another thread in my program to send an unsolicited message to my connected client.
//
// Posts a call to connection::do_writeMessage(msg), bound to new_connection_,
// onto io_service_ instead of calling it directly from the caller's thread.
//
// NOTE(review): the posted call is not wrapped in the connection's strand,
// while the write completion handlers that touch the same message queue are.
// With more than one thread running io_service_ these may overlap -- confirm.

void server::write(const string& msg)

{

   io_service_.post(boost::bind(&connection::do_writeMessage, new_connection_, msg));

'new_connection_' always points to an invalid socket because after a connection is established you always replace it with a new one. 

}

 

- - - - - - - - - -

// Constructs a connection whose strand and socket are both created on the
// given io_service, stores the request handler, and sets _running to true.
//
// Parameters:
//   io_service - used to construct strand_ and socket_.
//   handler    - used to initialize request_handler_.
connection::connection(boost::asio::io_service& io_service,

    request_handler& handler)

  : strand_(io_service),

    socket_(io_service),

    request_handler_(handler)

{

   _running = true;

}

 

// Clears the receive buffer and issues one asynchronous read on the socket.
// The completion handler, connection::handle_read, is wrapped in strand_ and
// bound to shared_from_this(), so the pending operation holds a shared_ptr to
// this connection.
void connection::start()

{

   // Zero the buffer before the next read.
   buffer_.fill (0);

  socket_.async_read_some(boost::asio::buffer(buffer_),

      strand_.wrap(

        boost::bind(&connection::handle_read, shared_from_this(),

          boost::asio::placeholders::error,

          boost::asio::placeholders::bytes_transferred)

       )

    );

}

 

// Completion handler for async_read_some.
//
// Parses the bytes just received and then takes one of three paths depending
// on the tribool parse result: a complete message is handled and its reply is
// written; a parse failure writes reply_; an indeterminate result issues
// another read. On a read error it only logs and starts nothing new.
//
// Parameters:
//   e                 - result of the read; evaluates to false on success.
//   bytes_transferred - number of bytes placed in buffer_ by this read.
void connection::handle_read(const boost::system::error_code& e,

    std::size_t bytes_transferred)

{

  if (!e)

  {

     // parse what we have so far. request_parser_.parse() is my code and it works correctly.

     // result is true when a complete message was parsed, false when the
     // message could not be parsed, and indeterminate when more data is needed.

    boost::tribool result;

    // NOTE(review): request_ is reset at the start of every read, including
    // the follow-up reads issued from the "need to read some more" branch.
    // Whether that discards a partially parsed request depends on request_
    // and request_parser_, which are not shown -- confirm.
    request_.reset();

    boost::tie(result, boost::tuples::ignore) = request_parser_.parse(

        request_, buffer_.data(), buffer_.data() + bytes_transferred);

 

    if (result)

    {

       // we read an entire message

 

       // this is my code to process the message and fill in the reply_ buffer.

       request_handler_.handle_request(request_, reply_);

 

      // send the reply

      boost::asio::async_write(socket_, reply_.to_buffers(),

          strand_.wrap(

            boost::bind(&connection::handle_writeResponse, shared_from_this(),

              boost::asio::placeholders::error)

              )

              );

 

    }

    else if (!result)

    {

       // there was an error reading the message

       // NOTE(review): reply_ is sent as it currently stands; nothing in this
       // branch fills it with an error reply -- confirm it holds what should
       // be sent here.

      boost::asio::async_write(socket_, reply_.to_buffers(),

          strand_.wrap(

            boost::bind(&connection::handle_writeResponse, shared_from_this(),

              boost::asio::placeholders::error)

              )

              );

    }

    else

    {

       // read some ok. need to read some more.

      socket_.async_read_some(boost::asio::buffer(buffer_),

          strand_.wrap(

            boost::bind(&connection::handle_read, shared_from_this(),

              boost::asio::placeholders::error,

              boost::asio::placeholders::bytes_transferred)

              )

              );

    }

  }

  else

  {

      cout << "connection::handle_read: error: " << e.value() << " " << e.message() << endl;

  }

 

  // If an error occurs then no new asynchronous operations are started. This

  // means that all shared_ptr references to the connection object will

  // disappear and the object will be destroyed automatically after this

  // handler returns. The connection class's destructor closes the socket.

}

 

// Completion handler for the reply written by handle_read().
//
// On success it queues the test message "hello world" through
// do_writeMessage() and then calls start() to issue the next asynchronous
// read. On failure it only logs the error and starts no new operation.
//
// Parameters:
//   e - result of the write; evaluates to false on success.
void connection::handle_writeResponse(const boost::system::error_code& e)
{
   if (!e)
   {
      // test. This message is received by my client.
      do_writeMessage ("hello world");

      // ready to read more
      start();
   }
   else
   {
      // The log label names this function; it previously named
      // handle_writeMessage, which misattributed reply-write failures.
      cout << "connection::handle_writeResponse: error sending to client: " << e.value() << " " << e.message() << endl;
   }

   // Only the error path starts no new asynchronous operation; the success
   // path above re-arms the read via start().
}

 

 

- - - - - - - - - -

 

// public
// A call to this function is posted to the io_service from server::write.
//
// Appends msg to the outgoing queue. When the queue already held messages, a
// write chain is assumed to be running and will pick the new message up; when
// the queue was empty, this call starts the chain by writing the front message
// with handle_writeMessage as the strand-wrapped completion handler.
void connection::do_writeMessage(string msg)
{
   const bool queueWasEmpty = _messagesToWrite.empty();
   _messagesToWrite.push_back(msg);

   if (!queueWasEmpty)
   {
      // handle_writeMessage is already draining the queue.
      cout << "connection::do_writeMessage: write in progress." << endl;
      return;
   }

   // Nothing in flight: start the async write with the message at the front.
   boost::asio::async_write(
      socket_,
      boost::asio::buffer(_messagesToWrite.front().data(), _messagesToWrite.front().length()),
      strand_.wrap(
         boost::bind(&connection::handle_writeMessage, shared_from_this(),
            boost::asio::placeholders::error)));
}

 

 

// async write is done. if no error and more messages to write, write them, too.

void connection::handle_writeMessage(const boost::system::error_code& error)

{

   if (!error)

   {

      cout << "connection::handle_writeMessage: message written ok." << endl;

      // remove the message we just wrote

      _messagesToWrite.pop_front();

 

      // any more?

      if (!_messagesToWrite.empty())

      {

         // write the next one and call this function when that'd done.

         boost::asio::async_write(socket_,

            boost::asio::buffer(_messagesToWrite.front().data(), _messagesToWrite.front().length()),

      strand_.wrap(

            boost::bind(&connection::handle_writeMessage, shared_from_this()/*this*/,

            boost::asio::placeholders::error)

            )

            );

      }

      else

      {

         cout << "connection::handle_writeMessage: no more messages" << endl;

      }

   }

   else

   {

      cout << "connection::handle_writeMessage: error: " << error.value() << " " << error.message() << endl;

   }

 

   // let this async chain end.

}

 

- - - - - - - - - -

// Sketch of the program entry point (elided in this post): creates the server
// from argv[TCPPortNumber], calls run() on it, and then goes on to start the
// thread that will later send the second message.
main()

{

pTCPServer = new http::server3::server (argv[TCPPortNumber]);

// NOTE(review): a plain new-expression reports failure by throwing rather
// than returning null, so this check is presumably always true -- confirm.
if (pTCPServer)

{

// server::run() only starts the worker threads and returns.
pTCPServer->run();

}

 

 

// start thread that will initiate 2nd message

 … … …

}



_______________________________________________
Boost-users mailing list
Boost-users@lists.boost.org
http://lists.boost.org/mailman/listinfo.cgi/boost-users