Boost logo

Boost-Commit :

From: chris_at_[hidden]
Date: 2008-04-21 00:02:38


Author: chris_kohlhoff
Date: 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
New Revision: 44673
URL: http://svn.boost.org/trac/boost/changeset/44673

Log:
Add a special null_buffers type that allows read and write operations to
be used to indicate the socket's readiness to read or write without
blocking.

Added:
   trunk/libs/asio/example/nonblocking/
   trunk/libs/asio/example/nonblocking/Jamfile (contents, props changed)
   trunk/libs/asio/example/nonblocking/Jamfile.v2 (contents, props changed)
   trunk/libs/asio/example/nonblocking/third_party_lib.cpp (contents, props changed)
Text files modified:
   trunk/boost/asio/buffer.hpp | 27 +++
   trunk/boost/asio/detail/consuming_buffers.hpp | 20 ++
   trunk/boost/asio/detail/dev_poll_reactor.hpp | 20 +-
   trunk/boost/asio/detail/epoll_reactor.hpp | 14 +
   trunk/boost/asio/detail/kqueue_reactor.hpp | 14 +
   trunk/boost/asio/detail/reactive_socket_service.hpp | 180 ++++++++++++++++++++++++++
   trunk/boost/asio/detail/select_reactor.hpp | 6
   trunk/boost/asio/detail/win_iocp_socket_service.hpp | 275 +++++++++++++++++++++++++++++++++++++++
   trunk/libs/asio/test/ip/tcp.cpp | 15 ++
   trunk/libs/asio/test/ip/udp.cpp | 37 +++++
   10 files changed, 589 insertions(+), 19 deletions(-)

Modified: trunk/boost/asio/buffer.hpp
==============================================================================
--- trunk/boost/asio/buffer.hpp (original)
+++ trunk/boost/asio/buffer.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -379,6 +379,33 @@
   }
 };
 
+/// An implementation of both the ConstBufferSequence and MutableBufferSequence
+/// concepts to represent a null buffer sequence.
+class null_buffers
+{
+public:
+ /// The type for each element in the list of buffers.
+ typedef mutable_buffer value_type;
+
+ /// A random-access iterator type that may be used to read elements.
+ typedef const mutable_buffer* const_iterator;
+
+ /// Get a random-access iterator to the first element.
+ const_iterator begin() const
+ {
+ return &buf_;
+ }
+
+ /// Get a random-access iterator for one past the last element.
+ const_iterator end() const
+ {
+ return &buf_;
+ }
+
+private:
+ mutable_buffer buf_;
+};
+
 #if defined(BOOST_ASIO_ENABLE_BUFFER_DEBUGGING)
 namespace detail {
 

Modified: trunk/boost/asio/detail/consuming_buffers.hpp
==============================================================================
--- trunk/boost/asio/detail/consuming_buffers.hpp (original)
+++ trunk/boost/asio/detail/consuming_buffers.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -24,6 +24,8 @@
 #include <boost/iterator/iterator_facade.hpp>
 #include <boost/asio/detail/pop_options.hpp>
 
+#include <boost/asio/buffer.hpp>
+
 namespace boost {
 namespace asio {
 namespace detail {
@@ -198,6 +200,24 @@
   typename Buffers::const_iterator begin_remainder_;
 };
 
+// Specialisation for null_buffers to ensure that the null_buffers type is
+// always passed through to the underlying read or write operation.
+template <typename Buffer>
+class consuming_buffers<Buffer, boost::asio::null_buffers>
+ : public boost::asio::null_buffers
+{
+public:
+ consuming_buffers(const boost::asio::null_buffers&)
+ {
+ // No-op.
+ }
+
+ void consume(std::size_t)
+ {
+ // No-op.
+ }
+};
+
 } // namespace detail
 } // namespace asio
 } // namespace boost

Modified: trunk/boost/asio/detail/dev_poll_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/dev_poll_reactor.hpp (original)
+++ trunk/boost/asio/detail/dev_poll_reactor.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -128,16 +128,18 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler)
+ void start_read_op(socket_type descriptor, Handler handler,
+ bool allow_speculative_read = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- if (!read_op_queue_.has_operation(descriptor))
- if (handler(boost::system::error_code()))
- return;
+ if (allow_speculative_read)
+ if (!read_op_queue_.has_operation(descriptor))
+ if (handler(boost::system::error_code()))
+ return;
 
     if (read_op_queue_.enqueue_operation(descriptor, handler))
     {
@@ -154,16 +156,18 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler)
+ void start_write_op(socket_type descriptor, Handler handler,
+ bool allow_speculative_write = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- if (!write_op_queue_.has_operation(descriptor))
- if (handler(boost::system::error_code()))
- return;
+ if (allow_speculative_write)
+ if (!write_op_queue_.has_operation(descriptor))
+ if (handler(boost::system::error_code()))
+ return;
 
     if (write_op_queue_.enqueue_operation(descriptor, handler))
     {

Modified: trunk/boost/asio/detail/epoll_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/epoll_reactor.hpp (original)
+++ trunk/boost/asio/detail/epoll_reactor.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -135,14 +135,17 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler)
+ void start_read_op(socket_type descriptor, Handler handler,
+ bool allow_speculative_read = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- if (!read_op_queue_.has_operation(descriptor))
+ if (!allow_speculative_read)
+ need_epoll_wait_ = true;
+ else if (!read_op_queue_.has_operation(descriptor))
       if (handler(boost::system::error_code()))
         return;
 
@@ -171,14 +174,17 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler)
+ void start_write_op(socket_type descriptor, Handler handler,
+ bool allow_speculative_write = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- if (!write_op_queue_.has_operation(descriptor))
+ if (!allow_speculative_write)
+ need_epoll_wait_ = true;
+ else if (!write_op_queue_.has_operation(descriptor))
       if (handler(boost::system::error_code()))
         return;
 

Modified: trunk/boost/asio/detail/kqueue_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/kqueue_reactor.hpp (original)
+++ trunk/boost/asio/detail/kqueue_reactor.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -135,14 +135,17 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler)
+ void start_read_op(socket_type descriptor, Handler handler,
+ bool allow_speculative_read = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- if (!read_op_queue_.has_operation(descriptor))
+ if (!allow_speculative_read)
+ need_kqueue_wait_ = true;
+ else if (!read_op_queue_.has_operation(descriptor))
       if (handler(boost::system::error_code()))
         return;
 
@@ -162,14 +165,17 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler)
+ void start_write_op(socket_type descriptor, Handler handler,
+ bool allow_speculative_write = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- if (!write_op_queue_.has_operation(descriptor))
+ if (!allow_speculative_write)
+ need_kqueue_wait_ = true;
+ else if (!write_op_queue_.has_operation(descriptor))
       if (handler(boost::system::error_code()))
         return;
 

Modified: trunk/boost/asio/detail/reactive_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/reactive_socket_service.hpp (original)
+++ trunk/boost/asio/detail/reactive_socket_service.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -559,6 +559,22 @@
     }
   }
 
+ // Wait until data can be sent without blocking.
+ size_t send(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_write(impl.socket_, ec);
+
+ return 0;
+ }
+
   template <typename ConstBufferSequence, typename Handler>
   class send_handler
   {
@@ -673,6 +689,45 @@
     }
   }
 
+ template <typename Handler>
+ class null_buffers_handler
+ {
+ public:
+ null_buffers_handler(boost::asio::io_service& io_service, Handler handler)
+ : work_(io_service),
+ handler_(handler)
+ {
+ }
+
+ bool operator()(const boost::system::error_code& result)
+ {
+ work_.get_io_service().post(bind_handler(handler_, result, 0));
+ return true;
+ }
+
+ private:
+ boost::asio::io_service::work work_;
+ Handler handler_;
+ };
+
+ // Start an asynchronous wait until data can be sent without blocking.
+ template <typename Handler>
+ void async_send(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else
+ {
+ reactor_.start_write_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+
   // Send a datagram to the specified endpoint. Returns the number of bytes
   // sent.
   template <typename ConstBufferSequence>
@@ -734,6 +789,23 @@
     }
   }
 
+ // Wait until data can be sent without blocking.
+ size_t send_to(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, const endpoint_type&,
+ boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_write(impl.socket_, ec);
+
+ return 0;
+ }
+
   template <typename ConstBufferSequence, typename Handler>
   class send_to_handler
   {
@@ -832,6 +904,24 @@
     }
   }
 
+ // Start an asynchronous wait until data can be sent without blocking.
+ template <typename Handler>
+ void async_send_to(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, const endpoint_type&, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else
+ {
+ reactor_.start_write_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+
   // Receive some data from the peer. Returns the number of bytes received.
   template <typename MutableBufferSequence>
   size_t receive(implementation_type& impl,
@@ -907,6 +997,23 @@
     }
   }
 
+ // Wait until data can be received without blocking.
+ size_t receive(implementation_type& impl,
+ const null_buffers& buffers,
+ socket_base::message_flags, boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_read(impl.socket_, ec);
+
+ return 0;
+ }
+
   template <typename MutableBufferSequence, typename Handler>
   class receive_handler
   {
@@ -1033,6 +1140,29 @@
     }
   }
 
+ // Wait until data can be received without blocking.
+ template <typename Handler>
+ void async_receive(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags flags, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else if (flags & socket_base::message_out_of_band)
+ {
+ reactor_.start_except_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler));
+ }
+ else
+ {
+ reactor_.start_read_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+
   // Receive a datagram with the endpoint of the sender. Returns the number of
   // bytes received.
   template <typename MutableBufferSequence>
@@ -1106,6 +1236,26 @@
     }
   }
 
+ // Wait until data can be received without blocking.
+ size_t receive_from(implementation_type& impl,
+ const null_buffers& buffers, endpoint_type& sender_endpoint,
+ socket_base::message_flags, boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_read(impl.socket_, ec);
+
+ // Reset endpoint since it can be given no sensible value at this time.
+ sender_endpoint = endpoint_type();
+
+ return 0;
+ }
+
   template <typename MutableBufferSequence, typename Handler>
   class receive_from_handler
   {
@@ -1209,6 +1359,36 @@
     }
   }
 
+ // Wait until data can be received without blocking.
+ template <typename Handler>
+ void async_receive_from(implementation_type& impl,
+ const null_buffers&, endpoint_type& sender_endpoint,
+ socket_base::message_flags flags, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else
+ {
+ // Reset endpoint since it can be given no sensible value at this time.
+ sender_endpoint = endpoint_type();
+
+ if (flags & socket_base::message_out_of_band)
+ {
+ reactor_.start_except_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler));
+ }
+ else
+ {
+ reactor_.start_read_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+ }
+
   // Accept a new connection.
   template <typename Socket>
   boost::system::error_code accept(implementation_type& impl,

Modified: trunk/boost/asio/detail/select_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/select_reactor.hpp (original)
+++ trunk/boost/asio/detail/select_reactor.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -115,7 +115,8 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler)
+ void start_read_op(socket_type descriptor, Handler handler,
+ bool /*allow_speculative_read*/ = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     if (!shutdown_)
@@ -126,7 +127,8 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler)
+ void start_write_op(socket_type descriptor, Handler handler,
+ bool /*allow_speculative_write*/ = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     if (!shutdown_)

Modified: trunk/boost/asio/detail/win_iocp_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_socket_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_socket_service.hpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -24,6 +24,7 @@
 #include <boost/asio/detail/push_options.hpp>
 #include <cstring>
 #include <boost/shared_ptr.hpp>
+#include <boost/type_traits/is_same.hpp>
 #include <boost/weak_ptr.hpp>
 #include <boost/asio/detail/pop_options.hpp>
 
@@ -700,6 +701,22 @@
     return bytes_transferred;
   }
 
+ // Wait until data can be sent without blocking.
+ size_t send(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_write(impl.socket_, ec);
+
+ return 0;
+ }
+
   template <typename ConstBufferSequence, typename Handler>
   class send_operation
     : public operation
@@ -858,6 +875,57 @@
     }
   }
 
+ template <typename Handler>
+ class null_buffers_handler
+ {
+ public:
+ null_buffers_handler(boost::asio::io_service& io_service, Handler handler)
+ : work_(io_service),
+ handler_(handler)
+ {
+ }
+
+ bool operator()(const boost::system::error_code& result)
+ {
+ work_.get_io_service().post(bind_handler(handler_, result, 0));
+ return true;
+ }
+
+ private:
+ boost::asio::io_service::work work_;
+ Handler handler_;
+ };
+
+ // Start an asynchronous wait until data can be sent without blocking.
+ template <typename Handler>
+ void async_send(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else
+ {
+ // Check if the reactor was already obtained from the io_service.
+ reactor_type* reactor = static_cast<reactor_type*>(
+ interlocked_compare_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), 0, 0));
+ if (!reactor)
+ {
+ reactor = &(boost::asio::use_service<reactor_type>(
+ this->get_io_service()));
+ interlocked_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), reactor);
+ }
+
+ reactor->start_write_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+
   // Send a datagram to the specified endpoint. Returns the number of bytes
   // sent.
   template <typename ConstBufferSequence>
@@ -902,6 +970,23 @@
     return bytes_transferred;
   }
 
+ // Wait until data can be sent without blocking.
+ size_t send_to(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, const endpoint_type&,
+ boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_write(impl.socket_, ec);
+
+ return 0;
+ }
+
   template <typename ConstBufferSequence, typename Handler>
   class send_to_operation
     : public operation
@@ -1038,6 +1123,36 @@
     }
   }
 
+ // Start an asynchronous wait until data can be sent without blocking.
+ template <typename Handler>
+ void async_send_to(implementation_type& impl, const null_buffers&,
+ socket_base::message_flags, const endpoint_type&, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else
+ {
+ // Check if the reactor was already obtained from the io_service.
+ reactor_type* reactor = static_cast<reactor_type*>(
+ interlocked_compare_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), 0, 0));
+ if (!reactor)
+ {
+ reactor = &(boost::asio::use_service<reactor_type>(
+ this->get_io_service()));
+ interlocked_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), reactor);
+ }
+
+ reactor->start_write_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+
   // Receive some data from the peer. Returns the number of bytes received.
   template <typename MutableBufferSequence>
   size_t receive(implementation_type& impl,
@@ -1097,6 +1212,23 @@
     return bytes_transferred;
   }
 
+ // Wait until data can be received without blocking.
+ size_t receive(implementation_type& impl,
+ const null_buffers& buffers,
+ socket_base::message_flags, boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_read(impl.socket_, ec);
+
+ return 0;
+ }
+
   template <typename MutableBufferSequence, typename Handler>
   class receive_operation
     : public operation
@@ -1157,7 +1289,8 @@
       }
 
       // Check for connection closed.
- else if (!ec && bytes_transferred == 0)
+ else if (!ec && bytes_transferred == 0
+ && !boost::is_same<MutableBufferSequence, null_buffers>::value)
       {
         ec = boost::asio::error::eof;
       }
@@ -1262,6 +1395,84 @@
     }
   }
 
+ // Wait until data can be received without blocking.
+ template <typename Handler>
+ void async_receive(implementation_type& impl, const null_buffers& buffers,
+ socket_base::message_flags flags, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else if (impl.protocol_.type() == SOCK_STREAM)
+ {
+ // For stream sockets on Windows, we may issue a 0-byte overlapped
+ // WSARecv to wait until there is data available on the socket.
+
+#if defined(BOOST_ASIO_ENABLE_CANCELIO)
+ // Update the ID of the thread from which cancellation is safe.
+ if (impl.safe_cancellation_thread_id_ == 0)
+ impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
+ else if (impl.safe_cancellation_thread_id_ != ::GetCurrentThreadId())
+ impl.safe_cancellation_thread_id_ = ~DWORD(0);
+#endif // defined(BOOST_ASIO_ENABLE_CANCELIO)
+
+ // Allocate and construct an operation to wrap the handler.
+ typedef receive_operation<null_buffers, Handler> value_type;
+ typedef handler_alloc_traits<Handler, value_type> alloc_traits;
+ raw_handler_ptr<alloc_traits> raw_ptr(handler);
+ handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
+ impl.cancel_token_, buffers, handler);
+
+ // Issue a receive operation with an empty buffer.
+ ::WSABUF buf = { 0, 0 };
+ DWORD bytes_transferred = 0;
+ DWORD recv_flags = flags;
+ int result = ::WSARecv(impl.socket_, &buf, 1,
+ &bytes_transferred, &recv_flags, ptr.get(), 0);
+ DWORD last_error = ::WSAGetLastError();
+ if (result != 0 && last_error != WSA_IO_PENDING)
+ {
+ boost::asio::io_service::work work(this->get_io_service());
+ ptr.reset();
+ boost::system::error_code ec(last_error,
+ boost::asio::error::get_system_category());
+ iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ }
+ else
+ {
+ ptr.release();
+ }
+ }
+ else
+ {
+ // Check if the reactor was already obtained from the io_service.
+ reactor_type* reactor = static_cast<reactor_type*>(
+ interlocked_compare_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), 0, 0));
+ if (!reactor)
+ {
+ reactor = &(boost::asio::use_service<reactor_type>(
+ this->get_io_service()));
+ interlocked_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), reactor);
+ }
+
+ if (flags & socket_base::message_out_of_band)
+ {
+ reactor->start_except_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler));
+ }
+ else
+ {
+ reactor->start_read_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+ }
+
   // Receive a datagram with the endpoint of the sender. Returns the number of
   // bytes received.
   template <typename MutableBufferSequence>
@@ -1315,6 +1526,26 @@
     return bytes_transferred;
   }
 
+ // Wait until data can be received without blocking.
+ size_t receive_from(implementation_type& impl,
+ const null_buffers& buffers, endpoint_type& sender_endpoint,
+ socket_base::message_flags, boost::system::error_code& ec)
+ {
+ if (!is_open(impl))
+ {
+ ec = boost::asio::error::bad_descriptor;
+ return 0;
+ }
+
+ // Wait for socket to become ready.
+ socket_ops::poll_read(impl.socket_, ec);
+
+ // Reset endpoint since it can be given no sensible value at this time.
+ sender_endpoint = endpoint_type();
+
+ return 0;
+ }
+
   template <typename MutableBufferSequence, typename Handler>
   class receive_from_operation
     : public operation
@@ -1473,6 +1704,48 @@
     }
   }
 
+ // Wait until data can be received without blocking.
+ template <typename Handler>
+ void async_receive_from(implementation_type& impl,
+ const null_buffers&, endpoint_type& sender_endpoint,
+ socket_base::message_flags flags, Handler handler)
+ {
+ if (!is_open(impl))
+ {
+ this->get_io_service().post(bind_handler(handler,
+ boost::asio::error::bad_descriptor, 0));
+ }
+ else
+ {
+ // Check if the reactor was already obtained from the io_service.
+ reactor_type* reactor = static_cast<reactor_type*>(
+ interlocked_compare_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), 0, 0));
+ if (!reactor)
+ {
+ reactor = &(boost::asio::use_service<reactor_type>(
+ this->get_io_service()));
+ interlocked_exchange_pointer(
+ reinterpret_cast<void**>(&reactor_), reactor);
+ }
+
+ // Reset endpoint since it can be given no sensible value at this time.
+ sender_endpoint = endpoint_type();
+
+ if (flags & socket_base::message_out_of_band)
+ {
+ reactor->start_except_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler));
+ }
+ else
+ {
+ reactor->start_read_op(impl.socket_,
+ null_buffers_handler<Handler>(this->get_io_service(), handler),
+ false);
+ }
+ }
+ }
+
   // Accept a new connection.
   template <typename Socket>
   boost::system::error_code accept(implementation_type& impl, Socket& peer,

Added: trunk/libs/asio/example/nonblocking/Jamfile
==============================================================================
--- (empty file)
+++ trunk/libs/asio/example/nonblocking/Jamfile 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -0,0 +1,33 @@
+#
+# Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+#
+# Distributed under the Boost Software License, Version 1.0. (See accompanying
+# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+#
+
+subproject libs/asio/example/nonblocking ;
+
+project boost : $(BOOST_ROOT) ;
+
+if $(UNIX)
+{
+ switch $(JAMUNAME)
+ {
+ case SunOS* :
+ {
+ SOCKET_LIBS = <find-library>socket <find-library>nsl ;
+ }
+ }
+}
+
+exe third_party_lib
+ : <lib>@boost/libs/system/build/boost_system
+ third_party_lib.cpp
+ : <include>$(BOOST_ROOT)
+ <include>../../../..
+ <define>BOOST_ALL_NO_LIB=1
+ <threading>multi
+ <mingw><*><find-library>ws2_32
+ <mingw><*><find-library>mswsock
+ $(SOCKET_LIBS)
+ ;

Added: trunk/libs/asio/example/nonblocking/Jamfile.v2
==============================================================================
--- (empty file)
+++ trunk/libs/asio/example/nonblocking/Jamfile.v2 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -0,0 +1,38 @@
+#
+# Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+#
+# Distributed under the Boost Software License, Version 1.0. (See accompanying
+# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+#
+
+import os ;
+
+if [ os.name ] = SOLARIS
+{
+ lib socket ;
+ lib nsl ;
+}
+else if [ os.name ] = NT
+{
+ lib ws2_32 ;
+ lib mswsock ;
+}
+else if [ os.name ] = HPUX
+{
+ lib ipv6 ;
+}
+
+exe third_party_lib
+ : third_party_lib.cpp
+ /boost/system//boost_system
+ : <define>BOOST_ALL_NO_LIB=1
+ <threading>multi
+ <os>SOLARIS:<library>socket
+ <os>SOLARIS:<library>nsl
+ <os>NT:<define>_WIN32_WINNT=0x0501
+ <os>NT,<toolset>gcc:<library>ws2_32
+ <os>NT,<toolset>gcc:<library>mswsock
+ <os>NT,<toolset>gcc-cygwin:<define>__USE_W32_SOCKETS
+ <os>HPUX,<toolset>gcc:<define>_XOPEN_SOURCE_EXTENDED
+ <os>HPUX:<library>ipv6
+ ;

Added: trunk/libs/asio/example/nonblocking/third_party_lib.cpp
==============================================================================
--- (empty file)
+++ trunk/libs/asio/example/nonblocking/third_party_lib.cpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -0,0 +1,241 @@
+//
+// third_party_lib.cpp
+// ~~~~~~~~~~~~~~~~~~~
+//
+// Copyright (c) 2003-2008 Christopher M. Kohlhoff (chris at kohlhoff dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/asio.hpp>
+#include <boost/array.hpp>
+#include <boost/bind.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
+
+using boost::asio::ip::tcp;
+
+namespace third_party_lib {
+
+// Simulation of a third party library that wants to perform read and write
+// operations directly on a socket. It needs to be polled to determine whether
+// it requires a read or write operation, and notified when the socket is ready
+// for reading or writing.
+class session
+{
+public:
+ session(tcp::socket& socket)
+ : socket_(socket),
+ state_(reading)
+ {
+ }
+
+ // Returns true if the third party library wants to be notified when the
+ // socket is ready for reading.
+ bool want_read() const
+ {
+ return state_ == reading;
+ }
+
+ // Notify that third party library that it should perform its read operation.
+ void do_read(boost::system::error_code& ec)
+ {
+ if (std::size_t len = socket_.read_some(boost::asio::buffer(data_), ec))
+ {
+ write_buffer_ = boost::asio::buffer(data_, len);
+ state_ = writing;
+ }
+ }
+
+ // Returns true if the third party library wants to be notified when the
+ // socket is ready for writing.
+ bool want_write() const
+ {
+ return state_ == writing;
+ }
+
+ // Notify that third party library that it should perform its write operation.
+ void do_write(boost::system::error_code& ec)
+ {
+ if (std::size_t len = socket_.write_some(
+ boost::asio::buffer(write_buffer_), ec))
+ {
+ write_buffer_ = write_buffer_ + len;
+ state_ = boost::asio::buffer_size(write_buffer_) > 0 ? writing : reading;
+ }
+ }
+
+private:
+ tcp::socket& socket_;
+ enum { reading, writing } state_;
+ boost::array<char, 128> data_;
+ boost::asio::const_buffer write_buffer_;
+};
+
+} // namespace third_party_lib
+
+// The glue between asio's sockets and the third party library.
+class connection
+ : public boost::enable_shared_from_this<connection>
+{
+public:
+ typedef boost::shared_ptr<connection> pointer;
+
+ static pointer create(boost::asio::io_service& io_service)
+ {
+ return pointer(new connection(io_service));
+ }
+
+ tcp::socket& socket()
+ {
+ return socket_;
+ }
+
+ void start()
+ {
+ // Put the socket into non-blocking mode.
+ tcp::socket::non_blocking_io non_blocking_io(true);
+ socket_.io_control(non_blocking_io);
+
+ start_operations();
+ }
+
+private:
+ connection(boost::asio::io_service& io_service)
+ : socket_(io_service),
+ session_impl_(socket_),
+ read_in_progress_(false),
+ write_in_progress_(false)
+ {
+ }
+
+ void start_operations()
+ {
+ // Start a read operation if the third party library wants one.
+ if (session_impl_.want_read() && !read_in_progress_)
+ {
+ read_in_progress_ = true;
+ socket_.async_read_some(
+ boost::asio::null_buffers(),
+ boost::bind(&connection::handle_read,
+ shared_from_this(),
+ boost::asio::placeholders::error));
+ }
+
+ // Start a write operation if the third party library wants one.
+ if (session_impl_.want_write() && !write_in_progress_)
+ {
+ write_in_progress_ = true;
+ socket_.async_write_some(
+ boost::asio::null_buffers(),
+ boost::bind(&connection::handle_write,
+ shared_from_this(),
+ boost::asio::placeholders::error));
+ }
+ }
+
+ void handle_read(boost::system::error_code ec)
+ {
+ read_in_progress_ = false;
+
+ // Notify third party library that it can perform a read.
+ if (!ec)
+ session_impl_.do_read(ec);
+
+ // The third party library successfully performed a read on the socket.
+ // Start new read or write operations based on what it now wants.
+ if (!ec || ec == boost::asio::error::would_block)
+ start_operations();
+
+ // Otherwise, an error occurred. Closing the socket cancels any outstanding
+ // asynchronous read or write operations. The connection object will be
+ // destroyed once automatically once those outstanding operations complete.
+ else
+ socket_.close();
+ }
+
+ void handle_write(boost::system::error_code ec)
+ {
+ write_in_progress_ = false;
+
+ // Notify third party library that it can perform a write.
+ if (!ec)
+ session_impl_.do_write(ec);
+
+ // The third party library successfully performed a write on the socket.
+ // Start new read or write operations based on what it now wants.
+ if (!ec || ec == boost::asio::error::would_block)
+ start_operations();
+
+ // Otherwise, an error occurred. Closing the socket cancels any outstanding
+ // asynchronous read or write operations. The connection object will be
+ // destroyed once automatically once those outstanding operations complete.
+ else
+ socket_.close();
+ }
+
+private:
+ tcp::socket socket_;
+ third_party_lib::session session_impl_;
+ bool read_in_progress_;
+ bool write_in_progress_;
+};
+
+class server
+{
+public:
+ server(boost::asio::io_service& io_service, unsigned short port)
+ : acceptor_(io_service, tcp::endpoint(tcp::v4(), port))
+ {
+ start_accept();
+ }
+
+private:
+ void start_accept()
+ {
+ connection::pointer new_connection =
+ connection::create(acceptor_.io_service());
+
+ acceptor_.async_accept(new_connection->socket(),
+ boost::bind(&server::handle_accept, this, new_connection,
+ boost::asio::placeholders::error));
+ }
+
+ void handle_accept(connection::pointer new_connection,
+ const boost::system::error_code& error)
+ {
+ if (!error)
+ {
+ new_connection->start();
+ start_accept();
+ }
+ }
+
+ tcp::acceptor acceptor_;
+};
+
+int main(int argc, char* argv[])
+{
+ try
+ {
+ if (argc != 2)
+ {
+ std::cerr << "Usage: third_party_lib <port>\n";
+ return 1;
+ }
+
+ boost::asio::io_service io_service;
+
+ using namespace std; // For atoi.
+ server s(io_service, atoi(argv[1]));
+
+ io_service.run();
+ }
+ catch (std::exception& e)
+ {
+ std::cerr << "Exception: " << e.what() << "\n";
+ }
+
+ return 0;
+}

Modified: trunk/libs/asio/test/ip/tcp.cpp
==============================================================================
--- trunk/libs/asio/test/ip/tcp.cpp (original)
+++ trunk/libs/asio/test/ip/tcp.cpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -239,36 +239,51 @@
 
     socket1.send(buffer(mutable_char_buffer));
     socket1.send(buffer(const_char_buffer));
+ socket1.send(null_buffers());
     socket1.send(buffer(mutable_char_buffer), in_flags);
     socket1.send(buffer(const_char_buffer), in_flags);
+ socket1.send(null_buffers(), in_flags);
     socket1.send(buffer(mutable_char_buffer), in_flags, ec);
     socket1.send(buffer(const_char_buffer), in_flags, ec);
+ socket1.send(null_buffers(), in_flags, ec);
 
     socket1.async_send(buffer(mutable_char_buffer), send_handler);
     socket1.async_send(buffer(const_char_buffer), send_handler);
+ socket1.async_send(null_buffers(), send_handler);
     socket1.async_send(buffer(mutable_char_buffer), in_flags, send_handler);
     socket1.async_send(buffer(const_char_buffer), in_flags, send_handler);
+ socket1.async_send(null_buffers(), in_flags, send_handler);
 
     socket1.receive(buffer(mutable_char_buffer));
+ socket1.receive(null_buffers());
     socket1.receive(buffer(mutable_char_buffer), in_flags);
+ socket1.receive(null_buffers(), in_flags);
     socket1.receive(buffer(mutable_char_buffer), in_flags, ec);
+ socket1.receive(null_buffers(), in_flags, ec);
 
     socket1.async_receive(buffer(mutable_char_buffer), receive_handler);
+ socket1.async_receive(null_buffers(), receive_handler);
     socket1.async_receive(buffer(mutable_char_buffer), in_flags,
         receive_handler);
+ socket1.async_receive(null_buffers(), in_flags, receive_handler);
 
     socket1.write_some(buffer(mutable_char_buffer));
     socket1.write_some(buffer(const_char_buffer));
+ socket1.write_some(null_buffers());
     socket1.write_some(buffer(mutable_char_buffer), ec);
     socket1.write_some(buffer(const_char_buffer), ec);
+ socket1.write_some(null_buffers(), ec);
 
     socket1.async_write_some(buffer(mutable_char_buffer), write_some_handler);
     socket1.async_write_some(buffer(const_char_buffer), write_some_handler);
+ socket1.async_write_some(null_buffers(), write_some_handler);
 
     socket1.read_some(buffer(mutable_char_buffer));
     socket1.read_some(buffer(mutable_char_buffer), ec);
+ socket1.read_some(null_buffers(), ec);
 
     socket1.async_read_some(buffer(mutable_char_buffer), read_some_handler);
+ socket1.async_read_some(null_buffers(), read_some_handler);
   }
   catch (std::exception&)
   {

Modified: trunk/libs/asio/test/ip/udp.cpp
==============================================================================
--- trunk/libs/asio/test/ip/udp.cpp (original)
+++ trunk/libs/asio/test/ip/udp.cpp 2008-04-21 00:02:37 EDT (Mon, 21 Apr 2008)
@@ -144,15 +144,20 @@
 
     socket1.send(buffer(mutable_char_buffer));
     socket1.send(buffer(const_char_buffer));
+ socket1.send(null_buffers());
     socket1.send(buffer(mutable_char_buffer), in_flags);
     socket1.send(buffer(const_char_buffer), in_flags);
+ socket1.send(null_buffers(), in_flags);
     socket1.send(buffer(mutable_char_buffer), in_flags, ec);
     socket1.send(buffer(const_char_buffer), in_flags, ec);
+ socket1.send(null_buffers(), in_flags, ec);
 
     socket1.async_send(buffer(mutable_char_buffer), send_handler);
     socket1.async_send(buffer(const_char_buffer), send_handler);
+ socket1.async_send(null_buffers(), send_handler);
     socket1.async_send(buffer(mutable_char_buffer), in_flags, send_handler);
     socket1.async_send(buffer(const_char_buffer), in_flags, send_handler);
+ socket1.async_send(null_buffers(), in_flags, send_handler);
 
     socket1.send_to(buffer(mutable_char_buffer),
         ip::udp::endpoint(ip::udp::v4(), 0));
@@ -162,6 +167,10 @@
         ip::udp::endpoint(ip::udp::v4(), 0));
     socket1.send_to(buffer(const_char_buffer),
         ip::udp::endpoint(ip::udp::v6(), 0));
+ socket1.send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v4(), 0));
+ socket1.send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v6(), 0));
     socket1.send_to(buffer(mutable_char_buffer),
         ip::udp::endpoint(ip::udp::v4(), 0), in_flags);
     socket1.send_to(buffer(mutable_char_buffer),
@@ -170,6 +179,10 @@
         ip::udp::endpoint(ip::udp::v4(), 0), in_flags);
     socket1.send_to(buffer(const_char_buffer),
         ip::udp::endpoint(ip::udp::v6(), 0), in_flags);
+ socket1.send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v4(), 0), in_flags);
+ socket1.send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v6(), 0), in_flags);
     socket1.send_to(buffer(mutable_char_buffer),
         ip::udp::endpoint(ip::udp::v4(), 0), in_flags, ec);
     socket1.send_to(buffer(mutable_char_buffer),
@@ -178,6 +191,10 @@
         ip::udp::endpoint(ip::udp::v4(), 0), in_flags, ec);
     socket1.send_to(buffer(const_char_buffer),
         ip::udp::endpoint(ip::udp::v6(), 0), in_flags, ec);
+ socket1.send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v4(), 0), in_flags, ec);
+ socket1.send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v6(), 0), in_flags, ec);
 
     socket1.async_send_to(buffer(mutable_char_buffer),
         ip::udp::endpoint(ip::udp::v4(), 0), send_handler);
@@ -187,6 +204,10 @@
         ip::udp::endpoint(ip::udp::v4(), 0), send_handler);
     socket1.async_send_to(buffer(const_char_buffer),
         ip::udp::endpoint(ip::udp::v6(), 0), send_handler);
+ socket1.async_send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v4(), 0), send_handler);
+ socket1.async_send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v6(), 0), send_handler);
     socket1.async_send_to(buffer(mutable_char_buffer),
         ip::udp::endpoint(ip::udp::v4(), 0), in_flags, send_handler);
     socket1.async_send_to(buffer(mutable_char_buffer),
@@ -195,24 +216,40 @@
         ip::udp::endpoint(ip::udp::v4(), 0), in_flags, send_handler);
     socket1.async_send_to(buffer(const_char_buffer),
         ip::udp::endpoint(ip::udp::v6(), 0), in_flags, send_handler);
+ socket1.async_send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v4(), 0), in_flags, send_handler);
+ socket1.async_send_to(null_buffers(),
+ ip::udp::endpoint(ip::udp::v6(), 0), in_flags, send_handler);
 
     socket1.receive(buffer(mutable_char_buffer));
+ socket1.receive(null_buffers());
     socket1.receive(buffer(mutable_char_buffer), in_flags);
+ socket1.receive(null_buffers(), in_flags);
     socket1.receive(buffer(mutable_char_buffer), in_flags, ec);
+ socket1.receive(null_buffers(), in_flags, ec);
 
     socket1.async_receive(buffer(mutable_char_buffer), receive_handler);
+ socket1.async_receive(null_buffers(), receive_handler);
     socket1.async_receive(buffer(mutable_char_buffer), in_flags,
         receive_handler);
+ socket1.async_receive(null_buffers(), in_flags, receive_handler);
 
     ip::udp::endpoint endpoint;
     socket1.receive_from(buffer(mutable_char_buffer), endpoint);
+ socket1.receive_from(null_buffers(), endpoint);
     socket1.receive_from(buffer(mutable_char_buffer), endpoint, in_flags);
+ socket1.receive_from(null_buffers(), endpoint, in_flags);
     socket1.receive_from(buffer(mutable_char_buffer), endpoint, in_flags, ec);
+ socket1.receive_from(null_buffers(), endpoint, in_flags, ec);
 
     socket1.async_receive_from(buffer(mutable_char_buffer),
         endpoint, receive_handler);
+ socket1.async_receive_from(null_buffers(),
+ endpoint, receive_handler);
     socket1.async_receive_from(buffer(mutable_char_buffer),
         endpoint, in_flags, receive_handler);
+ socket1.async_receive_from(null_buffers(),
+ endpoint, in_flags, receive_handler);
   }
   catch (std::exception&)
   {


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk