Boost logo

Boost Users :

Subject: [Boost-users] Boost.Asio -- combining sync read with async read
From: Sameer Parekh (sameer_at_[hidden])
Date: 2009-10-14 08:53:58


Hello everyone! I am writing a client for a protocol which goes
something like this:

The client connects to the server, and it submits "orders" -- every
time it gets an order it gets back an "ack". However, the server might
asynchronously send a "fill" at any time. So I want to be listening
for a fill even while the client might be doing other things
(listening to another server for one-way data stream. The client then
chooses to submit an order or not based on the information it gets in
the datastream, or it might choose to submit an order if it receives a
fill.)

I thought the way to do it would be to do

async_read_until() and have it call my fill handler when it receives
something.

Then the function that submits orders would call

write()

and then block waiting for the ack message using

read_until()

However, when I implement that, the client never seems to notice that
a fill message is sent. It receives the acks just fine, and it
successfully reads the datafeed coming in from the other tcp
connection. I thought maybe the read_until() would cancel the
async_read_until() so after I received the ack message I called
async_read_until() again, but that didn't work either...

(The server works as it should, I tested it with telnet.)

Is it possible to mix sync/async reads in this way? Can anyone tell if
I am doing something wrong? Do I need to do something else to make
this work?

Thanks, I appreciate your help!

Here is the code in question:

(as an aside, is it bad form to call connect() in the constructor?
Should I call connect() and setup the async_read_until() in a separate
start() method?)

MarketConnectionTCP::MarketConnectionTCP(boost::asio::io_service
&io_service_, std::string ip_,
                unsigned short agentPort_, unsigned short feedPort_) :
        _io_service(io_service_), _agentSocket(_io_service),
_feedSocket(_io_service),
        _timer(_io_service, boost::posix_time::seconds(1))
{

        boost::asio::ip::address
ipAddress(boost::asio::ip::address::from_string(
                        ip_));

        tcp::endpoint agentEndpoint(ipAddress, agentPort_);
        Logger::stream() << "Trying to connect to agent:" << agentEndpoint
                        << std::endl;

        _agentSocket.connect(agentEndpoint);
        Logger::stream() << "Agent connected:" <<
_agentSocket.remote_endpoint()
                        << std::endl;

        boost::asio::ip::tcp::endpoint feedEndpoint(ipAddress, feedPort_);

        Logger::stream() << "Trying to connect to feed:" << feedEndpoint
                        << std::endl;
        _feedSocket.connect(feedEndpoint);

        Logger::stream() << "Feed connected:" << _feedSocket.remote_endpoint()
                        << std::endl;

        boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n",
boost::bind(
                        &MarketConnectionTCP::handleReadFill, this,
                        boost::asio::placeholders::error));
        Logger::stream() << "Listening on agent connection:"
                        << _agentSocket.remote_endpoint() << std::endl;

        boost::asio::async_read_until(_feedSocket, _feedBuf, "\r\n",
boost::bind(
                        &MarketConnectionTCP::handleReadFeed, this,
                        boost::asio::placeholders::error));
        Logger::stream() << "Listening on feed connection:"
                        << _feedSocket.remote_endpoint() << std::endl;

        _timer.async_wait(boost::bind(&MarketConnectionTCP::handleTimer, this,
                        boost::asio::placeholders::error));
        Logger::stream() << "Timer is waiting" << std::endl;

}

OrderID MarketConnectionTCP::sendOrder(const Order &order)
{
        boost::asio::streambuf orderBuf;
        std::ostream orderStream(&orderBuf);
        orderStream << order << "\r\n";

        // This blocks until we get an ack with the order id
        Logger::stream() << "Sending order to:" <<
_agentSocket.remote_endpoint()
                        << ":" << order << std::endl;
        boost::asio::write(_agentSocket, orderBuf);

        Logger::stream() << "Waiting for ack from:"
                        << _agentSocket.remote_endpoint() << std::endl;

        boost::asio::streambuf ackBuf;
        std::istream ackStream(&ackBuf);
        boost::asio::read_until(_agentSocket, ackBuf, "\r\n");

        OrderID id;
        ackStream >> id;
        Logger::stream() << "Received ack from:" <<
_agentSocket.remote_endpoint()
                        << ":" << id << std::endl;

        // listen again
        boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n",
                        boost::bind(&MarketConnectionTCP::handleReadFill, this,
                                        boost::asio::placeholders::error));
        Logger::stream() << "Listening on agent connection:"
                        << _agentSocket.remote_endpoint() << std::endl;

        return id;
}

void MarketConnectionTCP::handleReadFill(const
boost::system::error_code& err)
{
        if (!err)
        {
                std::istream fillStream(&_fillBuf);
                Fill newFill;
                fillStream >> newFill;

                if (newFill.isValid())
                {
                        Logger::stream() << "Fill received on:"
                                        << _agentSocket.remote_endpoint() << ":" << newFill
                                        << std::endl;
                        receiveFill(newFill);
                }
                else
                {
                        Logger::stream() << "Invalid fill received on:"
                                        << _agentSocket.remote_endpoint() << std::endl;
                }

                // listen again
                boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n",
                                boost::bind(&MarketConnectionTCP::handleReadFill, this,
                                                boost::asio::placeholders::error));
                Logger::stream() << "Listening on agent connection:"
                                << _agentSocket.remote_endpoint() << std::endl;
        }
        else
        {
                delete this;
        }
}



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net