|
Boost-Commit : |
From: chris_at_[hidden]
Date: 2007-11-07 23:10:14
Author: chris_kohlhoff
Date: 2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
New Revision: 40919
URL: http://svn.boost.org/trac/boost/changeset/40919
Log:
Eliminate the need for an extra thread to perform timer dispatching.
Text files modified:
trunk/boost/asio/deadline_timer_service.hpp | 3
trunk/boost/asio/detail/win_iocp_io_service.hpp | 197 +++++++++++++++++++++++++++++++++++++++
2 files changed, 195 insertions(+), 5 deletions(-)
Modified: trunk/boost/asio/deadline_timer_service.hpp
==============================================================================
--- trunk/boost/asio/deadline_timer_service.hpp (original)
+++ trunk/boost/asio/deadline_timer_service.hpp 2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
@@ -29,6 +29,7 @@
#include <boost/asio/detail/kqueue_reactor.hpp>
#include <boost/asio/detail/select_reactor.hpp>
#include <boost/asio/detail/service_base.hpp>
+#include <boost/asio/detail/win_iocp_io_service.hpp>
namespace boost {
namespace asio {
@@ -63,7 +64,7 @@
// The type of the platform-specific implementation.
#if defined(BOOST_ASIO_HAS_IOCP)
typedef detail::deadline_timer_service<
- traits_type, detail::select_reactor<true> > service_impl_type;
+ traits_type, detail::win_iocp_io_service> service_impl_type;
#elif defined(BOOST_ASIO_HAS_EPOLL)
typedef detail::deadline_timer_service<
traits_type, detail::epoll_reactor<false> > service_impl_type;
Modified: trunk/boost/asio/detail/win_iocp_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_io_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_io_service.hpp 2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
@@ -33,7 +33,9 @@
#include <boost/asio/detail/handler_invoke_helpers.hpp>
#include <boost/asio/detail/service_base.hpp>
#include <boost/asio/detail/socket_types.hpp>
+#include <boost/asio/detail/timer_queue.hpp>
#include <boost/asio/detail/win_iocp_operation.hpp>
+#include <boost/asio/detail/mutex.hpp>
namespace boost {
namespace asio {
@@ -52,7 +54,9 @@
iocp_(),
outstanding_work_(0),
stopped_(0),
- shutdown_(0)
+ shutdown_(0),
+ timer_thread_(0),
+ timer_interrupt_issued_(false)
{
}
@@ -94,6 +98,10 @@
if (overlapped)
static_cast<operation*>(overlapped)->destroy();
}
+
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ timer_queues_[i]->destroy_timers();
+ timer_queues_.clear();
}
// Register a handle with the IO completion port.
@@ -258,14 +266,96 @@
}
}
+ // Add a new timer queue to the service.
+ template <typename Time_Traits>
+ void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ timer_queues_.push_back(&timer_queue);
+ }
+
+ // Remove a timer queue from the service.
+ template <typename Time_Traits>
+ void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ {
+ if (timer_queues_[i] == &timer_queue)
+ {
+ timer_queues_.erase(timer_queues_.begin() + i);
+ return;
+ }
+ }
+ }
+
+ // Schedule a timer in the given timer queue to expire at the specified
+ // absolute time. The handler object will be invoked when the timer expires.
+ template <typename Time_Traits, typename Handler>
+ void schedule_timer(timer_queue<Time_Traits>& timer_queue,
+ const typename Time_Traits::time_type& time, Handler handler, void* token)
+ {
+ // If the service has been shut down we silently discard the timer.
+ if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
+ return;
+
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ if (timer_queue.enqueue_timer(time, handler, token))
+ {
+ if (!timer_interrupt_issued_)
+ {
+ timer_interrupt_issued_ = true;
+ lock.unlock();
+ ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ }
+ }
+ }
+
+ // Cancel the timer associated with the given token. Returns the number of
+ // handlers that have been posted or dispatched.
+ template <typename Time_Traits>
+ std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
+ {
+ // If the service has been shut down we silently ignore the cancellation.
+ if (::InterlockedExchangeAdd(&shutdown_, 0) != 0)
+ return 0;
+
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ std::size_t n = timer_queue.cancel_timer(token);
+ if (n > 0 && !timer_interrupt_issued_)
+ {
+ timer_interrupt_issued_ = true;
+ lock.unlock();
+ ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ }
+ return n;
+ }
+
private:
// Dequeues at most one operation from the I/O completion port, and then
// executes it. Returns the number of operations that were dequeued (i.e.
// either 0 or 1).
size_t do_one(bool block, boost::system::error_code& ec)
{
+ bool doing_timers = false;
for (;;)
{
+ // Try to become the timer thread.
+ if (!doing_timers)
+ {
+ doing_timers = (InterlockedCompareExchange(&timer_thread_,
+ static_cast<long>(GetCurrentThreadId()), 0) == 0);
+ }
+
+ // Calculate timeout for GetQueuedCompletionStatus call.
+ DWORD timeout = 1000;
+ if (doing_timers)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ timer_interrupt_issued_ = false;
+ timeout = get_timeout();
+ }
+
// Get the next operation from the queue.
DWORD bytes_transferred = 0;
#if (WINVER < 0x0500)
@@ -276,18 +366,45 @@
LPOVERLAPPED overlapped = 0;
::SetLastError(0);
BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
- &completion_key, &overlapped, block ? 1000 : 0);
+ &completion_key, &overlapped, block ? timeout : 0);
DWORD last_error = ::GetLastError();
+ // Dispatch any pending timers.
+ if (doing_timers)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ {
+ timer_queues_[i]->dispatch_timers();
+ timer_queues_[i]->dispatch_cancellations();
+ }
+
+ // Clean up timers. We must not hold the lock while cleaning up timers
+ // since the destructors may make calls back into this service. We make
+ // a copy of the vector of timer queues since the original may be
+ // modified while the lock is not held.
+ timer_queues_for_cleanup_ = timer_queues_;
+ lock.unlock();
+ for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
+ timer_queues_for_cleanup_[i]->cleanup_timers();
+ }
+
if (!ok && overlapped == 0)
{
if (block && last_error == WAIT_TIMEOUT)
continue;
+
+ // Pass responsibility for timers to another thread.
+ if (doing_timers)
+ {
+ ::InterlockedExchange(&timer_thread_, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ }
+
ec = boost::system::error_code();
return 0;
}
-
- if (overlapped)
+ else if (overlapped)
{
// We may have been passed a last_error value in the completion_key.
if (last_error == 0)
@@ -295,6 +412,13 @@
last_error = completion_key;
}
+ // Pass responsibility for timers to another thread.
+ if (doing_timers)
+ {
+ ::InterlockedExchange(&timer_thread_, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ }
+
// Ensure that the io_service does not exit due to running out of work
// while we make the upcall.
auto_work work(*this);
@@ -306,12 +430,22 @@
ec = boost::system::error_code();
return 1;
}
+ else if (completion_key == 1)
+ {
+ // Woken up to try to become the timer thread.
+ }
else
{
// The stopped_ flag is always checked to ensure that any leftover
// interrupts from a previous run invocation are ignored.
if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
{
+ // Pass responsibility for timers to another thread.
+ if (doing_timers)
+ {
+ ::InterlockedExchange(&timer_thread_, 0);
+ }
+
// Wake up next thread that is blocked on GetQueuedCompletionStatus.
if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
{
@@ -328,6 +462,45 @@
}
}
+ // Check if all timer queues are empty.
+ bool all_timer_queues_are_empty() const
+ {
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ if (!timer_queues_[i]->empty())
+ return false;
+ return true;
+ }
+
+ // Get the timeout value for the GetQueuedCompletionStatus call. The timeout
+ // value is returned as a number of milliseconds. We will wait no longer than
+ // 1000 milliseconds.
+ DWORD get_timeout()
+ {
+ if (all_timer_queues_are_empty())
+ return 1000;
+
+ boost::posix_time::time_duration minimum_wait_duration
+ = boost::posix_time::seconds(1);
+
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ {
+ boost::posix_time::time_duration wait_duration
+ = timer_queues_[i]->wait_duration();
+ if (wait_duration < minimum_wait_duration)
+ minimum_wait_duration = wait_duration;
+ }
+
+ if (minimum_wait_duration > boost::posix_time::time_duration())
+ {
+ int milliseconds = minimum_wait_duration.total_milliseconds();
+ return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1);
+ }
+ else
+ {
+ return 0;
+ }
+ }
+
struct auto_work
{
auto_work(win_iocp_io_service& io_service)
@@ -417,6 +590,22 @@
// Flag to indicate whether the service has been shut down.
long shutdown_;
+
+ // The thread that's currently in charge of dispatching timers.
+ long timer_thread_;
+
+ // Mutex for protecting access to the timer queues.
+ mutex timer_mutex_;
+
+ // Whether a thread has been interrupted to process a new timeout.
+ bool timer_interrupt_issued_;
+
+ // The timer queues.
+ std::vector<timer_queue_base*> timer_queues_;
+
+ // A copy of the timer queues, used when cleaning up timers. The copy is
+ // stored as a class data member to avoid unnecessary memory allocation.
+ std::vector<timer_queue_base*> timer_queues_for_cleanup_;
};
} // namespace detail
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk