Subject: Re: [Boost-users] boost::asio blocking socket read with timeout with multiple threads
From: Richard Hodges (hodges.r_at_[hidden])
Date: 2018-03-16 12:57:38


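Something like this should do it. I have used a streambuf here, but any
buffer type that async_read_until accepts would work just as well.

Note that some other thread must be servicing the io_context. The calling
thread blocks on a condition variable, so if nothing else runs the
handlers, the wait never returns.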

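#include <boost/asio.hpp>
#include <condition_variable>
#include <istream>
#include <memory>
#include <mutex>
#include <string>

std::string timed_read_line(boost::asio::streambuf& buffer,
                            boost::asio::ip::tcp::socket& sock)
{
    namespace asio = boost::asio;
    using boost::system::error_code;

    std::condition_variable cv;
    std::mutex mut;
    error_code op_error;
    bool done = false;   // set by whichever handler fires first
    int pending = 2;     // handlers that have not run yet

    // get_io_context() is the Boost 1.66-era accessor; on later releases
    // that dropped it, pass sock.get_executor() here instead.
    asio::deadline_timer timer(sock.get_io_context(),
                               boost::posix_time::seconds(3));

    auto get_lock = [&] { return std::unique_lock<std::mutex>(mut); };

    auto read_handler = [&](error_code ec, std::size_t /*bytes*/)
    {
        auto lock = get_lock();
        if (not done)
        {
            done = true;
            op_error = ec;
            timer.cancel();        // let the timer handler complete early
        }
        --pending;
        cv.notify_one();           // notify under the lock: cv is a local
    };

    auto timer_handler = [&](error_code /*ec*/)
    {
        auto lock = get_lock();
        if (not done)
        {
            done = true;
            op_error = asio::error::timed_out;
            error_code ignored;
            sock.cancel(ignored);  // abort the outstanding read
        }
        --pending;
        cv.notify_one();
    };

    // Hold the lock while starting both operations, so that no handler
    // touches sock or timer until this thread is parked in cv.wait().
    auto lock = get_lock();
    asio::async_read_until(sock, buffer, '\n', read_handler);
    timer.async_wait(timer_handler);

    // Both handlers capture locals by reference, so wait until both have
    // run, not just the first one.
    cv.wait(lock, [&] { return pending == 0; });

    if (op_error)
    {
        throw boost::system::system_error(op_error);
    }

    std::istream is(std::addressof(buffer));
    std::string result;
    std::getline(is, result);
    return result;
}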
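
The waiting logic above boils down to "first completion wins": the caller
blocks on a condition variable, and whichever event arrives first (result or
timeout) sets the done flag while later arrivals are ignored. Here is a
Boost-free sketch of just that pattern, under the assumption that a plain
std::thread can stand in for the thread servicing the io_context and that
wait_for can stand in for the timer. Outcome and first_completion_wins are
made-up names for illustration, not part of Asio.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical result type: either a value or a timeout.
struct Outcome {
    bool timed_out;
    std::string value;
};

// Hypothetical helper: runs `produce` on a second thread (playing the role
// of the io_context thread) and accepts its result only if it arrives
// before `timeout` elapses.
Outcome first_completion_wins(std::function<std::string()> produce,
                              std::chrono::milliseconds timeout)
{
    std::mutex mut;
    std::condition_variable cv;
    bool done = false;
    Outcome out{false, std::string()};

    std::thread worker([&] {
        std::string value = produce();        // the "read" completes here
        std::lock_guard<std::mutex> lock(mut);
        if (!done) {                          // first completion wins
            done = true;
            out.value = std::move(value);
        }
        cv.notify_one();                      // notify while holding the lock
    });

    {
        std::unique_lock<std::mutex> lock(mut);
        if (!cv.wait_for(lock, timeout, [&] { return done; })) {
            done = true;                      // timeout wins; late result is dropped
            out.timed_out = true;
        }
    }

    // The worker captures locals by reference, so it must finish before we
    // return. This sketch simply waits it out; the Asio version cancels the
    // socket so that the read handler completes promptly instead.
    worker.join();
    return out;
}
```

If nothing ever ran the worker, the caller would sit in the wait until the
timeout with no chance of a result, which is the same reason the function
above needs another thread running the io_context.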

On 16 March 2018 at 07:46, Jeremi Piotrowski via Boost-users <
boost-users_at_[hidden]> wrote:

> On Thu, Mar 15, 2018 at 07:04:19PM +0000, Thomas Quarendon via Boost-users
> wrote:
> > The examples all revolve around the technique of starting an *async*
> > read, then performing a nested loop of io_service::run_one. However, so
> > far I've been unable to find a form of code that works reliably in a
> > multithreaded environment.
> >
>
> I played around with this, and I don't really see how this can work
> reliably when called from _within_ the io_service. I don't believe the
> io_service was intended to be used in this re-entrant manner.
>
> > The basis for my experimentation is here:
> > https://gist.github.com/anonymous/1160c11f8ed9c29b9184325191a3a63b
> > It starts a server thread, then starts a client that makes a connection
> > and then writes nothing, to simulate a "bad" client and to provoke a
> > "timeout" condition.
> >
> > [snip]
> >
> > With multiple threads, the handleReadTimeout/handleReadComplete
> > callbacks are run on other threads. So the while loop here just blocks,
> > as there is never anything to run. That's my surmise of what's going on
> > anyway. I've experimented with strands to try and force it all onto the
> > same thread, but so far failed (if the above code is called in the
> > context of the same strand, it just seems to block the handleReadTimeout
> > and handleReadComplete callbacks from being called).
> >
>
> Strands don't force it to the same thread, they just force the handlers to
> not be run concurrently. Anyway, I found I can make your example work if
> you add a separate io_service to execute the handlers for the blocking
> connection. I believe all the example solutions that you linked to also
> made the assumption that you wouldn't try to call the blocking reads from
> within the io_service as well.
>
> --- boost_asio_read_timeout 2018-03-16 08:15:18.877050171 +0100
> +++ asio2.cpp 2018-03-16 08:41:02.677071294 +0100
> @@ -31,12 +31,13 @@ public:
>  };
>
>  class BlockingConnection : public boost::enable_shared_from_this<BlockingConnection>
>  {
> +    boost::asio::io_service svc_;
>      boost::asio::strand strand_;
>      tcp::socket socket_;
>  public:
>
>      BlockingConnection(io_service& ioservice)
> -        : strand_(ioservice), socket_(ioservice)
> +        : strand_(svc_), socket_(svc_)
>      {}
>
>      tcp::socket& socket() {
> @@ -62,7 +63,8 @@ public:
>          async_read(socket_, buffer(b),
>                  strand_.wrap(boost::bind(handleReadComplete, &ec,
>                      &bytesRead, &readComplete, &timer, _1, _2)));
>
> -        while (socket_.get_io_service().run_one()) {
> +        boost::asio::io_service::work work(svc_);
> +        while (svc_.run_one()) {
>              if (timedOut && readComplete) {
>                  break;
>              }
> ---
>
> One thing to note: I thought I could get away with keeping
> BlockingConnection.socket_ in the initial io_service, but found that this
> will deadlock if all the threads of the initial io_service happen to be
> executing this code at the same time. In that case there may be no thread
> to service the actual 'async_read/timer' handlers (that in turn call
> strand_.dispatch). Moving the BlockingConnection.socket_ to
> BlockingConnection.svc_ fixes that.
>
> > The alternative formulation, the one I started with actually, is to do
> > an async wait on the timer, and a normal synchronous read on the socket.
> > The timer callback performs a cancel (or close -- I tried both) on the
> > socket hoping to cause the socket read to error. This is the kind of
> > thing you'd do if you were programming raw sockets. That works fine on
> > Windows, but won't work on Linux. The documentation for cancel does say
> > it cancels any outstanding *async* calls, but I'm surprised calling
> > close doesn't work and cause the read to wake.
> >
>
> The documentation also states that socket objects are _not_ thread safe.
> That's the real reason this doesn't work.