I am having a problem writing a TCP socket message using boost::asio async_* functions.
I am new to using boost::asio sockets. After reading what I could find (this list, samples, other forums), I have added boost::asio sockets to my application.
What works is: accept, receive, parse and reply to a message.
After my client sends a message and receives a response it remains connected and starts an async receive.
Several seconds later my server tries to send another message to the client (by calling server::write()). The code runs OK up to connection::handle_writeMessage(), where the error is 2719: “The file handle supplied is not valid”.
I thought I read (links below) that using strand::wrap() to queue the async_* calls and io_service::post() to post the initial function call would allow a thread that didn’t call io_service::run() to initiate the sending of a message.
http://stackoverflow.com/questions/4078484/using-boost-sockets-do-i-need-only-one-io-service
http://stackoverflow.com/questions/4090567/better-understanding-boosts-chat-client-example
http://groups.google.com/group/boost-list/browse_thread/thread/124a643a06d67fd9
Did I interpret this incorrectly? Did I code it wrong?
Thank you for your time and advice.
To develop my code I started with the http3 and chat server samples. Changes include:
- server::threads is a class member
- server::run() returns without waiting for threads to stop so I can start other threads in my program
- added server::stop() to stop threads.
- server::write() is called from another thread.
- connection::do_writeMessage() and connection::handle_writeMessage() are from the chat server sample.
A summary of my server code is below. It's long, but should be familiar from the samples.
Server.hpp
// Worker threads that execute io_service_.run(). Filled by server::run() and
// joined by server::stop().
std::vector<boost::shared_ptr<boost::thread> > threads;
// Constructor (its parameter list is elided in this summary).
// Initializes the pool size, the acceptor, the first connection object and the
// request handler, then queues the first asynchronous accept so that
// handle_accept() runs when a client connects.
server::server
: thread_pool_size_(2), // 1 thread for wait, read, reply. 1 for unsolicited writes.
acceptor_(io_service_),
new_connection_(new connection(io_service_, request_handler_)),
request_handler_()
{
// NOTE(review): unlike the call in handle_accept(), this one is not qualified
// with `acceptor_.`; presumably acceptor_.async_accept(...) is intended. The
// acceptor's open/bind/listen steps are also not shown here — confirm they
// exist in the full source.
async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
boost::asio::placeholders::error));
}
void server::run()
{
// 1 thread for wait, read, reply. 1 for unsolicited writes.
// Create a pool of threads to run all of the io_services.
//std::vector<boost::shared_ptr<boost::thread> > threads;
for (std::size_t i = 0; i < thread_pool_size_; ++i)
{
boost::shared_ptr<boost::thread> thread(new boost::thread(
boost::bind(&boost::asio::io_service::run, &io_service_)));
threads.push_back(thread);
}
}
void server::stop()
{
io_service_.stop();
// Wait for all threads in the pool to exit.
for (std::size_t i = 0; i < threads.size(); ++i)
{
threads[i]->join();
}
}
server::handle_accept (const boost::system::error_code& e)
{
if (!e)
{
// read, parse and reply
new_connection_->start();
new_connection_.reset(new connection(io_service_, request_handler_));
// ready to accept another connection
cout << "waiting for new accept ..." << endl;
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this,
boost::asio::placeholders::error));
}
}
// public
// Called from another thread in the program to send an unsolicited message to
// the connected client. The call to connection::do_writeMessage is handed to
// the io_service with post(), so it runs on a thread executing
// io_service_.run() rather than on the caller's thread.
//
// NOTE(review): the target is new_connection_. handle_accept() replaces that
// pointer with a brand-new connection as soon as a client has been accepted,
// so by the time this runs it refers to the connection still waiting in
// async_accept, not to the connected client. Writing to that unconnected
// socket looks like the cause of the "file handle supplied is not valid"
// error. The accepted connection has to be remembered somewhere and used here.
// NOTE(review): new_connection_ is also read here from a thread other than the
// one running handle_accept(), with no synchronization visible — confirm.
void server::write(const string& msg)
{
    io_service_.post(
        boost::bind(&connection::do_writeMessage, new_connection_, msg));
}
- - - - - - - - - -
// Builds a connection bound to the given io_service.
// The strand and the socket are both created on `io_service`; `handler` is
// kept as the request handler used to build replies. The connection is marked
// as running from the start.
connection::connection(boost::asio::io_service& io_service,
                       request_handler& handler)
    : strand_(io_service),
      socket_(io_service),
      request_handler_(handler),
      _running(true)
{
}
// Begins (or resumes) reading from the client.
// The receive buffer is zeroed first, then one asynchronous read is queued.
// Its completion handler, handle_read, is wrapped in the strand and holds a
// shared_ptr to this connection.
void connection::start()
{
    buffer_.fill(0);

    socket_.async_read_some(
        boost::asio::buffer(buffer_),
        strand_.wrap(boost::bind(
            &connection::handle_read, shared_from_this(),
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred)));
}
// Completion handler for async_read_some().
//
// Parses the bytes received so far and then does one of three things,
// depending on the tri-state parse result:
//   indeterminate - the message is incomplete, so another read is queued;
//   true          - the request is handled and reply_ is sent;
//   false         - reply_ is sent without calling the request handler.
//
// If the read itself failed, the error is logged and no new asynchronous
// operation is started. This means that all shared_ptr references to the
// connection object will disappear and the object will be destroyed
// automatically after this handler returns. The connection class's destructor
// closes the socket.
//
// @param e                  result of the read operation.
// @param bytes_transferred  number of bytes placed in buffer_.
void connection::handle_read(const boost::system::error_code& e,
                             std::size_t bytes_transferred)
{
    if (e)
    {
        cout << "connection::handle_read: error: " << e.value() << " " << e.message() << endl;
        return;
    }

    // Parse what we have so far.
    boost::tribool result;
    request_.reset();
    boost::tie(result, boost::tuples::ignore) = request_parser_.parse(
        request_, buffer_.data(), buffer_.data() + bytes_transferred);

    if (boost::indeterminate(result))
    {
        // Read some ok; need to read some more.
        socket_.async_read_some(
            boost::asio::buffer(buffer_),
            strand_.wrap(boost::bind(
                &connection::handle_read, shared_from_this(),
                boost::asio::placeholders::error,
                boost::asio::placeholders::bytes_transferred)));
        return;
    }

    if (result)
    {
        // An entire message was read: process it and fill in reply_.
        request_handler_.handle_request(request_, reply_);
    }
    // NOTE(review): when parsing failed (result == false) nothing in this
    // function fills reply_ before it is sent — confirm that is intended.

    // Send the reply (success and parse-failure paths both end here).
    boost::asio::async_write(
        socket_, reply_.to_buffers(),
        strand_.wrap(boost::bind(
            &connection::handle_writeResponse, shared_from_this(),
            boost::asio::placeholders::error)));
}
// Completion handler for the async_write() that sends reply_.
//
// On success it queues a test message for the client and then starts the next
// read, so the connection stays open for further requests.
//
// On failure the error is logged and no new asynchronous operation is started
// from here. The log line now names this function; it previously said
// "handle_writeMessage", which pointed at the wrong handler.
//
// @param e  result of the write operation.
void connection::handle_writeResponse(const boost::system::error_code& e)
{
    if (e)
    {
        cout << "connection::handle_writeResponse: error sending to client: " << e.value() << " " << e.message() << endl;
        return;
    }

    // test. This message is received by my client.
    do_writeMessage("hello world");

    // ready to read more
    start();
}
- - - - - - - - - -
// public
// Queues `msg` for sending and starts an asynchronous write if none is in
// flight. A call to this function is posted to the io_service from
// server::write, and it is also called directly from handle_writeResponse.
//
// All access to _messagesToWrite and socket_ has to happen inside strand_,
// because handle_writeMessage (which pops the queue) runs there. A call that
// arrives through a plain io_service::post() is not inside the strand and
// could run at the same time as handle_writeMessage on the other pool thread.
// Such a call is therefore re-dispatched through the strand before any state
// is touched.
//
// NOTE(review): io_service::strand::running_in_this_thread() is not present in
// very old Boost.Asio releases — confirm against the Boost version in use.
//
// @param msg  payload to send; a copy is kept in the queue until the write
//             completes.
void connection::do_writeMessage(string msg)
{
    if (!strand_.running_in_this_thread())
    {
        strand_.dispatch(
            boost::bind(&connection::do_writeMessage, shared_from_this(), msg));
        return;
    }

    const bool write_in_progress = !_messagesToWrite.empty();
    _messagesToWrite.push_back(msg);

    // If there are any in the queue, handle_writeMessage is already sending
    // them and will pick this one up when its turn comes.
    if (write_in_progress)
    {
        cout << "connection::do_writeMessage: write in progress." << endl;
        return;
    }

    // Start the async write. The buffer refers to the queued copy, which stays
    // in the queue until handle_writeMessage removes it.
    const string& next = _messagesToWrite.front();
    boost::asio::async_write(
        socket_,
        boost::asio::buffer(next.data(), next.length()),
        strand_.wrap(boost::bind(
            &connection::handle_writeMessage, shared_from_this(),
            boost::asio::placeholders::error)));
}
// Completion handler for the async_write() started by do_writeMessage (or by
// a previous run of this handler). If there is no error and more messages are
// queued, the next one is written.
//
// On error the whole queue is discarded. do_writeMessage treats a non-empty
// queue as "a write is in progress", so leaving the failed message at the
// front would make every later message wait forever behind a write that will
// never complete.
//
// @param error  result of the write operation.
void connection::handle_writeMessage(const boost::system::error_code& error)
{
    if (error)
    {
        cout << "connection::handle_writeMessage: error: " << error.value() << " " << error.message() << endl;
        _messagesToWrite.clear();
        // let this async chain end.
        return;
    }

    cout << "connection::handle_writeMessage: message written ok." << endl;

    // remove the message we just wrote
    _messagesToWrite.pop_front();

    if (_messagesToWrite.empty())
    {
        cout << "connection::handle_writeMessage: no more messages" << endl;
        // let this async chain end.
        return;
    }

    // write the next one and call this function when that's done.
    const string& next = _messagesToWrite.front();
    boost::asio::async_write(
        socket_,
        boost::asio::buffer(next.data(), next.length()),
        strand_.wrap(boost::bind(
            &connection::handle_writeMessage, shared_from_this(),
            boost::asio::placeholders::error)));
}
- - - - - - - - - -
// Program entry point (abridged sketch; the signature and the rest of the body
// are elided in this summary).
// Creates the TCP server on the port given on the command line and starts its
// worker threads.
main()
{
pTCPServer = new http::server3::server (argv[TCPPortNumber]);
// NOTE(review): a plain `new` expression reports failure by throwing rather
// than by returning null, so this check is presumably always true — confirm.
if (pTCPServer)
{
// server::run() only creates the worker threads and returns, so execution
// continues below while the server is running.
pTCPServer->run();
}
// start thread that will initiate 2nd message
… … …
}