Boost logo

Boost Users :

Subject: [Boost-users] [asio] [thread] Code review for correct use of boost sockets and threads
From: Konrad Reiche (konrad.reiche_at_[hidden])
Date: 2011-07-03 17:43:57


Hello everyone,
I am using the boost library now for a couple of weeks. Unfortunately
I've run into some problems with my C++ code and I would like to make
sure that these problems are not caused due to incorrect use of boost.
So basically I've written a small class which is suppossed to take
messages and send them asynchronously. If there are more messages than
can be processed, they queue up. The code is mainly based on the
tutorial code provided on the boost website:
http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/tutorial/tutdaytime3.html

I have 2 questions:

1. The constructor launches the threads, from this point on I want to be
able to queue up messages, which are processed afer that by using the
function MessageService::write. Is there any possible problem which
could occur due to buffer or thread issues? Because sometimes the
messages are not processed until the end and when they are processed
they are corrupted.

2. Sometimes the messages are not processed when the application would
quit. I have to make sure, that the messages are sent at least at the
end. Sometimes there are 10.000+ messages queued up, so this might take
a bit time. For this I've wrote MessageService::await(). It seems really
bogus to me. I am checking the pending messages left for the following
reason:

When the host I am connecting to does not listen on the port I am
connecting to, I don't receive an error by the functions I am using. So
the messages are queued up and nothing happens. I don't know why. Have I
forgot any checking whether the connection could be established?

In the hope my question is not too generic, I would be really grateful
for any advice. I am sorry I could not provide any specific errors, but
this is due to the reason all the problems evolve on the client side by
receiving corrupted messages.

- Konrad

/*
  * MessageService.cpp
  *
  * Created on: 04.06.2011
  * Author: Konrad Johannes Reiche
  */

#include "MessageService.h"

using namespace google::protobuf::io;
using boost::asio::ip::tcp;

MessageService::MessageService(std::string ip, std::string port) :
     work(io_service), resolver(io_service), socket(io_service) {

     messageQueue = new std::deque<AgentMessage>;
     tcp::resolver::query query(ip, port);
     endpoint_iterator = resolver.resolve(query);

     tcp::endpoint endpoint = *endpoint_iterator;

     socket.async_connect(endpoint,
boost::bind(&MessageService::handle_connect,
             this, boost::asio::placeholders::error, ++endpoint_iterator));

     boost::thread t(boost::bind(&boost::asio::io_service::run,
&io_service));
}

void MessageService::await() {

     while (!messageQueue->empty()) {

         signal(SIGINT, exit);

         int messagesLeft = messageQueue->size();
         sleep(3);
         std::cout << "Pending Profiler Agents Messages: " <<
messageQueue->size() << std::endl;
         if (messagesLeft == messageQueue->size()) {
             std::cout << "Connection Error" << std::endl;
             break;
         }
     }
}

void MessageService::write(AgentMessage agentMessage, long systemTime,
     int JVM_ID) {

           
agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
           agentMessage.set_jvm_id(JVM_ID);
           agentMessage.set_systemtime(systemTime);
           io_service.post(boost::bind(&MessageService::do_write, this,
agentMessage));
}

void MessageService::do_close() {
    socket.close();
}

void MessageService::transmitMessage(AgentMessage agentMessage) {

boost::asio::streambuf b;
std::ostream os(&b);

ZeroCopyOutputStream *raw_output = new OstreamOutputStream(&os);
CodedOutputStream *coded_output = new CodedOutputStream(raw_output);

coded_output->WriteVarint32(agentMessage.ByteSize());
agentMessage.SerializeToCodedStream(coded_output);

delete coded_output;
delete raw_output;

boost::system::error_code ignored_error;

boost::asio::async_write(socket, b.data(), boost::bind(
&MessageService::handle_write, this, boost::asio::placeholders::error));
}

void MessageService::do_write(AgentMessage agentMessage) {

bool write_in_progress = !messageQueue->empty();
messageQueue->push_back(agentMessage);

if (!write_in_progress) {
     transmitMessage(agentMessage);
}
}

void MessageService::handle_write(const boost::system::error_code &error) {

if (!error) {
     messageQueue->pop_front();
     if (!messageQueue->empty()) {
         transmitMessage(messageQueue->front());
     }
} else {
     std::cout << error << std::endl;
     do_close();
}
}

void MessageService::handle_connect(const boost::system::error_code &error,
     tcp::resolver::iterator endpoint_iterator) {
// can be used to receive commands from the Java profiler interface
}

MessageService::~MessageService() {
// TODO Auto-generated destructor stub
}



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net