Boost logo

Boost Users :

Subject: [Boost-users] [asio] Problem with async_read/async_read_some not reacting to incoming data
From: Stig Sandø (ses_at_[hidden])
Date: 2009-03-19 07:27:20


Hei,

We're rewriting a client with boost asio but have run into some problems
when stresstesting the client. The client is fetching textual and
graphics from a server with one connection that is open at all time.
When the client is getting large amounts of graphics data it will after
awhile suddenly stop receiving data, and eventually it will hit our
timeouts and the last call sent is an async_read/async_read_some. We
are keeping the server well-fed with requests so there should be
graphics forthcoming without pause. The problem has been seen on win32,
linux and darwin when testing on a gigabit net fetching raw 1080i
graphics (4M for each field), and is most frequent on darwin. This is
naturally an absolute show-stopper for us.

So we are a bit loss what is going wrong and why
async_read/async_read_some stops reacting in the middle of the
fetch-queue, despite wireshark showing that the data is incoming. When
using compression on the data the problem is harder to reproduce, which
might suggest a race-condition somewhere. But our code is just using a
single thread for io_service and all async-communication is triggered
from this io-thread which has a work-object to keep the io_service
spinning. We're also making sure there is at most one async_read and
one async_write in effect at a time, roughly similar to the chat_client
sample.

Has anyone seen something similar or have any input on how best to
figure out what goes wrong? Are there invariants that says you cannot
read and write at the same time?

Some symptoms are the same in each test. When we get the last image
from the socket the buffersize is zero afterwards, and the next
async_read request is to transfer_at_least(1). The async_read never
calls the handler for completion of this byte, so Nagle would have
kicked in. It is also fairly hard to strip down to a small example
using a mock server.

I have included some stripped down code below in case that might be
helpful spotting something that we cant see.

Cheers,
Stig

Stacktrace during timeout of the asio-thread:
(gdb) bt
#0 0x91302f66 in kevent ()
#1 0x003e8010 in boost::asio::detail::kqueue_reactor<false>::run ()
#2 0x0045f70a in
boost::asio::detail::task_io_service<boost::asio::detail::kqueue_reactor<false>
>::do_one ()
#3 0x0045f8c3 in
boost::asio::detail::task_io_service<boost::asio::detail::kqueue_reactor<false>
>::run ()
#4 0x0045f972 in boost::asio::io_service::run ()
#5 0x003f8bbc in vcl::connection_pool::implementation::run ()
#6 0x003fa2fa in boost::_mfi::mf0<void,
vcl::connection_pool::implementation>::operator() ()
#7 0x003fa356 in
boost::_bi::list1<boost::_bi::value<vcl::connection_pool::implementation*>
>::operator()<boost::_mfi::mf0<void,
vcl::connection_pool::implementation>, boost::_bi::list0> ()
#8 0x003fa399 in boost::_bi::bind_t<void, boost::_mfi::mf0<void,
vcl::connection_pool::implementation>,
boost::_bi::list1<boost::_bi::value<vcl::connection_pool::implementation*>
> >::operator() ()
#9 0x003fa62e in boost::detail::thread_data<boost::_bi::bind_t<void,
boost::_mfi::mf0<void, vcl::connection_pool::implementation>,
boost::_bi::list1<boost::_bi::value<vcl::connection_pool::implementation*>
> > >::run ()
#10 0x003b97ce in thread_proxy ()
#11 0x913036f5 in _pthread_start ()
#12 0x913035b2 in thread_start ()

The connection headerfile:

#ifndef VCL_ASIO_CONNECTION_HPP
#define VCL_ASIO_CONNECTION_HPP

#include <vcl/connection.hpp>
#include <queue>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/enable_shared_from_this.hpp>

namespace soul {
struct logger_base;
typedef boost::shared_ptr<logger_base> logger;
}

using boost::asio::ip::tcp;

namespace vcl {

class connection_manager;
class dispatcher;
class protocol;
class response;
typedef boost::shared_ptr<connection_manager> connection_manager_ptr;
typedef boost::weak_ptr<connection_manager> connection_manager_wptr;
typedef boost::shared_ptr<dispatcher> dispatcher_ptr;
typedef boost::shared_ptr<protocol> protocol_ptr;
typedef boost::weak_ptr<protocol> protocol_wptr;
typedef boost::shared_ptr<response> response_ptr;
typedef std::queue<request_ptr> req_queue;

enum read_status {
    rs_no_valid_data,
    rs_read_one_block,
    rs_error_occured
};

class asio_connection : public connection,
                        public
boost::enable_shared_from_this<asio_connection> {
public:
    asio_connection(const soul::logger &logger,
                    boost::asio::io_service& io_service,
                    connection_manager_wptr cmgr,
                    const dispatcher_ptr &disp,
                    const vbl::host_repr_ptr &host,
                    const protocol_ptr &cp);
    virtual ~asio_connection();

    void handle_resolve(const boost::system::error_code& error,
                        tcp::resolver::iterator endpoint_iterator);
    void handle_connect(const boost::system::error_code& error,
                        tcp::resolver::iterator endpoint_iterator);

    virtual void lose_connection();
    virtual void start_connecting();

    virtual vbl::host_repr_ptr get_host();
    virtual bool send(const request_ptr &req);

    static connection_ptr connect(const std::string &host,
                                  unsigned short port,
                                  const protocol_ptr &cp);

private:
    void handle_write_request(const boost::system::error_code& error);
    unsigned handle_capture_request();
    read_status handle_text_request();
    read_status handle_icon_request(unsigned &next_amount);
    void start_reading(unsigned bytes=1);
    void handle_read_info(const boost::system::error_code& error,
                          std::size_t tf);
    read_status _handle_read_info(unsigned &next_amount);

    void io_send(const request_ptr &req);
    bool io_dispatch_response(response_ptr myresponse);
    bool io_dispatch_error(const boost::system::error_code& error);
    std::string prepared_request(const request_ptr &msg) const;

private:
    soul::logger local_logger;
    boost::asio::io_service& m_io_service;
    connection_manager_wptr m_manager;
    dispatcher_ptr m_dispatcher;
    tcp::socket m_socket;

    vbl::host_repr_ptr m_host;
    tcp::resolver::iterator m_host_iterator;
    protocol_wptr wcp;

    req_queue outgoing_requests;
    req_queue incoming_requests;
    boost::asio::streambuf inbuffer;

    request_ptr currently_reading;
};

}

#endif /* VCL_ASIO_CONNECTION_HPP */

And the stripped down communication-part of the connection object:

#include "connection.hpp"
#include "../logging.hpp"
#include "../internal_response.hpp"
#include <vcl/protocol.hpp>
#include <vcl/request.hpp>
#include "../manager.hpp"
#include <vbl/host.hpp>
#include <vbl/exceptions.hpp>
#include <vcl/callback.hpp>
#include <vcl/dispatcher.hpp>
#include <boost/bind.hpp>

namespace vcl {

asio_connection::asio_connection(const soul::logger &logger,
                                 boost::asio::io_service& io_service,
                                 connection_manager_wptr cmgr,
                                 const dispatcher_ptr &disp,
                                 const vbl::host_repr_ptr &host,
                                 const protocol_ptr &cp)
    : local_logger(logger),
      m_io_service(io_service),
      m_manager(cmgr),
      m_dispatcher(disp),
      m_socket(io_service),
      m_host(host),
      wcp(cp) {

}

asio_connection::~asio_connection() { }

void
asio_connection::start_connecting() {
    tcp::resolver resolver(m_io_service);
    tcp::resolver::query query(m_host->get_hostname(),
m_host->get_port_as_string());
    boost::system::error_code ec;
    m_host_iterator = resolver.resolve(query, ec);
    handle_resolve(ec, m_host_iterator);
}

void
asio_connection::handle_resolve(const boost::system::error_code& error,
                                tcp::resolver::iterator endpoint_iterator) {
    // THIS CAN BE ANY THREAD
    if (!error) {
        tcp::endpoint endpoint = *endpoint_iterator;
        m_socket.async_connect(endpoint,
                               boost::bind(&asio_connection::handle_connect,
                                           shared_from_this(),
                                           boost::asio::placeholders::error,
                                           ++m_host_iterator));
    }
    else {
        
m_io_service.post(boost::bind(&asio_connection::io_dispatch_error,
shared_from_this(),error));
    }
}

void
asio_connection::handle_connect(const boost::system::error_code& error,
                                tcp::resolver::iterator endpoint_iterator) {
    if (!error) {
        if (protocol_ptr p = wcp.lock()) {
            p->set_connection(shared_from_this());
            m_dispatcher->dispatch(make_callback("connection made",
boost::bind(&protocol::connection_made, p)));
        }
        // Now we start reading
        start_reading();
    }
    else {
        io_dispatch_error(error);
        lose_connection();
    }
}

bool
asio_connection::io_dispatch_error(const boost::system::error_code& error) {
    if (protocol_ptr p = wcp.lock()) {
        m_dispatcher->dispatch(make_callback("Error occured",
boost::bind(&protocol::error, p, error)));
        return true;
    }
    else {
        return false;
    }
}

void
asio_connection::lose_connection() {
    /* vaious cleanup */
}

void
asio_connection::io_send(const request_ptr &msg) {
    // if the request expects an answer, push it in the incoming
requests queu
    if (msg->get_type() != noreply_request) {
        incoming_requests.push(msg);
    }

    bool write_in_progress = !outgoing_requests.empty();

    outgoing_requests.push(msg);
    if (!write_in_progress) {
        std::string data(prepared_request(outgoing_requests.front()));
        boost::asio::async_write(m_socket,
                                 boost::asio::buffer(data.data(),
data.size()),
                                 
boost::bind(&asio_connection::handle_write_request, shared_from_this(),
                                             
boost::asio::placeholders::error));
    }
}

void
asio_connection::handle_write_request(const boost::system::error_code&
error) {
    if (!error) {
        outgoing_requests.pop();
        if (!outgoing_requests.empty()) {
            std::string data(prepared_request(outgoing_requests.front()));
            boost::asio::async_write(m_socket,
                                     boost::asio::buffer(data.data(),
data.size()),
                                     
boost::bind(&asio_connection::handle_write_request, shared_from_this(),
                                                 
boost::asio::placeholders::error));
        }
    }
    else {
        lose_connection();
    }
}

bool
asio_connection::send(const request_ptr &msg) {
    if (!msg.get()) return false;

    // make sure we do the real stuff on the io-thread
    m_io_service.post(boost::bind(&asio_connection::io_send,
shared_from_this(), msg));

    return true;
}

void
asio_connection::start_reading(unsigned bytes) {
    boost::asio::async_read(m_socket,
                            inbuffer,
                            boost::asio::transfer_at_least(bytes),
                            
boost::bind(&asio_connection::handle_read_info, shared_from_this(),
                                        boost::asio::placeholders::error,
                                        
boost::asio::placeholders::bytes_transferred));
}

bool
asio_connection::io_dispatch_response(response_ptr myresponse) {
    if (protocol_ptr p = wcp.lock()) {
        // make a callback and see what happens
        m_dispatcher->dispatch(make_callback("response received",
                                             
boost::bind(&protocol::dispatch_response, p, myresponse)));
        return true;
    }
    else {
        return false;
    }
}

void
asio_connection::handle_read_info(const boost::system::error_code& error,
                                  std::size_t tf) {
    if (!error) {
        unsigned next_amount = 0;
        read_status rs = _handle_read_info(next_amount);
        while (rs == rs_read_one_block)
            rs = _handle_read_info(next_amount);

        if (next_amount < 2)
            next_amount = 1;
        start_reading(next_amount);
    }
    else if (error.value() == boost::system::errc::operation_canceled) {
        // ignore this error for now, resolve it later
        lose_connection();
    }
    else {
        lose_connection();
    }
}

}


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net