Boost logo

Boost-Commit :

From: chris_at_[hidden]
Date: 2008-05-01 18:00:27


Author: chris_kohlhoff
Date: 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
New Revision: 44997
URL: http://svn.boost.org/trac/boost/changeset/44997

Log:
Add a fast path for some speculative read and write operations in the
epoll_reactor.

Text files modified:
   trunk/boost/asio/detail/dev_poll_reactor.hpp | 49 +++++++++-----------
   trunk/boost/asio/detail/epoll_reactor.hpp | 96 ++++++++++++++++++++++++++++-----------
   trunk/boost/asio/detail/kqueue_reactor.hpp | 56 +++++++---------------
   trunk/boost/asio/detail/reactive_descriptor_service.hpp | 22 +++++---
   trunk/boost/asio/detail/reactive_socket_service.hpp | 71 ++++++++++++-----------------
   trunk/boost/asio/detail/select_reactor.hpp | 82 +++++++++++++++++++++++++++------
   trunk/boost/asio/detail/win_iocp_socket_service.hpp | 44 +++++++----------
   7 files changed, 236 insertions(+), 184 deletions(-)

Modified: trunk/boost/asio/detail/dev_poll_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/dev_poll_reactor.hpp (original)
+++ trunk/boost/asio/detail/dev_poll_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -54,6 +54,11 @@
   : public boost::asio::detail::service_base<dev_poll_reactor<Own_Thread> >
 {
 public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ };
+
   // Constructor.
   dev_poll_reactor(boost::asio::io_service& io_service)
     : boost::asio::detail::service_base<
@@ -116,11 +121,11 @@
     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
       timer_queues_[i]->destroy_timers();
     timer_queues_.clear();
- }
+ }
 
   // Register a socket with the reactor. Returns 0 on success, system error
   // code on failure.
- int register_descriptor(socket_type descriptor)
+ int register_descriptor(socket_type, per_descriptor_data&)
   {
     return 0;
   }
@@ -128,8 +133,8 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool allow_speculative_read = true)
+ void start_read_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_read = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -156,8 +161,8 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool allow_speculative_write = true)
+ void start_write_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_write = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -184,7 +189,8 @@
   // Start a new exception operation. The handler object will be invoked when
   // the given descriptor has exception information, or an error has occurred.
   template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -203,26 +209,25 @@
     }
   }
 
- // Start new write and exception operations. The handler object will be
- // invoked when the given descriptor is ready for writing or has exception
+ // Start a connect op. The handler runs when writable or when there is error
   // information available, or an error has occurred.
   template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- bool need_mod = write_op_queue_.enqueue_operation(descriptor, handler);
- need_mod = except_op_queue_.enqueue_operation(descriptor, handler)
- && need_mod;
- if (need_mod)
+ if (write_op_queue_.enqueue_operation(descriptor, handler))
     {
       ::pollfd& ev = add_pending_event_change(descriptor);
- ev.events = POLLOUT | POLLPRI | POLLERR | POLLHUP;
+ ev.events = POLLOUT | POLLERR | POLLHUP;
       if (read_op_queue_.has_operation(descriptor))
         ev.events |= POLLIN;
+ if (except_op_queue_.has_operation(descriptor))
+ ev.events |= POLLPRI;
       interrupter_.interrupt();
     }
   }
@@ -230,25 +235,15 @@
   // Cancel all operations associated with the given descriptor. The
   // handlers associated with the descriptor will be invoked with the
   // operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     cancel_ops_unlocked(descriptor);
   }
 
- // Enqueue cancellation of all operations associated with the given
- // descriptor. The handlers associated with the descriptor will be invoked
- // with the operation_aborted error. This function does not acquire the
- // dev_poll_reactor's mutex, and so should only be used from within a reactor
- // handler.
- void enqueue_cancel_ops_unlocked(socket_type descriptor)
- {
- pending_cancellations_.push_back(descriptor);
- }
-
   // Cancel any operations that are running against the descriptor and remove
   // its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 

Modified: trunk/boost/asio/detail/epoll_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/epoll_reactor.hpp (original)
+++ trunk/boost/asio/detail/epoll_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -54,6 +54,13 @@
   : public boost::asio::detail::service_base<epoll_reactor<Own_Thread> >
 {
 public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ bool allow_speculative_read;
+ bool allow_speculative_write;
+ };
+
   // Constructor.
   epoll_reactor(boost::asio::io_service& io_service)
     : boost::asio::detail::service_base<epoll_reactor<Own_Thread> >(io_service),
@@ -119,10 +126,14 @@
 
   // Register a socket with the reactor. Returns 0 on success, system error
   // code on failure.
- int register_descriptor(socket_type descriptor)
+ int register_descriptor(socket_type descriptor,
+ per_descriptor_data& descriptor_data)
   {
     // No need to lock according to epoll documentation.
 
+ descriptor_data.allow_speculative_read = true;
+ descriptor_data.allow_speculative_write = true;
+
     epoll_event ev = { 0, { 0 } };
     ev.events = 0;
     ev.data.fd = descriptor;
@@ -135,9 +146,19 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool allow_speculative_read = true)
+ void start_read_op(socket_type descriptor,
+ per_descriptor_data& descriptor_data,
+ Handler handler, bool allow_speculative_read = true)
   {
+ if (allow_speculative_read && descriptor_data.allow_speculative_read)
+ {
+ if (handler(boost::system::error_code()))
+ return;
+
+ // We only get one shot at a speculative read in this function.
+ allow_speculative_read = false;
+ }
+
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
@@ -146,8 +167,16 @@
     if (!allow_speculative_read)
       need_epoll_wait_ = true;
     else if (!read_op_queue_.has_operation(descriptor))
+ {
+ // Speculative reads are ok as there are no queued read operations.
+ descriptor_data.allow_speculative_read = true;
+
       if (handler(boost::system::error_code()))
         return;
+ }
+
+ // Speculative reads are not ok as there will be queued read operations.
+ descriptor_data.allow_speculative_read = false;
 
     if (read_op_queue_.enqueue_operation(descriptor, handler))
     {
@@ -174,9 +203,19 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool allow_speculative_write = true)
+ void start_write_op(socket_type descriptor,
+ per_descriptor_data& descriptor_data,
+ Handler handler, bool allow_speculative_write = true)
   {
+ if (allow_speculative_write && descriptor_data.allow_speculative_write)
+ {
+ if (handler(boost::system::error_code()))
+ return;
+
+ // We only get one shot at a speculative write in this function.
+ allow_speculative_write = false;
+ }
+
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
@@ -185,8 +224,16 @@
     if (!allow_speculative_write)
       need_epoll_wait_ = true;
     else if (!write_op_queue_.has_operation(descriptor))
+ {
+ // Speculative writes are ok as there are no queued write operations.
+ descriptor_data.allow_speculative_write = true;
+
       if (handler(boost::system::error_code()))
         return;
+ }
+
+ // Speculative writes are not ok as there will be queued write operations.
+ descriptor_data.allow_speculative_write = false;
 
     if (write_op_queue_.enqueue_operation(descriptor, handler))
     {
@@ -213,7 +260,8 @@
   // Start a new exception operation. The handler object will be invoked when
   // the given descriptor has exception information, or an error has occurred.
   template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -242,26 +290,29 @@
     }
   }
 
- // Start new write and exception operations. The handler object will be
- // invoked when the given descriptor is ready for writing or has exception
- // information available, or an error has occurred.
+ // Start a new write operation. The handler object will be invoked when the
+ // given descriptor is ready for writing or an error has occurred. Speculative
+ // writes are not allowed.
   template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data& descriptor_data, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
     if (shutdown_)
       return;
 
- bool need_mod = write_op_queue_.enqueue_operation(descriptor, handler);
- need_mod = except_op_queue_.enqueue_operation(descriptor, handler)
- && need_mod;
- if (need_mod)
+ // Speculative writes are not ok as there will be queued write operations.
+ descriptor_data.allow_speculative_write = false;
+
+ if (write_op_queue_.enqueue_operation(descriptor, handler))
     {
       epoll_event ev = { 0, { 0 } };
- ev.events = EPOLLOUT | EPOLLPRI | EPOLLERR | EPOLLHUP;
+ ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
       if (read_op_queue_.has_operation(descriptor))
         ev.events |= EPOLLIN;
+ if (except_op_queue_.has_operation(descriptor))
+ ev.events |= EPOLLPRI;
       ev.data.fd = descriptor;
 
       int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
@@ -272,7 +323,6 @@
         boost::system::error_code ec(errno,
             boost::asio::error::get_system_category());
         write_op_queue_.dispatch_all_operations(descriptor, ec);
- except_op_queue_.dispatch_all_operations(descriptor, ec);
       }
     }
   }
@@ -280,25 +330,15 @@
   // Cancel all operations associated with the given descriptor. The
   // handlers associated with the descriptor will be invoked with the
   // operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     cancel_ops_unlocked(descriptor);
   }
 
- // Enqueue cancellation of all operations associated with the given
- // descriptor. The handlers associated with the descriptor will be invoked
- // with the operation_aborted error. This function does not acquire the
- // epoll_reactor's mutex, and so should only be used from within a reactor
- // handler.
- void enqueue_cancel_ops_unlocked(socket_type descriptor)
- {
- pending_cancellations_.push_back(descriptor);
- }
-
   // Cancel any operations that are running against the descriptor and remove
   // its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 

Modified: trunk/boost/asio/detail/kqueue_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/kqueue_reactor.hpp (original)
+++ trunk/boost/asio/detail/kqueue_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -61,6 +61,11 @@
   : public boost::asio::detail::service_base<kqueue_reactor<Own_Thread> >
 {
 public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ };
+
   // Constructor.
   kqueue_reactor(boost::asio::io_service& io_service)
     : boost::asio::detail::service_base<
@@ -127,7 +132,7 @@
 
   // Register a socket with the reactor. Returns 0 on success, system error
   // code on failure.
- int register_descriptor(socket_type)
+ int register_descriptor(socket_type, per_descriptor_data&)
   {
     return 0;
   }
@@ -135,8 +140,8 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool allow_speculative_read = true)
+ void start_read_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_read = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -165,8 +170,8 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool allow_speculative_write = true)
+ void start_write_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_write = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -195,7 +200,8 @@
   // Start a new exception operation. The handler object will be invoked when
   // the given descriptor has exception information, or an error has occurred.
   template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -218,11 +224,11 @@
     }
   }
 
- // Start new write and exception operations. The handler object will be
- // invoked when the given descriptor is ready for writing or has exception
- // information available, or an error has occurred.
+ // Start a new write operation. The handler object will be invoked when the
+ // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 
@@ -240,46 +246,20 @@
         write_op_queue_.dispatch_all_operations(descriptor, ec);
       }
     }
-
- if (except_op_queue_.enqueue_operation(descriptor, handler))
- {
- struct kevent event;
- if (read_op_queue_.has_operation(descriptor))
- EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
- else
- EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
- if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
- {
- boost::system::error_code ec(errno,
- boost::asio::error::get_system_category());
- except_op_queue_.dispatch_all_operations(descriptor, ec);
- write_op_queue_.dispatch_all_operations(descriptor, ec);
- }
- }
   }
 
   // Cancel all operations associated with the given descriptor. The
   // handlers associated with the descriptor will be invoked with the
   // operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     cancel_ops_unlocked(descriptor);
   }
 
- // Enqueue cancellation of all operations associated with the given
- // descriptor. The handlers associated with the descriptor will be invoked
- // with the operation_aborted error. This function does not acquire the
- // kqueue_reactor's mutex, and so should only be used from within a reactor
- // handler.
- void enqueue_cancel_ops_unlocked(socket_type descriptor)
- {
- pending_cancellations_.push_back(descriptor);
- }
-
   // Cancel any operations that are running against the descriptor and remove
   // its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
 

Modified: trunk/boost/asio/detail/reactive_descriptor_service.hpp
==============================================================================
--- trunk/boost/asio/detail/reactive_descriptor_service.hpp (original)
+++ trunk/boost/asio/detail/reactive_descriptor_service.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -67,6 +67,9 @@
 
     // Flags indicating the current state of the descriptor.
     unsigned char flags_;
+
+ // Per-descriptor data used by the reactor.
+ typename Reactor::per_descriptor_data reactor_data_;
   };
 
   // The maximum number of buffers to support in a single operation.
@@ -97,7 +100,7 @@
   {
     if (impl.descriptor_ != -1)
     {
- reactor_.close_descriptor(impl.descriptor_);
+ reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);
 
       if (impl.flags_ & implementation_type::internal_non_blocking)
       {
@@ -125,7 +128,8 @@
       return ec;
     }
 
- if (int err = reactor_.register_descriptor(native_descriptor))
+ if (int err = reactor_.register_descriptor(
+ native_descriptor, impl.reactor_data_))
     {
       ec = boost::system::error_code(err,
           boost::asio::error::get_system_category());
@@ -150,7 +154,7 @@
   {
     if (is_open(impl))
     {
- reactor_.close_descriptor(impl.descriptor_);
+ reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);
 
       if (impl.flags_ & implementation_type::internal_non_blocking)
       {
@@ -187,7 +191,7 @@
       return ec;
     }
 
- reactor_.cancel_ops(impl.descriptor_);
+ reactor_.cancel_ops(impl.descriptor_, impl.reactor_data_);
     ec = boost::system::error_code();
     return ec;
   }
@@ -405,7 +409,7 @@
         impl.flags_ |= implementation_type::internal_non_blocking;
       }
 
- reactor_.start_write_op(impl.descriptor_,
+ reactor_.start_write_op(impl.descriptor_, impl.reactor_data_,
           write_handler<ConstBufferSequence, Handler>(
             impl.descriptor_, this->get_io_service(), buffers, handler));
     }
@@ -444,7 +448,7 @@
     }
     else
     {
- reactor_.start_write_op(impl.descriptor_,
+ reactor_.start_write_op(impl.descriptor_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler),
           false);
     }
@@ -526,7 +530,7 @@
 
   // Wait until data can be read without blocking.
   size_t read_some(implementation_type& impl,
- const null_buffers& buffers, boost::system::error_code& ec)
+ const null_buffers&, boost::system::error_code& ec)
   {
     if (!is_open(impl))
     {
@@ -644,7 +648,7 @@
         impl.flags_ |= implementation_type::internal_non_blocking;
       }
 
- reactor_.start_read_op(impl.descriptor_,
+ reactor_.start_read_op(impl.descriptor_, impl.reactor_data_,
           read_handler<MutableBufferSequence, Handler>(
             impl.descriptor_, this->get_io_service(), buffers, handler));
     }
@@ -662,7 +666,7 @@
     }
     else
     {
- reactor_.start_read_op(impl.descriptor_,
+ reactor_.start_read_op(impl.descriptor_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler),
           false);
     }

Modified: trunk/boost/asio/detail/reactive_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/reactive_socket_service.hpp (original)
+++ trunk/boost/asio/detail/reactive_socket_service.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -84,6 +84,9 @@
 
     // The protocol associated with the socket.
     protocol_type protocol_;
+
+ // Per-descriptor data used by the reactor.
+ typename Reactor::per_descriptor_data reactor_data_;
   };
 
   // The maximum number of buffers to support in a single operation.
@@ -114,7 +117,7 @@
   {
     if (impl.socket_ != invalid_socket)
     {
- reactor_.close_descriptor(impl.socket_);
+ reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
 
       if (impl.flags_ & implementation_type::internal_non_blocking)
       {
@@ -156,7 +159,7 @@
     if (sock.get() == invalid_socket)
       return ec;
 
- if (int err = reactor_.register_descriptor(sock.get()))
+ if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
     {
       ec = boost::system::error_code(err,
           boost::asio::error::get_system_category());
@@ -181,7 +184,8 @@
       return ec;
     }
 
- if (int err = reactor_.register_descriptor(native_socket))
+ if (int err = reactor_.register_descriptor(
+ native_socket, impl.reactor_data_))
     {
       ec = boost::system::error_code(err,
           boost::asio::error::get_system_category());
@@ -207,7 +211,7 @@
   {
     if (is_open(impl))
     {
- reactor_.close_descriptor(impl.socket_);
+ reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
 
       if (impl.flags_ & implementation_type::internal_non_blocking)
       {
@@ -243,7 +247,7 @@
       return ec;
     }
 
- reactor_.cancel_ops(impl.socket_);
+ reactor_.cancel_ops(impl.socket_, impl.reactor_data_);
     ec = boost::system::error_code();
     return ec;
   }
@@ -683,7 +687,7 @@
         impl.flags_ |= implementation_type::internal_non_blocking;
       }
 
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
           send_handler<ConstBufferSequence, Handler>(
             impl.socket_, this->get_io_service(), buffers, flags, handler));
     }
@@ -722,7 +726,7 @@
     }
     else
     {
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler),
           false);
     }
@@ -897,7 +901,7 @@
         impl.flags_ |= implementation_type::internal_non_blocking;
       }
 
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
           send_to_handler<ConstBufferSequence, Handler>(
             impl.socket_, this->get_io_service(), buffers,
             destination, flags, handler));
@@ -916,7 +920,7 @@
     }
     else
     {
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler),
           false);
     }
@@ -998,8 +1002,7 @@
   }
 
   // Wait until data can be received without blocking.
- size_t receive(implementation_type& impl,
- const null_buffers& buffers,
+ size_t receive(implementation_type& impl, const null_buffers&,
       socket_base::message_flags, boost::system::error_code& ec)
   {
     if (!is_open(impl))
@@ -1127,13 +1130,13 @@
 
       if (flags & socket_base::message_out_of_band)
       {
- reactor_.start_except_op(impl.socket_,
+ reactor_.start_except_op(impl.socket_, impl.reactor_data_,
             receive_handler<MutableBufferSequence, Handler>(
               impl.socket_, this->get_io_service(), buffers, flags, handler));
       }
       else
       {
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
             receive_handler<MutableBufferSequence, Handler>(
               impl.socket_, this->get_io_service(), buffers, flags, handler));
       }
@@ -1152,12 +1155,12 @@
     }
     else if (flags & socket_base::message_out_of_band)
     {
- reactor_.start_except_op(impl.socket_,
+ reactor_.start_except_op(impl.socket_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler));
     }
     else
     {
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler),
           false);
     }
@@ -1237,9 +1240,9 @@
   }
 
   // Wait until data can be received without blocking.
- size_t receive_from(implementation_type& impl,
- const null_buffers& buffers, endpoint_type& sender_endpoint,
- socket_base::message_flags, boost::system::error_code& ec)
+ size_t receive_from(implementation_type& impl, const null_buffers&,
+ endpoint_type& sender_endpoint, socket_base::message_flags,
+ boost::system::error_code& ec)
   {
     if (!is_open(impl))
     {
@@ -1352,7 +1355,7 @@
         impl.flags_ |= implementation_type::internal_non_blocking;
       }
 
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
           receive_from_handler<MutableBufferSequence, Handler>(
             impl.socket_, this->get_io_service(), buffers,
             sender_endpoint, flags, handler));
@@ -1377,12 +1380,12 @@
 
       if (flags & socket_base::message_out_of_band)
       {
- reactor_.start_except_op(impl.socket_,
+ reactor_.start_except_op(impl.socket_, impl.reactor_data_,
             null_buffers_handler<Handler>(this->get_io_service(), handler));
       }
       else
       {
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
             null_buffers_handler<Handler>(this->get_io_service(), handler),
             false);
       }
@@ -1590,7 +1593,7 @@
         impl.flags_ |= implementation_type::internal_non_blocking;
       }
 
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
           accept_handler<Socket, Handler>(
             impl.socket_, this->get_io_service(),
             peer, impl.protocol_, peer_endpoint,
@@ -1628,28 +1631,17 @@
   class connect_handler
   {
   public:
- connect_handler(socket_type socket, boost::shared_ptr<bool> completed,
- boost::asio::io_service& io_service, Reactor& reactor, Handler handler)
+ connect_handler(socket_type socket,
+ boost::asio::io_service& io_service, Handler handler)
       : socket_(socket),
- completed_(completed),
         io_service_(io_service),
         work_(io_service),
- reactor_(reactor),
         handler_(handler)
     {
     }
 
     bool operator()(const boost::system::error_code& result)
     {
- // Check whether a handler has already been called for the connection.
- // If it has, then we don't want to do anything in this handler.
- if (*completed_)
- return true;
-
- // Cancel the other reactor operation for the connection.
- *completed_ = true;
- reactor_.enqueue_cancel_ops_unlocked(socket_);
-
       // Check whether the operation was successful.
       if (result)
       {
@@ -1684,10 +1676,8 @@
 
   private:
     socket_type socket_;
- boost::shared_ptr<bool> completed_;
     boost::asio::io_service& io_service_;
     boost::asio::io_service::work work_;
- Reactor& reactor_;
     Handler handler_;
   };
 
@@ -1732,10 +1722,9 @@
     {
       // The connection is happening in the background, and we need to wait
       // until the socket becomes writeable.
- boost::shared_ptr<bool> completed(new bool(false));
- reactor_.start_write_and_except_ops(impl.socket_,
- connect_handler<Handler>(impl.socket_, completed,
- this->get_io_service(), reactor_, handler));
+ reactor_.start_connect_op(impl.socket_, impl.reactor_data_,
+ connect_handler<Handler>(impl.socket_,
+ this->get_io_service(), handler));
     }
     else
     {

Modified: trunk/boost/asio/detail/select_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/select_reactor.hpp (original)
+++ trunk/boost/asio/detail/select_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -23,6 +23,7 @@
 #include <cstddef>
 #include <boost/config.hpp>
 #include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/shared_ptr.hpp>
 #include <vector>
 #include <boost/asio/detail/pop_options.hpp>
 
@@ -51,6 +52,11 @@
   : public boost::asio::detail::service_base<select_reactor<Own_Thread> >
 {
 public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ };
+
   // Constructor.
   select_reactor(boost::asio::io_service& io_service)
     : boost::asio::detail::service_base<
@@ -107,7 +113,7 @@
 
   // Register a socket with the reactor. Returns 0 on success, system error
   // code on failure.
- int register_descriptor(socket_type descriptor)
+ int register_descriptor(socket_type, per_descriptor_data&)
   {
     return 0;
   }
@@ -115,8 +121,8 @@
   // Start a new read operation. The handler object will be invoked when the
   // given descriptor is ready to be read, or an error has occurred.
   template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool /*allow_speculative_read*/ = true)
+ void start_read_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool /*allow_speculative_read*/ = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     if (!shutdown_)
@@ -127,8 +133,8 @@
   // Start a new write operation. The handler object will be invoked when the
   // given descriptor is ready to be written, or an error has occurred.
   template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool /*allow_speculative_write*/ = true)
+ void start_write_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool /*allow_speculative_write*/ = true)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     if (!shutdown_)
@@ -139,7 +145,8 @@
   // Start a new exception operation. The handler object will be invoked when
   // the given descriptor has exception information, or an error has occurred.
   template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     if (!shutdown_)
@@ -147,18 +154,63 @@
         interrupter_.interrupt();
   }
 
+ // Wrapper for connect handlers to enable the handler object to be placed
+ // in both the write and the except operation queues, but ensure that only
+ // one of the handlers is called.
+ template <typename Handler>
+ class connect_handler_wrapper
+ {
+ public:
+ connect_handler_wrapper(socket_type descriptor,
+ boost::shared_ptr<bool> completed,
+ select_reactor<Own_Thread>& reactor, Handler handler)
+ : descriptor_(descriptor),
+ completed_(completed),
+ reactor_(reactor),
+ handler_(handler)
+ {
+ }
+
+ bool operator()(const boost::system::error_code& result)
+ {
+ // Check whether one of the handlers has already been called. If it has,
+ // then we don't want to do anything in this handler.
+ if (*completed_)
+ return true;
+
+ // Cancel the other reactor operation for the connection.
+ *completed_ = true;
+ reactor_.enqueue_cancel_ops_unlocked(descriptor_);
+
+ // Call the contained handler.
+ return handler_(result);
+ }
+
+ private:
+ socket_type descriptor_;
+ boost::shared_ptr<bool> completed_;
+ select_reactor<Own_Thread>& reactor_;
+ Handler handler_;
+ };
+
   // Start new write and exception operations. The handler object will be
   // invoked when the given descriptor is ready for writing or has exception
- // information available, or an error has occurred.
+ // information available, or an error has occurred. The handler will be called
+ // only once.
   template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     if (!shutdown_)
     {
- bool interrupt = write_op_queue_.enqueue_operation(descriptor, handler);
- interrupt = except_op_queue_.enqueue_operation(descriptor, handler)
- || interrupt;
+ boost::shared_ptr<bool> completed(new bool(false));
+ connect_handler_wrapper<Handler> wrapped_handler(
+ descriptor, completed, *this, handler);
+ bool interrupt = write_op_queue_.enqueue_operation(
+ descriptor, wrapped_handler);
+ interrupt = except_op_queue_.enqueue_operation(
+ descriptor, wrapped_handler) || interrupt;
       if (interrupt)
         interrupter_.interrupt();
     }
@@ -167,7 +219,7 @@
   // Cancel all operations associated with the given descriptor. The
   // handlers associated with the descriptor will be invoked with the
   // operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     cancel_ops_unlocked(descriptor);
@@ -176,8 +228,8 @@
   // Enqueue cancellation of all operations associated with the given
   // descriptor. The handlers associated with the descriptor will be invoked
   // with the operation_aborted error. This function does not acquire the
- // select_reactor's mutex, and so should only be used from within a reactor
- // handler.
+ // select_reactor's mutex, and so should only be used when the reactor lock is
+ // already held.
   void enqueue_cancel_ops_unlocked(socket_type descriptor)
   {
     pending_cancellations_.push_back(descriptor);
@@ -185,7 +237,7 @@
 
   // Cancel any operations that are running against the descriptor and remove
   // its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
   {
     boost::asio::detail::mutex::scoped_lock lock(mutex_);
     cancel_ops_unlocked(descriptor);

Modified: trunk/boost/asio/detail/win_iocp_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_socket_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_socket_service.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -114,6 +114,9 @@
     endpoint_type remote_endpoint_;
   };
 
+ // The type of the reactor used for connect operations.
+ typedef detail::select_reactor<true> reactor_type;
+
   // The implementation type of the socket.
   class implementation_type
   {
@@ -157,6 +160,9 @@
     // The protocol associated with the socket.
     protocol_type protocol_;
 
+ // Per-descriptor data used by the reactor.
+ reactor_type::per_descriptor_data reactor_data_;
+
 #if defined(BOOST_ASIO_ENABLE_CANCELIO)
     // The ID of the thread from which it is safe to cancel asynchronous
     // operations. 0 means no asynchronous operations have been started yet.
@@ -305,7 +311,7 @@
             interlocked_compare_exchange_pointer(
               reinterpret_cast<void**>(&reactor_), 0, 0));
       if (reactor)
- reactor->close_descriptor(impl.socket_);
+ reactor->close_descriptor(impl.socket_, impl.reactor_data_);
 
       if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
         return ec;
@@ -335,6 +341,7 @@
     if (!is_open(impl))
     {
       ec = boost::asio::error::bad_descriptor;
+ return ec;
     }
     else if (FARPROC cancel_io_ex_ptr = ::GetProcAddress(
           ::GetModuleHandleA("KERNEL32"), "CancelIoEx"))
@@ -922,7 +929,7 @@
             reinterpret_cast<void**>(&reactor_), reactor);
       }
 
- reactor->start_write_op(impl.socket_,
+ reactor->start_write_op(impl.socket_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler),
           false);
     }
@@ -1149,7 +1156,7 @@
             reinterpret_cast<void**>(&reactor_), reactor);
       }
 
- reactor->start_write_op(impl.socket_,
+ reactor->start_write_op(impl.socket_, impl.reactor_data_,
           null_buffers_handler<Handler>(this->get_io_service(), handler),
           false);
     }
@@ -1462,12 +1469,12 @@
 
       if (flags & socket_base::message_out_of_band)
       {
- reactor->start_except_op(impl.socket_,
+ reactor->start_except_op(impl.socket_, impl.reactor_data_,
             null_buffers_handler<Handler>(this->get_io_service(), handler));
       }
       else
       {
- reactor->start_read_op(impl.socket_,
+ reactor->start_read_op(impl.socket_, impl.reactor_data_,
             null_buffers_handler<Handler>(this->get_io_service(), handler),
             false);
       }
@@ -1735,12 +1742,12 @@
 
       if (flags & socket_base::message_out_of_band)
       {
- reactor->start_except_op(impl.socket_,
+ reactor->start_except_op(impl.socket_, impl.reactor_data_,
             null_buffers_handler<Handler>(this->get_io_service(), handler));
       }
       else
       {
- reactor->start_read_op(impl.socket_,
+ reactor->start_read_op(impl.socket_, impl.reactor_data_,
             null_buffers_handler<Handler>(this->get_io_service(), handler),
             false);
       }
@@ -2109,14 +2116,10 @@
   {
   public:
     connect_handler(socket_type socket, bool user_set_non_blocking,
- boost::shared_ptr<bool> completed,
- boost::asio::io_service& io_service,
- reactor_type& reactor, Handler handler)
+ boost::asio::io_service& io_service, Handler handler)
       : socket_(socket),
         user_set_non_blocking_(user_set_non_blocking),
- completed_(completed),
         io_service_(io_service),
- reactor_(reactor),
         work_(io_service),
         handler_(handler)
     {
@@ -2124,15 +2127,6 @@
 
     bool operator()(const boost::system::error_code& result)
     {
- // Check whether a handler has already been called for the connection.
- // If it has, then we don't want to do anything in this handler.
- if (*completed_)
- return true;
-
- // Cancel the other reactor operation for the connection.
- *completed_ = true;
- reactor_.enqueue_cancel_ops_unlocked(socket_);
-
       // Check whether the operation was successful.
       if (result)
       {
@@ -2180,9 +2174,7 @@
   private:
     socket_type socket_;
     bool user_set_non_blocking_;
- boost::shared_ptr<bool> completed_;
     boost::asio::io_service& io_service_;
- reactor_type& reactor_;
     boost::asio::io_service::work work_;
     Handler handler_;
   };
@@ -2250,11 +2242,11 @@
       // The connection is happening in the background, and we need to wait
       // until the socket becomes writeable.
       boost::shared_ptr<bool> completed(new bool(false));
- reactor->start_write_and_except_ops(impl.socket_,
+ reactor->start_connect_op(impl.socket_, impl.reactor_data_,
           connect_handler<Handler>(
             impl.socket_,
             (impl.flags_ & implementation_type::user_set_non_blocking) != 0,
- completed, this->get_io_service(), *reactor, handler));
+ this->get_io_service(), handler));
     }
     else
     {
@@ -2285,7 +2277,7 @@
             interlocked_compare_exchange_pointer(
               reinterpret_cast<void**>(&reactor_), 0, 0));
       if (reactor)
- reactor->close_descriptor(impl.socket_);
+ reactor->close_descriptor(impl.socket_, impl.reactor_data_);
 
       // The socket destructor must not block. If the user has changed the
       // linger option to block in the foreground, we will change it back to the


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk