Hello everyone,
I have been using the Boost library for a couple of weeks now. Unfortunately I've run into some problems with my C++ code, and I would like to make sure that they are not caused by incorrect use of Boost. I've written a small class which is supposed to take messages and send them asynchronously. If more messages arrive than can be processed, they queue up. The code is mainly based on the tutorial code provided on the Boost website: http://www.boost.org/doc/libs/1_39_0/doc/html/boost_asio/tutorial/tutdaytime3.html

I have 2 questions:

1. The constructor launches the thread. From that point on I want to be able to queue up messages, which are then processed via MessageService::write. Could buffer or threading issues cause problems here? Sometimes not all of the messages are processed, and those that are processed arrive corrupted.

2. Sometimes the messages have not all been processed when the application is about to quit. I have to make sure that the messages are sent by then at the latest. Sometimes there are 10,000+ messages queued up, so this might take a while. For this I've written MessageService::await(), which seems really bogus to me. I am checking the number of pending messages for the following reason:

When the host I am connecting to is not listening on the port, the functions I am using do not report an error. So the messages queue up and nothing happens, and I don't know why. Have I forgotten to check whether the connection could be established?
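
To make the last point more concrete: handle_connect currently ignores its error argument, so a refused connection would go unnoticed. Below is a minimal sketch of the kind of check I suspect is missing. It is not my real code: ConnectTracker and all of its members are hypothetical names, std::error_code stands in for boost::system::error_code, and plain strings stand in for the resolved endpoints.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

// Hypothetical stand-in for the connect logic of MessageService.
// std::error_code plays the role of boost::system::error_code, and a
// string label plays the role of a resolved tcp::endpoint.
class ConnectTracker {
public:
    enum class State { Connecting, Connected, Failed };

    explicit ConnectTracker(std::vector<std::string> endpoints)
        : endpoints_(std::move(endpoints)) {
        assert(!endpoints_.empty());
    }

    // Counterpart of handle_connect: look at the error instead of ignoring it.
    void handle_connect(const std::error_code& error) {
        if (!error) {
            state_ = State::Connected;
            return;
        }
        last_error_ = error;
        if (next_ + 1 < endpoints_.size()) {
            // The real class would close the socket and call
            // async_connect again with the next endpoint here.
            ++next_;
        } else {
            state_ = State::Failed;  // no endpoint left to try
        }
    }

    State state() const { return state_; }
    const std::string& current() const { return endpoints_[next_]; }
    const std::error_code& last_error() const { return last_error_; }

private:
    std::vector<std::string> endpoints_;
    std::size_t next_ = 0;
    State state_ = State::Connecting;
    std::error_code last_error_;
};
```

In the real class, the Failed branch would presumably be the place to print the error and to stop accepting new messages, so that they do not pile up silently.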

I hope my question is not too generic, and I would be really grateful for any advice. I am sorry that I cannot provide any specific errors; all of the problems show up on the client side, in the form of corrupted messages.
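
Since the symptom is corrupted data, one thing I am unsure about is buffer lifetime: in transmitMessage the streambuf is a local variable, while async_write only completes later. As far as I understand the Asio documentation, the caller has to keep the underlying buffer memory valid until the handler is called. Here is a minimal, Boost-free sketch of the alternative I have in mind, where the serialized bytes live in the queue until the write has completed. FakeAsyncSocket and QueuedSender are hypothetical names used only for illustration, not part of Boost or of my class.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an asynchronous socket: async_write only
// remembers where the bytes are; they are read later, in complete_one().
// This mimics the rule that the buffer must outlive the pending write.
struct FakeAsyncSocket {
    const char* data = nullptr;
    std::size_t size = 0;
    std::function<void()> handler;
    std::vector<std::string> wire;  // what actually "went out"

    void async_write(const char* d, std::size_t n, std::function<void()> h) {
        data = d;
        size = n;
        handler = std::move(h);
    }

    // Finishes the pending write, if any, and runs its completion handler.
    bool complete_one() {
        if (!handler) return false;
        wire.emplace_back(data, size);  // the bytes are only read now
        std::function<void()> h = std::move(handler);
        handler = nullptr;
        h();
        return true;
    }
};

// Hypothetical sender: the queue stores the serialized bytes, so the buffer
// of the message in flight stays alive until handle_write pops it.
class QueuedSender {
public:
    explicit QueuedSender(FakeAsyncSocket& s) : socket_(s) {}

    void do_write(std::string serialized) {
        bool write_in_progress = !queue_.empty();
        queue_.push_back(std::move(serialized));
        if (!write_in_progress) transmit_front();
    }

    std::size_t pending() const { return queue_.size(); }

private:
    void transmit_front() {
        // push_back on a deque does not invalidate references to elements,
        // so the front element stays valid while more messages are queued.
        const std::string& front = queue_.front();
        socket_.async_write(front.data(), front.size(),
                            [this] { handle_write(); });
    }

    void handle_write() {
        queue_.pop_front();  // only now is the buffer released
        if (!queue_.empty()) transmit_front();
    }

    FakeAsyncSocket& socket_;
    std::deque<std::string> queue_;
};
```

With this layout only one write is in flight at a time, as in my do_write/handle_write pair, but the memory handed to the socket belongs to the queue rather than to a local variable.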

- Konrad

/*
 * MessageService.cpp
 *
 *  Created on: 04.06.2011
 *      Author: Konrad Johannes Reiche
 */

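#include "MessageService.h"

#include <csignal>
#include <cstdlib>
#include <iostream>
#include <unistd.h>

#include <boost/bind.hpp>
#include <boost/thread.hpp>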

using namespace google::protobuf::io;
using boost::asio::ip::tcp;


MessageService::MessageService(std::string ip, std::string port) :
    work(io_service), resolver(io_service), socket(io_service) {

    messageQueue = new std::deque<AgentMessage>;
    tcp::resolver::query query(ip, port);
    endpoint_iterator = resolver.resolve(query);

    tcp::endpoint endpoint = *endpoint_iterator;

    socket.async_connect(endpoint, boost::bind(&MessageService::handle_connect,
            this, boost::asio::placeholders::error, ++endpoint_iterator));

    boost::thread t(boost::bind(&boost::asio::io_service::run, &io_service));
}

void MessageService::await() {

    while (!messageQueue->empty()) {

        signal(SIGINT, exit);

        std::deque<AgentMessage>::size_type messagesLeft = messageQueue->size();
        sleep(3);
        std::cout << "Pending Profiler Agents Messages: " << messageQueue->size() << std::endl;
        if (messagesLeft == messageQueue->size()) {
            std::cout << "Connection Error" << std::endl;
            break;
        }
    }
}

void MessageService::write(AgentMessage agentMessage, long systemTime,
    int JVM_ID) {

    agentMessage.set_timestamp(Agent::Helper::getCurrentClockCycle());
    agentMessage.set_jvm_id(JVM_ID);
    agentMessage.set_systemtime(systemTime);
    io_service.post(boost::bind(&MessageService::do_write, this, agentMessage));
}

void MessageService::do_close() {
    socket.close();
}

void MessageService::transmitMessage(AgentMessage agentMessage) {

    boost::asio::streambuf b;
    std::ostream os(&b);

    {
        // Scoped so that both streams are destroyed, and thereby flushed
        // into b, before the data is handed to async_write.
        OstreamOutputStream raw_output(&os);
        CodedOutputStream coded_output(&raw_output);

        // Length prefix (varint) followed by the serialized message.
        coded_output.WriteVarint32(agentMessage.ByteSize());
        agentMessage.SerializeToCodedStream(&coded_output);
    }

    boost::asio::async_write(socket, b.data(), boost::bind(
            &MessageService::handle_write, this, boost::asio::placeholders::error));
}

void MessageService::do_write(AgentMessage agentMessage) {

    bool write_in_progress = !messageQueue->empty();
    messageQueue->push_back(agentMessage);

    if (!write_in_progress) {
        transmitMessage(agentMessage);
    }
}

void MessageService::handle_write(const boost::system::error_code &error) {

    if (!error) {
        messageQueue->pop_front();
        if (!messageQueue->empty()) {
            transmitMessage(messageQueue->front());
        }
    } else {
        std::cout << error << std::endl;
        do_close();
    }
}

void MessageService::handle_connect(const boost::system::error_code &error,
    tcp::resolver::iterator endpoint_iterator) {
    // can be used to receive commands from the Java profiler interface
}

MessageService::~MessageService() {
    // TODO Auto-generated destructor stub
}
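
Regarding await(): it reads messageQueue without any locking while the io_service thread modifies it (assuming await() is called from the main thread), and it sleeps for a fixed 3 seconds per round. A possible alternative, sketched here under assumptions, would be a small counter guarded by a mutex and a condition variable. PendingCounter and its method names are hypothetical; in my class, added() would be called where a message is queued and sent() where handle_write succeeds.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical helper: counts messages that were queued but not yet sent.
// The producer calls added(), the I/O thread calls sent(), and the thread
// that wants to shut down calls wait_until_drained().
class PendingCounter {
public:
    void added() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++pending_;
    }

    void sent() {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(pending_ > 0);
        --pending_;
        ++progress_;
        changed_.notify_all();
    }

    // Returns true once nothing is pending. Returns false if no message
    // was sent during a whole stall_timeout, which corresponds to the
    // "Connection Error" case in await().
    bool wait_until_drained(std::chrono::milliseconds stall_timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (pending_ > 0) {
            const std::size_t seen = progress_;
            bool moved = changed_.wait_for(
                lock, stall_timeout, [&] { return progress_ != seen; });
            if (!moved) return false;  // stalled: nothing sent in time
        }
        return true;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    std::size_t pending_ = 0;   // queued but not yet sent
    std::size_t progress_ = 0;  // total number of sent() calls so far
};
```

Compared to the sleep loop, the waiting thread would wake up as soon as the last message is sent, and the timeout restarts whenever progress is made, so a long but healthy queue is not mistaken for a dead connection.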