|
Boost-Commit : |
From: chris_at_[hidden]
Date: 2008-05-01 18:00:27
Author: chris_kohlhoff
Date: 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
New Revision: 44997
URL: http://svn.boost.org/trac/boost/changeset/44997
Log:
Add a fast path for some speculative read and write operations in the
epoll_reactor.
Text files modified:
trunk/boost/asio/detail/dev_poll_reactor.hpp | 49 +++++++++-----------
trunk/boost/asio/detail/epoll_reactor.hpp | 96 ++++++++++++++++++++++++++++-----------
trunk/boost/asio/detail/kqueue_reactor.hpp | 56 +++++++---------------
trunk/boost/asio/detail/reactive_descriptor_service.hpp | 22 +++++---
trunk/boost/asio/detail/reactive_socket_service.hpp | 71 ++++++++++++-----------------
trunk/boost/asio/detail/select_reactor.hpp | 82 +++++++++++++++++++++++++++------
trunk/boost/asio/detail/win_iocp_socket_service.hpp | 44 +++++++----------
7 files changed, 236 insertions(+), 184 deletions(-)
Modified: trunk/boost/asio/detail/dev_poll_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/dev_poll_reactor.hpp (original)
+++ trunk/boost/asio/detail/dev_poll_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -54,6 +54,11 @@
: public boost::asio::detail::service_base<dev_poll_reactor<Own_Thread> >
{
public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ };
+
// Constructor.
dev_poll_reactor(boost::asio::io_service& io_service)
: boost::asio::detail::service_base<
@@ -116,11 +121,11 @@
for (std::size_t i = 0; i < timer_queues_.size(); ++i)
timer_queues_[i]->destroy_timers();
timer_queues_.clear();
- }
+ }
// Register a socket with the reactor. Returns 0 on success, system error
// code on failure.
- int register_descriptor(socket_type descriptor)
+ int register_descriptor(socket_type, per_descriptor_data&)
{
return 0;
}
@@ -128,8 +133,8 @@
// Start a new read operation. The handler object will be invoked when the
// given descriptor is ready to be read, or an error has occurred.
template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool allow_speculative_read = true)
+ void start_read_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_read = true)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -156,8 +161,8 @@
// Start a new write operation. The handler object will be invoked when the
// given descriptor is ready to be written, or an error has occurred.
template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool allow_speculative_write = true)
+ void start_write_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_write = true)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -184,7 +189,8 @@
// Start a new exception operation. The handler object will be invoked when
// the given descriptor has exception information, or an error has occurred.
template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -203,26 +209,25 @@
}
}
- // Start new write and exception operations. The handler object will be
- // invoked when the given descriptor is ready for writing or has exception
+ // Start a new write operation. The handler object will be invoked when the
// information available, or an error has occurred.
template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (shutdown_)
return;
- bool need_mod = write_op_queue_.enqueue_operation(descriptor, handler);
- need_mod = except_op_queue_.enqueue_operation(descriptor, handler)
- && need_mod;
- if (need_mod)
+ if (write_op_queue_.enqueue_operation(descriptor, handler))
{
::pollfd& ev = add_pending_event_change(descriptor);
- ev.events = POLLOUT | POLLPRI | POLLERR | POLLHUP;
+ ev.events = POLLOUT | POLLERR | POLLHUP;
if (read_op_queue_.has_operation(descriptor))
ev.events |= POLLIN;
+ if (except_op_queue_.has_operation(descriptor))
+ ev.events |= POLLPRI;
interrupter_.interrupt();
}
}
@@ -230,25 +235,15 @@
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the
// operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
cancel_ops_unlocked(descriptor);
}
- // Enqueue cancellation of all operations associated with the given
- // descriptor. The handlers associated with the descriptor will be invoked
- // with the operation_aborted error. This function does not acquire the
- // dev_poll_reactor's mutex, and so should only be used from within a reactor
- // handler.
- void enqueue_cancel_ops_unlocked(socket_type descriptor)
- {
- pending_cancellations_.push_back(descriptor);
- }
-
// Cancel any operations that are running against the descriptor and remove
// its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
Modified: trunk/boost/asio/detail/epoll_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/epoll_reactor.hpp (original)
+++ trunk/boost/asio/detail/epoll_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -54,6 +54,13 @@
: public boost::asio::detail::service_base<epoll_reactor<Own_Thread> >
{
public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ bool allow_speculative_read;
+ bool allow_speculative_write;
+ };
+
// Constructor.
epoll_reactor(boost::asio::io_service& io_service)
: boost::asio::detail::service_base<epoll_reactor<Own_Thread> >(io_service),
@@ -119,10 +126,14 @@
// Register a socket with the reactor. Returns 0 on success, system error
// code on failure.
- int register_descriptor(socket_type descriptor)
+ int register_descriptor(socket_type descriptor,
+ per_descriptor_data& descriptor_data)
{
// No need to lock according to epoll documentation.
+ descriptor_data.allow_speculative_read = true;
+ descriptor_data.allow_speculative_write = true;
+
epoll_event ev = { 0, { 0 } };
ev.events = 0;
ev.data.fd = descriptor;
@@ -135,9 +146,19 @@
// Start a new read operation. The handler object will be invoked when the
// given descriptor is ready to be read, or an error has occurred.
template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool allow_speculative_read = true)
+ void start_read_op(socket_type descriptor,
+ per_descriptor_data& descriptor_data,
+ Handler handler, bool allow_speculative_read = true)
{
+ if (allow_speculative_read && descriptor_data.allow_speculative_read)
+ {
+ if (handler(boost::system::error_code()))
+ return;
+
+ // We only get one shot at a speculative read in this function.
+ allow_speculative_read = false;
+ }
+
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (shutdown_)
@@ -146,8 +167,16 @@
if (!allow_speculative_read)
need_epoll_wait_ = true;
else if (!read_op_queue_.has_operation(descriptor))
+ {
+ // Speculative reads are ok as there are no queued read operations.
+ descriptor_data.allow_speculative_read = true;
+
if (handler(boost::system::error_code()))
return;
+ }
+
+ // Speculative reads are not ok as there will be queued read operations.
+ descriptor_data.allow_speculative_read = false;
if (read_op_queue_.enqueue_operation(descriptor, handler))
{
@@ -174,9 +203,19 @@
// Start a new write operation. The handler object will be invoked when the
// given descriptor is ready to be written, or an error has occurred.
template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool allow_speculative_write = true)
+ void start_write_op(socket_type descriptor,
+ per_descriptor_data& descriptor_data,
+ Handler handler, bool allow_speculative_write = true)
{
+ if (allow_speculative_write && descriptor_data.allow_speculative_write)
+ {
+ if (handler(boost::system::error_code()))
+ return;
+
+ // We only get one shot at a speculative write in this function.
+ allow_speculative_write = false;
+ }
+
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (shutdown_)
@@ -185,8 +224,16 @@
if (!allow_speculative_write)
need_epoll_wait_ = true;
else if (!write_op_queue_.has_operation(descriptor))
+ {
+ // Speculative writes are ok as there are no queued write operations.
+ descriptor_data.allow_speculative_write = true;
+
if (handler(boost::system::error_code()))
return;
+ }
+
+ // Speculative writes are not ok as there will be queued write operations.
+ descriptor_data.allow_speculative_write = false;
if (write_op_queue_.enqueue_operation(descriptor, handler))
{
@@ -213,7 +260,8 @@
// Start a new exception operation. The handler object will be invoked when
// the given descriptor has exception information, or an error has occurred.
template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -242,26 +290,29 @@
}
}
- // Start new write and exception operations. The handler object will be
- // invoked when the given descriptor is ready for writing or has exception
- // information available, or an error has occurred.
+ // Start a new write operation. The handler object will be invoked when the
+ // given descriptor is ready for writing or an error has occurred. Speculative
+ // writes are not allowed.
template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data& descriptor_data, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (shutdown_)
return;
- bool need_mod = write_op_queue_.enqueue_operation(descriptor, handler);
- need_mod = except_op_queue_.enqueue_operation(descriptor, handler)
- && need_mod;
- if (need_mod)
+ // Speculative writes are not ok as there will be queued write operations.
+ descriptor_data.allow_speculative_write = false;
+
+ if (write_op_queue_.enqueue_operation(descriptor, handler))
{
epoll_event ev = { 0, { 0 } };
- ev.events = EPOLLOUT | EPOLLPRI | EPOLLERR | EPOLLHUP;
+ ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;
if (read_op_queue_.has_operation(descriptor))
ev.events |= EPOLLIN;
+ if (except_op_queue_.has_operation(descriptor))
+ ev.events |= EPOLLPRI;
ev.data.fd = descriptor;
int result = epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
@@ -272,7 +323,6 @@
boost::system::error_code ec(errno,
boost::asio::error::get_system_category());
write_op_queue_.dispatch_all_operations(descriptor, ec);
- except_op_queue_.dispatch_all_operations(descriptor, ec);
}
}
}
@@ -280,25 +330,15 @@
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the
// operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
cancel_ops_unlocked(descriptor);
}
- // Enqueue cancellation of all operations associated with the given
- // descriptor. The handlers associated with the descriptor will be invoked
- // with the operation_aborted error. This function does not acquire the
- // epoll_reactor's mutex, and so should only be used from within a reactor
- // handler.
- void enqueue_cancel_ops_unlocked(socket_type descriptor)
- {
- pending_cancellations_.push_back(descriptor);
- }
-
// Cancel any operations that are running against the descriptor and remove
// its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
Modified: trunk/boost/asio/detail/kqueue_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/kqueue_reactor.hpp (original)
+++ trunk/boost/asio/detail/kqueue_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -61,6 +61,11 @@
: public boost::asio::detail::service_base<kqueue_reactor<Own_Thread> >
{
public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ };
+
// Constructor.
kqueue_reactor(boost::asio::io_service& io_service)
: boost::asio::detail::service_base<
@@ -127,7 +132,7 @@
// Register a socket with the reactor. Returns 0 on success, system error
// code on failure.
- int register_descriptor(socket_type)
+ int register_descriptor(socket_type, per_descriptor_data&)
{
return 0;
}
@@ -135,8 +140,8 @@
// Start a new read operation. The handler object will be invoked when the
// given descriptor is ready to be read, or an error has occurred.
template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool allow_speculative_read = true)
+ void start_read_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_read = true)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -165,8 +170,8 @@
// Start a new write operation. The handler object will be invoked when the
// given descriptor is ready to be written, or an error has occurred.
template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool allow_speculative_write = true)
+ void start_write_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool allow_speculative_write = true)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -195,7 +200,8 @@
// Start a new exception operation. The handler object will be invoked when
// the given descriptor has exception information, or an error has occurred.
template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -218,11 +224,11 @@
}
}
- // Start new write and exception operations. The handler object will be
- // invoked when the given descriptor is ready for writing or has exception
- // information available, or an error has occurred.
+ // Start a new write operation. The handler object will be invoked when the
+ // given descriptor is ready to be written, or an error has occurred.
template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
@@ -240,46 +246,20 @@
write_op_queue_.dispatch_all_operations(descriptor, ec);
}
}
-
- if (except_op_queue_.enqueue_operation(descriptor, handler))
- {
- struct kevent event;
- if (read_op_queue_.has_operation(descriptor))
- EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, 0, 0, 0);
- else
- EV_SET(&event, descriptor, EVFILT_READ, EV_ADD, EV_OOBAND, 0, 0);
- if (::kevent(kqueue_fd_, &event, 1, 0, 0, 0) == -1)
- {
- boost::system::error_code ec(errno,
- boost::asio::error::get_system_category());
- except_op_queue_.dispatch_all_operations(descriptor, ec);
- write_op_queue_.dispatch_all_operations(descriptor, ec);
- }
- }
}
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the
// operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
cancel_ops_unlocked(descriptor);
}
- // Enqueue cancellation of all operations associated with the given
- // descriptor. The handlers associated with the descriptor will be invoked
- // with the operation_aborted error. This function does not acquire the
- // kqueue_reactor's mutex, and so should only be used from within a reactor
- // handler.
- void enqueue_cancel_ops_unlocked(socket_type descriptor)
- {
- pending_cancellations_.push_back(descriptor);
- }
-
// Cancel any operations that are running against the descriptor and remove
// its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
Modified: trunk/boost/asio/detail/reactive_descriptor_service.hpp
==============================================================================
--- trunk/boost/asio/detail/reactive_descriptor_service.hpp (original)
+++ trunk/boost/asio/detail/reactive_descriptor_service.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -67,6 +67,9 @@
// Flags indicating the current state of the descriptor.
unsigned char flags_;
+
+ // Per-descriptor data used by the reactor.
+ typename Reactor::per_descriptor_data reactor_data_;
};
// The maximum number of buffers to support in a single operation.
@@ -97,7 +100,7 @@
{
if (impl.descriptor_ != -1)
{
- reactor_.close_descriptor(impl.descriptor_);
+ reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);
if (impl.flags_ & implementation_type::internal_non_blocking)
{
@@ -125,7 +128,8 @@
return ec;
}
- if (int err = reactor_.register_descriptor(native_descriptor))
+ if (int err = reactor_.register_descriptor(
+ native_descriptor, impl.reactor_data_))
{
ec = boost::system::error_code(err,
boost::asio::error::get_system_category());
@@ -150,7 +154,7 @@
{
if (is_open(impl))
{
- reactor_.close_descriptor(impl.descriptor_);
+ reactor_.close_descriptor(impl.descriptor_, impl.reactor_data_);
if (impl.flags_ & implementation_type::internal_non_blocking)
{
@@ -187,7 +191,7 @@
return ec;
}
- reactor_.cancel_ops(impl.descriptor_);
+ reactor_.cancel_ops(impl.descriptor_, impl.reactor_data_);
ec = boost::system::error_code();
return ec;
}
@@ -405,7 +409,7 @@
impl.flags_ |= implementation_type::internal_non_blocking;
}
- reactor_.start_write_op(impl.descriptor_,
+ reactor_.start_write_op(impl.descriptor_, impl.reactor_data_,
write_handler<ConstBufferSequence, Handler>(
impl.descriptor_, this->get_io_service(), buffers, handler));
}
@@ -444,7 +448,7 @@
}
else
{
- reactor_.start_write_op(impl.descriptor_,
+ reactor_.start_write_op(impl.descriptor_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -526,7 +530,7 @@
// Wait until data can be read without blocking.
size_t read_some(implementation_type& impl,
- const null_buffers& buffers, boost::system::error_code& ec)
+ const null_buffers&, boost::system::error_code& ec)
{
if (!is_open(impl))
{
@@ -644,7 +648,7 @@
impl.flags_ |= implementation_type::internal_non_blocking;
}
- reactor_.start_read_op(impl.descriptor_,
+ reactor_.start_read_op(impl.descriptor_, impl.reactor_data_,
read_handler<MutableBufferSequence, Handler>(
impl.descriptor_, this->get_io_service(), buffers, handler));
}
@@ -662,7 +666,7 @@
}
else
{
- reactor_.start_read_op(impl.descriptor_,
+ reactor_.start_read_op(impl.descriptor_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
Modified: trunk/boost/asio/detail/reactive_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/reactive_socket_service.hpp (original)
+++ trunk/boost/asio/detail/reactive_socket_service.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -84,6 +84,9 @@
// The protocol associated with the socket.
protocol_type protocol_;
+
+ // Per-descriptor data used by the reactor.
+ typename Reactor::per_descriptor_data reactor_data_;
};
// The maximum number of buffers to support in a single operation.
@@ -114,7 +117,7 @@
{
if (impl.socket_ != invalid_socket)
{
- reactor_.close_descriptor(impl.socket_);
+ reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
if (impl.flags_ & implementation_type::internal_non_blocking)
{
@@ -156,7 +159,7 @@
if (sock.get() == invalid_socket)
return ec;
- if (int err = reactor_.register_descriptor(sock.get()))
+ if (int err = reactor_.register_descriptor(sock.get(), impl.reactor_data_))
{
ec = boost::system::error_code(err,
boost::asio::error::get_system_category());
@@ -181,7 +184,8 @@
return ec;
}
- if (int err = reactor_.register_descriptor(native_socket))
+ if (int err = reactor_.register_descriptor(
+ native_socket, impl.reactor_data_))
{
ec = boost::system::error_code(err,
boost::asio::error::get_system_category());
@@ -207,7 +211,7 @@
{
if (is_open(impl))
{
- reactor_.close_descriptor(impl.socket_);
+ reactor_.close_descriptor(impl.socket_, impl.reactor_data_);
if (impl.flags_ & implementation_type::internal_non_blocking)
{
@@ -243,7 +247,7 @@
return ec;
}
- reactor_.cancel_ops(impl.socket_);
+ reactor_.cancel_ops(impl.socket_, impl.reactor_data_);
ec = boost::system::error_code();
return ec;
}
@@ -683,7 +687,7 @@
impl.flags_ |= implementation_type::internal_non_blocking;
}
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
send_handler<ConstBufferSequence, Handler>(
impl.socket_, this->get_io_service(), buffers, flags, handler));
}
@@ -722,7 +726,7 @@
}
else
{
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -897,7 +901,7 @@
impl.flags_ |= implementation_type::internal_non_blocking;
}
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
send_to_handler<ConstBufferSequence, Handler>(
impl.socket_, this->get_io_service(), buffers,
destination, flags, handler));
@@ -916,7 +920,7 @@
}
else
{
- reactor_.start_write_op(impl.socket_,
+ reactor_.start_write_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -998,8 +1002,7 @@
}
// Wait until data can be received without blocking.
- size_t receive(implementation_type& impl,
- const null_buffers& buffers,
+ size_t receive(implementation_type& impl, const null_buffers&,
socket_base::message_flags, boost::system::error_code& ec)
{
if (!is_open(impl))
@@ -1127,13 +1130,13 @@
if (flags & socket_base::message_out_of_band)
{
- reactor_.start_except_op(impl.socket_,
+ reactor_.start_except_op(impl.socket_, impl.reactor_data_,
receive_handler<MutableBufferSequence, Handler>(
impl.socket_, this->get_io_service(), buffers, flags, handler));
}
else
{
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
receive_handler<MutableBufferSequence, Handler>(
impl.socket_, this->get_io_service(), buffers, flags, handler));
}
@@ -1152,12 +1155,12 @@
}
else if (flags & socket_base::message_out_of_band)
{
- reactor_.start_except_op(impl.socket_,
+ reactor_.start_except_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler));
}
else
{
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -1237,9 +1240,9 @@
}
// Wait until data can be received without blocking.
- size_t receive_from(implementation_type& impl,
- const null_buffers& buffers, endpoint_type& sender_endpoint,
- socket_base::message_flags, boost::system::error_code& ec)
+ size_t receive_from(implementation_type& impl, const null_buffers&,
+ endpoint_type& sender_endpoint, socket_base::message_flags,
+ boost::system::error_code& ec)
{
if (!is_open(impl))
{
@@ -1352,7 +1355,7 @@
impl.flags_ |= implementation_type::internal_non_blocking;
}
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
receive_from_handler<MutableBufferSequence, Handler>(
impl.socket_, this->get_io_service(), buffers,
sender_endpoint, flags, handler));
@@ -1377,12 +1380,12 @@
if (flags & socket_base::message_out_of_band)
{
- reactor_.start_except_op(impl.socket_,
+ reactor_.start_except_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler));
}
else
{
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -1590,7 +1593,7 @@
impl.flags_ |= implementation_type::internal_non_blocking;
}
- reactor_.start_read_op(impl.socket_,
+ reactor_.start_read_op(impl.socket_, impl.reactor_data_,
accept_handler<Socket, Handler>(
impl.socket_, this->get_io_service(),
peer, impl.protocol_, peer_endpoint,
@@ -1628,28 +1631,17 @@
class connect_handler
{
public:
- connect_handler(socket_type socket, boost::shared_ptr<bool> completed,
- boost::asio::io_service& io_service, Reactor& reactor, Handler handler)
+ connect_handler(socket_type socket,
+ boost::asio::io_service& io_service, Handler handler)
: socket_(socket),
- completed_(completed),
io_service_(io_service),
work_(io_service),
- reactor_(reactor),
handler_(handler)
{
}
bool operator()(const boost::system::error_code& result)
{
- // Check whether a handler has already been called for the connection.
- // If it has, then we don't want to do anything in this handler.
- if (*completed_)
- return true;
-
- // Cancel the other reactor operation for the connection.
- *completed_ = true;
- reactor_.enqueue_cancel_ops_unlocked(socket_);
-
// Check whether the operation was successful.
if (result)
{
@@ -1684,10 +1676,8 @@
private:
socket_type socket_;
- boost::shared_ptr<bool> completed_;
boost::asio::io_service& io_service_;
boost::asio::io_service::work work_;
- Reactor& reactor_;
Handler handler_;
};
@@ -1732,10 +1722,9 @@
{
// The connection is happening in the background, and we need to wait
// until the socket becomes writeable.
- boost::shared_ptr<bool> completed(new bool(false));
- reactor_.start_write_and_except_ops(impl.socket_,
- connect_handler<Handler>(impl.socket_, completed,
- this->get_io_service(), reactor_, handler));
+ reactor_.start_connect_op(impl.socket_, impl.reactor_data_,
+ connect_handler<Handler>(impl.socket_,
+ this->get_io_service(), handler));
}
else
{
Modified: trunk/boost/asio/detail/select_reactor.hpp
==============================================================================
--- trunk/boost/asio/detail/select_reactor.hpp (original)
+++ trunk/boost/asio/detail/select_reactor.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -23,6 +23,7 @@
#include <cstddef>
#include <boost/config.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
+#include <boost/shared_ptr.hpp>
#include <vector>
#include <boost/asio/detail/pop_options.hpp>
@@ -51,6 +52,11 @@
: public boost::asio::detail::service_base<select_reactor<Own_Thread> >
{
public:
+ // Per-descriptor data.
+ struct per_descriptor_data
+ {
+ };
+
// Constructor.
select_reactor(boost::asio::io_service& io_service)
: boost::asio::detail::service_base<
@@ -107,7 +113,7 @@
// Register a socket with the reactor. Returns 0 on success, system error
// code on failure.
- int register_descriptor(socket_type descriptor)
+ int register_descriptor(socket_type, per_descriptor_data&)
{
return 0;
}
@@ -115,8 +121,8 @@
// Start a new read operation. The handler object will be invoked when the
// given descriptor is ready to be read, or an error has occurred.
template <typename Handler>
- void start_read_op(socket_type descriptor, Handler handler,
- bool /*allow_speculative_read*/ = true)
+ void start_read_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool /*allow_speculative_read*/ = true)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (!shutdown_)
@@ -127,8 +133,8 @@
// Start a new write operation. The handler object will be invoked when the
// given descriptor is ready to be written, or an error has occurred.
template <typename Handler>
- void start_write_op(socket_type descriptor, Handler handler,
- bool /*allow_speculative_write*/ = true)
+ void start_write_op(socket_type descriptor, per_descriptor_data&,
+ Handler handler, bool /*allow_speculative_write*/ = true)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (!shutdown_)
@@ -139,7 +145,8 @@
// Start a new exception operation. The handler object will be invoked when
// the given descriptor has exception information, or an error has occurred.
template <typename Handler>
- void start_except_op(socket_type descriptor, Handler handler)
+ void start_except_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (!shutdown_)
@@ -147,18 +154,63 @@
interrupter_.interrupt();
}
+ // Wrapper for connect handlers to enable the handler object to be placed
+ // in both the write and the except operation queues, but ensure that only
+ // one of the handlers is called.
+ template <typename Handler>
+ class connect_handler_wrapper
+ {
+ public:
+ connect_handler_wrapper(socket_type descriptor,
+ boost::shared_ptr<bool> completed,
+ select_reactor<Own_Thread>& reactor, Handler handler)
+ : descriptor_(descriptor),
+ completed_(completed),
+ reactor_(reactor),
+ handler_(handler)
+ {
+ }
+
+ bool operator()(const boost::system::error_code& result)
+ {
+ // Check whether one of the handlers has already been called. If it has,
+ // then we don't want to do anything in this handler.
+ if (*completed_)
+ return true;
+
+ // Cancel the other reactor operation for the connection.
+ *completed_ = true;
+ reactor_.enqueue_cancel_ops_unlocked(descriptor_);
+
+ // Call the contained handler.
+ return handler_(result);
+ }
+
+ private:
+ socket_type descriptor_;
+ boost::shared_ptr<bool> completed_;
+ select_reactor<Own_Thread>& reactor_;
+ Handler handler_;
+ };
+
// Start new write and exception operations. The handler object will be
// invoked when the given descriptor is ready for writing or has exception
- // information available, or an error has occurred.
+ // information available, or an error has occurred. The handler will be called
+ // only once.
template <typename Handler>
- void start_write_and_except_ops(socket_type descriptor, Handler handler)
+ void start_connect_op(socket_type descriptor,
+ per_descriptor_data&, Handler handler)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
if (!shutdown_)
{
- bool interrupt = write_op_queue_.enqueue_operation(descriptor, handler);
- interrupt = except_op_queue_.enqueue_operation(descriptor, handler)
- || interrupt;
+ boost::shared_ptr<bool> completed(new bool(false));
+ connect_handler_wrapper<Handler> wrapped_handler(
+ descriptor, completed, *this, handler);
+ bool interrupt = write_op_queue_.enqueue_operation(
+ descriptor, wrapped_handler);
+ interrupt = except_op_queue_.enqueue_operation(
+ descriptor, wrapped_handler) || interrupt;
if (interrupt)
interrupter_.interrupt();
}
@@ -167,7 +219,7 @@
// Cancel all operations associated with the given descriptor. The
// handlers associated with the descriptor will be invoked with the
// operation_aborted error.
- void cancel_ops(socket_type descriptor)
+ void cancel_ops(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
cancel_ops_unlocked(descriptor);
@@ -176,8 +228,8 @@
// Enqueue cancellation of all operations associated with the given
// descriptor. The handlers associated with the descriptor will be invoked
// with the operation_aborted error. This function does not acquire the
- // select_reactor's mutex, and so should only be used from within a reactor
- // handler.
+ // select_reactor's mutex, and so should only be used when the reactor lock is
+ // already held.
void enqueue_cancel_ops_unlocked(socket_type descriptor)
{
pending_cancellations_.push_back(descriptor);
@@ -185,7 +237,7 @@
// Cancel any operations that are running against the descriptor and remove
// its registration from the reactor.
- void close_descriptor(socket_type descriptor)
+ void close_descriptor(socket_type descriptor, per_descriptor_data&)
{
boost::asio::detail::mutex::scoped_lock lock(mutex_);
cancel_ops_unlocked(descriptor);
Modified: trunk/boost/asio/detail/win_iocp_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_socket_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_socket_service.hpp 2008-05-01 18:00:26 EDT (Thu, 01 May 2008)
@@ -114,6 +114,9 @@
endpoint_type remote_endpoint_;
};
+ // The type of the reactor used for connect operations.
+ typedef detail::select_reactor<true> reactor_type;
+
// The implementation type of the socket.
class implementation_type
{
@@ -157,6 +160,9 @@
// The protocol associated with the socket.
protocol_type protocol_;
+ // Per-descriptor data used by the reactor.
+ reactor_type::per_descriptor_data reactor_data_;
+
#if defined(BOOST_ASIO_ENABLE_CANCELIO)
// The ID of the thread from which it is safe to cancel asynchronous
// operations. 0 means no asynchronous operations have been started yet.
@@ -305,7 +311,7 @@
interlocked_compare_exchange_pointer(
reinterpret_cast<void**>(&reactor_), 0, 0));
if (reactor)
- reactor->close_descriptor(impl.socket_);
+ reactor->close_descriptor(impl.socket_, impl.reactor_data_);
if (socket_ops::close(impl.socket_, ec) == socket_error_retval)
return ec;
@@ -335,6 +341,7 @@
if (!is_open(impl))
{
ec = boost::asio::error::bad_descriptor;
+ return ec;
}
else if (FARPROC cancel_io_ex_ptr = ::GetProcAddress(
::GetModuleHandleA("KERNEL32"), "CancelIoEx"))
@@ -922,7 +929,7 @@
reinterpret_cast<void**>(&reactor_), reactor);
}
- reactor->start_write_op(impl.socket_,
+ reactor->start_write_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -1149,7 +1156,7 @@
reinterpret_cast<void**>(&reactor_), reactor);
}
- reactor->start_write_op(impl.socket_,
+ reactor->start_write_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -1462,12 +1469,12 @@
if (flags & socket_base::message_out_of_band)
{
- reactor->start_except_op(impl.socket_,
+ reactor->start_except_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler));
}
else
{
- reactor->start_read_op(impl.socket_,
+ reactor->start_read_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -1735,12 +1742,12 @@
if (flags & socket_base::message_out_of_band)
{
- reactor->start_except_op(impl.socket_,
+ reactor->start_except_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler));
}
else
{
- reactor->start_read_op(impl.socket_,
+ reactor->start_read_op(impl.socket_, impl.reactor_data_,
null_buffers_handler<Handler>(this->get_io_service(), handler),
false);
}
@@ -2109,14 +2116,10 @@
{
public:
connect_handler(socket_type socket, bool user_set_non_blocking,
- boost::shared_ptr<bool> completed,
- boost::asio::io_service& io_service,
- reactor_type& reactor, Handler handler)
+ boost::asio::io_service& io_service, Handler handler)
: socket_(socket),
user_set_non_blocking_(user_set_non_blocking),
- completed_(completed),
io_service_(io_service),
- reactor_(reactor),
work_(io_service),
handler_(handler)
{
@@ -2124,15 +2127,6 @@
bool operator()(const boost::system::error_code& result)
{
- // Check whether a handler has already been called for the connection.
- // If it has, then we don't want to do anything in this handler.
- if (*completed_)
- return true;
-
- // Cancel the other reactor operation for the connection.
- *completed_ = true;
- reactor_.enqueue_cancel_ops_unlocked(socket_);
-
// Check whether the operation was successful.
if (result)
{
@@ -2180,9 +2174,7 @@
private:
socket_type socket_;
bool user_set_non_blocking_;
- boost::shared_ptr<bool> completed_;
boost::asio::io_service& io_service_;
- reactor_type& reactor_;
boost::asio::io_service::work work_;
Handler handler_;
};
@@ -2250,11 +2242,11 @@
// The connection is happening in the background, and we need to wait
// until the socket becomes writeable.
boost::shared_ptr<bool> completed(new bool(false));
- reactor->start_write_and_except_ops(impl.socket_,
+ reactor->start_connect_op(impl.socket_, impl.reactor_data_,
connect_handler<Handler>(
impl.socket_,
(impl.flags_ & implementation_type::user_set_non_blocking) != 0,
- completed, this->get_io_service(), *reactor, handler));
+ this->get_io_service(), handler));
}
else
{
@@ -2285,7 +2277,7 @@
interlocked_compare_exchange_pointer(
reinterpret_cast<void**>(&reactor_), 0, 0));
if (reactor)
- reactor->close_descriptor(impl.socket_);
+ reactor->close_descriptor(impl.socket_, impl.reactor_data_);
// The socket destructor must not block. If the user has changed the
// linger option to block in the foreground, we will change it back to the
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk