Boost logo

Boost-Commit :

From: chris_at_[hidden]
Date: 2007-11-07 23:10:14


Author: chris_kohlhoff
Date: 2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
New Revision: 40919
URL: http://svn.boost.org/trac/boost/changeset/40919

Log:
Eliminate the need for an extra thread to perform timer dispatching.

Text files modified:
   trunk/boost/asio/deadline_timer_service.hpp | 3
   trunk/boost/asio/detail/win_iocp_io_service.hpp | 197 +++++++++++++++++++++++++++++++++++++++
   2 files changed, 195 insertions(+), 5 deletions(-)

Modified: trunk/boost/asio/deadline_timer_service.hpp
==============================================================================
--- trunk/boost/asio/deadline_timer_service.hpp (original)
+++ trunk/boost/asio/deadline_timer_service.hpp 2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
@@ -29,6 +29,7 @@
 #include <boost/asio/detail/kqueue_reactor.hpp>
 #include <boost/asio/detail/select_reactor.hpp>
 #include <boost/asio/detail/service_base.hpp>
+#include <boost/asio/detail/win_iocp_io_service.hpp>
 
 namespace boost {
 namespace asio {
@@ -63,7 +64,7 @@
   // The type of the platform-specific implementation.
 #if defined(BOOST_ASIO_HAS_IOCP)
   typedef detail::deadline_timer_service<
- traits_type, detail::select_reactor<true> > service_impl_type;
+ traits_type, detail::win_iocp_io_service> service_impl_type;
 #elif defined(BOOST_ASIO_HAS_EPOLL)
   typedef detail::deadline_timer_service<
     traits_type, detail::epoll_reactor<false> > service_impl_type;

Modified: trunk/boost/asio/detail/win_iocp_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_io_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_io_service.hpp 2007-11-07 23:10:14 EST (Wed, 07 Nov 2007)
@@ -33,7 +33,9 @@
 #include <boost/asio/detail/handler_invoke_helpers.hpp>
 #include <boost/asio/detail/service_base.hpp>
 #include <boost/asio/detail/socket_types.hpp>
+#include <boost/asio/detail/timer_queue.hpp>
 #include <boost/asio/detail/win_iocp_operation.hpp>
+#include <boost/asio/detail/mutex.hpp>
 
 namespace boost {
 namespace asio {
@@ -52,7 +54,9 @@
       iocp_(),
       outstanding_work_(0),
       stopped_(0),
- shutdown_(0)
+ shutdown_(0),
+ timer_thread_(0),
+ timer_interrupt_issued_(false)
   {
   }
 
@@ -94,6 +98,10 @@
       if (overlapped)
         static_cast<operation*>(overlapped)->destroy();
     }
+
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ timer_queues_[i]->destroy_timers();
+ timer_queues_.clear();
   }
 
   // Register a handle with the IO completion port.
@@ -258,14 +266,96 @@
     }
   }
 
+  // Add a new timer queue to the service. Only the queue's address is stored.
+  template <typename Time_Traits>
+  void add_timer_queue(timer_queue<Time_Traits>& timer_queue)
+  {
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); // guards timer_queues_
+    timer_queues_.push_back(&timer_queue);
+  }
+
+  // Remove a timer queue from the service. Does nothing if the queue is not registered.
+  template <typename Time_Traits>
+  void remove_timer_queue(timer_queue<Time_Traits>& timer_queue)
+  {
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_); // guards timer_queues_
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+    {
+      if (timer_queues_[i] == &timer_queue) // queues are matched by address
+      {
+        timer_queues_.erase(timer_queues_.begin() + i);
+        return; // only the first matching entry is removed
+      }
+    }
+  }
+
+  // Schedule a timer in the given timer queue to expire at the specified
+  // absolute time. The handler object will be invoked when the timer expires.
+  template <typename Time_Traits, typename Handler>
+  void schedule_timer(timer_queue<Time_Traits>& timer_queue,
+      const typename Time_Traits::time_type& time, Handler handler, void* token)
+  {
+    // If the service has been shut down we silently discard the timer.
+    if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) // adding 0 = atomic read of the flag
+      return;
+
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+    if (timer_queue.enqueue_timer(time, handler, token)) // NOTE(review): presumably true when a wake-up is needed - confirm in timer_queue
+    {
+      if (!timer_interrupt_issued_) // post at most one wake-up until do_one clears the flag
+      {
+        timer_interrupt_issued_ = true;
+        lock.unlock(); // release the mutex before posting to the completion port
+        ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); // completion key 1: do_one's timer wake-up
+      }
+    }
+  }
+
+  // Cancel the timer associated with the given token. Returns the number of
+  // handlers that have been posted or dispatched (0 if the service is shut down).
+  template <typename Time_Traits>
+  std::size_t cancel_timer(timer_queue<Time_Traits>& timer_queue, void* token)
+  {
+    // If the service has been shut down we silently ignore the cancellation.
+    if (::InterlockedExchangeAdd(&shutdown_, 0) != 0) // adding 0 = atomic read of the flag
+      return 0;
+
+    boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+    std::size_t n = timer_queue.cancel_timer(token);
+    if (n > 0 && !timer_interrupt_issued_) // wake a thread only if something was cancelled and no wake-up is pending
+    {
+      timer_interrupt_issued_ = true;
+      lock.unlock(); // release the mutex before posting to the completion port
+      ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0); // completion key 1: do_one's timer wake-up
+    }
+    return n;
+  }
+
 private:
   // Dequeues at most one operation from the I/O completion port, and then
   // executes it. Returns the number of operations that were dequeued (i.e.
   // either 0 or 1).
   size_t do_one(bool block, boost::system::error_code& ec)
   {
+ bool doing_timers = false;
     for (;;)
     {
+ // Try to become the timer thread.
+ if (!doing_timers)
+ {
+ doing_timers = (InterlockedCompareExchange(&timer_thread_,
+ static_cast<long>(GetCurrentThreadId()), 0) == 0);
+ }
+
+ // Calculate timeout for GetQueuedCompletionStatus call.
+ DWORD timeout = 1000;
+ if (doing_timers)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ timer_interrupt_issued_ = false;
+ timeout = get_timeout();
+ }
+
       // Get the next operation from the queue.
       DWORD bytes_transferred = 0;
 #if (WINVER < 0x0500)
@@ -276,18 +366,45 @@
       LPOVERLAPPED overlapped = 0;
       ::SetLastError(0);
       BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
- &completion_key, &overlapped, block ? 1000 : 0);
+ &completion_key, &overlapped, block ? timeout : 0);
       DWORD last_error = ::GetLastError();
 
+ // Dispatch any pending timers.
+ if (doing_timers)
+ {
+ boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+ {
+ timer_queues_[i]->dispatch_timers();
+ timer_queues_[i]->dispatch_cancellations();
+ }
+
+ // Clean up timers. We must not hold the lock while cleaning up timers
+ // since the destructors may make calls back into this service. We make
+ // a copy of the vector of timer queues since the original may be
+ // modified while the lock is not held.
+ timer_queues_for_cleanup_ = timer_queues_;
+ lock.unlock();
+ for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
+ timer_queues_for_cleanup_[i]->cleanup_timers();
+ }
+
       if (!ok && overlapped == 0)
       {
         if (block && last_error == WAIT_TIMEOUT)
           continue;
+
+ // Pass responsibility for timers to another thread.
+ if (doing_timers)
+ {
+ ::InterlockedExchange(&timer_thread_, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ }
+
         ec = boost::system::error_code();
         return 0;
       }
-
- if (overlapped)
+ else if (overlapped)
       {
         // We may have been passed a last_error value in the completion_key.
         if (last_error == 0)
@@ -295,6 +412,13 @@
           last_error = completion_key;
         }
 
+ // Pass responsibility for timers to another thread.
+ if (doing_timers)
+ {
+ ::InterlockedExchange(&timer_thread_, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ }
+
         // Ensure that the io_service does not exit due to running out of work
         // while we make the upcall.
         auto_work work(*this);
@@ -306,12 +430,22 @@
         ec = boost::system::error_code();
         return 1;
       }
+ else if (completion_key == 1)
+ {
+ // Woken up to try to become the timer thread.
+ }
       else
       {
         // The stopped_ flag is always checked to ensure that any leftover
         // interrupts from a previous run invocation are ignored.
         if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
         {
+ // Pass responsibility for timers to another thread.
+ if (doing_timers)
+ {
+ ::InterlockedExchange(&timer_thread_, 0);
+ }
+
           // Wake up next thread that is blocked on GetQueuedCompletionStatus.
           if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
           {
@@ -328,6 +462,45 @@
     }
   }
 
+  // Check if all timer queues are empty. Takes no lock itself; also true when no queue is registered.
+  bool all_timer_queues_are_empty() const
+  {
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+      if (!timer_queues_[i]->empty())
+        return false; // stop at the first queue that still holds a timer
+    return true;
+  }
+
+  // Get the timeout value for the GetQueuedCompletionStatus call. The timeout
+  // value is returned as a number of milliseconds. We will wait no longer than
+  // 1000 milliseconds. Takes no lock itself; do_one calls it with timer_mutex_ held.
+  DWORD get_timeout()
+  {
+    if (all_timer_queues_are_empty())
+      return 1000; // nothing scheduled: use the maximum wait
+
+    boost::posix_time::time_duration minimum_wait_duration
+      = boost::posix_time::seconds(1); // upper bound, matches the 1000 ms cap
+
+    for (std::size_t i = 0; i < timer_queues_.size(); ++i)
+    {
+      boost::posix_time::time_duration wait_duration
+        = timer_queues_[i]->wait_duration();
+      if (wait_duration < minimum_wait_duration)
+        minimum_wait_duration = wait_duration; // keep the shortest wait across all queues
+    }
+
+    if (minimum_wait_duration > boost::posix_time::time_duration()) // strictly positive wait
+    {
+      int milliseconds = minimum_wait_duration.total_milliseconds(); // at most 1000 here, so int suffices
+      return static_cast<DWORD>(milliseconds > 0 ? milliseconds : 1); // sub-millisecond waits become 1 ms
+    }
+    else
+    {
+      return 0; // a timer is already due: do not block
+    }
+  }
+
   struct auto_work
   {
     auto_work(win_iocp_io_service& io_service)
@@ -417,6 +590,22 @@
 
   // Flag to indicate whether the service has been shut down.
   long shutdown_;
+
+  // The id of the thread that's currently in charge of dispatching timers, or 0 if none.
+  long timer_thread_;
+
+  // Mutex for protecting access to the timer queues and timer_interrupt_issued_.
+  mutex timer_mutex_;
+
+  // Whether a wake-up has been posted for a new timeout and not yet consumed by do_one.
+  bool timer_interrupt_issued_;
+
+  // The timer queues, held as non-owning pointers registered via add_timer_queue.
+  std::vector<timer_queue_base*> timer_queues_;
+
+  // A copy of the timer queues, used when cleaning up timers. The copy is
+  // stored as a class data member to avoid unnecessary memory allocation.
+  std::vector<timer_queue_base*> timer_queues_for_cleanup_;
 };
 
 } // namespace detail


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk