Boost logo

Boost-Commit :

From: chris_at_[hidden]
Date: 2007-11-08 02:52:50


Author: chris_kohlhoff
Date: 2007-11-08 02:52:49 EST (Thu, 08 Nov 2007)
New Revision: 40924
URL: http://svn.boost.org/trac/boost/changeset/40924

Log:
Fix timer stalls.

Text files modified:
   trunk/boost/asio/detail/win_iocp_io_service.hpp | 102 ++++++++++++++++++++++++---------------
   1 files changed, 63 insertions(+), 39 deletions(-)

Modified: trunk/boost/asio/detail/win_iocp_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_io_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_io_service.hpp 2007-11-08 02:52:49 EST (Thu, 08 Nov 2007)
@@ -306,7 +306,8 @@
       {
         timer_interrupt_issued_ = true;
         lock.unlock();
- ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, steal_timer_dispatching, 0);
       }
     }
   }
@@ -326,7 +327,8 @@
     {
       timer_interrupt_issued_ = true;
       lock.unlock();
- ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, steal_timer_dispatching, 0);
     }
     return n;
   }
@@ -337,19 +339,17 @@
   // either 0 or 1).
   size_t do_one(bool block, boost::system::error_code& ec)
   {
- bool doing_timers = false;
+ long this_thread_id = static_cast<long>(::GetCurrentThreadId());
+
     for (;;)
     {
- // Try to become the timer thread.
- if (!doing_timers)
- {
- doing_timers = (InterlockedCompareExchange(&timer_thread_,
- static_cast<long>(GetCurrentThreadId()), 0) == 0);
- }
+ // Try to acquire responsibility for dispatching timers.
+ bool dispatching_timers = (::InterlockedCompareExchange(
+ &timer_thread_, this_thread_id, 0) == 0);
 
       // Calculate timeout for GetQueuedCompletionStatus call.
- DWORD timeout = 1000;
- if (doing_timers)
+ DWORD timeout = max_timeout;
+ if (dispatching_timers)
       {
         boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
         timer_interrupt_issued_ = false;
@@ -370,35 +370,37 @@
       DWORD last_error = ::GetLastError();
 
       // Dispatch any pending timers.
- if (doing_timers)
+ if (dispatching_timers)
       {
         boost::asio::detail::mutex::scoped_lock lock(timer_mutex_);
+ timer_queues_copy_ = timer_queues_;
         for (std::size_t i = 0; i < timer_queues_.size(); ++i)
         {
           timer_queues_[i]->dispatch_timers();
           timer_queues_[i]->dispatch_cancellations();
+ timer_queues_[i]->cleanup_timers();
         }
-
- // Clean up timers. We must not hold the lock while cleaning up timers
- // since the destructors may make calls back into this service. We make
- // a copy of the vector of timer queues since the original may be
- // modified while the lock is not held.
- timer_queues_for_cleanup_ = timer_queues_;
- lock.unlock();
- for (std::size_t i = 0; i < timer_queues_for_cleanup_.size(); ++i)
- timer_queues_for_cleanup_[i]->cleanup_timers();
       }
 
       if (!ok && overlapped == 0)
       {
         if (block && last_error == WAIT_TIMEOUT)
+ {
+ // Relinquish responsibility for dispatching timers.
+ if (dispatching_timers)
+ {
+ ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
+ }
+
           continue;
+ }
 
- // Pass responsibility for timers to another thread.
- if (doing_timers)
+ // Transfer responsibility for dispatching timers to another thread.
+ if (dispatching_timers && ::InterlockedCompareExchange(
+ &timer_thread_, 0, this_thread_id) == this_thread_id)
         {
- ::InterlockedExchange(&timer_thread_, 0);
- ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, transfer_timer_dispatching, 0);
         }
 
         ec = boost::system::error_code();
@@ -412,11 +414,12 @@
           last_error = completion_key;
         }
 
- // Pass responsibility for timers to another thread.
- if (doing_timers)
+ // Transfer responsibility for dispatching timers to another thread.
+ if (dispatching_timers && ::InterlockedCompareExchange(
+ &timer_thread_, 0, this_thread_id) == this_thread_id)
         {
- ::InterlockedExchange(&timer_thread_, 0);
- ::PostQueuedCompletionStatus(iocp_.handle, 0, 1, 0);
+ ::PostQueuedCompletionStatus(iocp_.handle,
+ 0, transfer_timer_dispatching, 0);
         }
 
         // Ensure that the io_service does not exit due to running out of work
@@ -430,9 +433,15 @@
         ec = boost::system::error_code();
         return 1;
       }
- else if (completion_key == 1)
+ else if (completion_key == transfer_timer_dispatching)
+ {
+ // Woken up to try to acquire responsibility for dispatching timers.
+ ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
+ }
+ else if (completion_key == steal_timer_dispatching)
       {
- // Woken up to try to become the timer thread.
+ // Woken up to steal responsibility for dispatching timers.
+ ::InterlockedExchange(&timer_thread_, 0);
       }
       else
       {
@@ -440,10 +449,10 @@
         // interrupts from a previous run invocation are ignored.
         if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
         {
- // Pass responsibility for timers to another thread.
- if (doing_timers)
+ // Relinquish responsibility for dispatching timers.
+ if (dispatching_timers)
           {
- ::InterlockedExchange(&timer_thread_, 0);
+ ::InterlockedCompareExchange(&timer_thread_, 0, this_thread_id);
           }
 
           // Wake up next thread that is blocked on GetQueuedCompletionStatus.
@@ -477,10 +486,10 @@
   DWORD get_timeout()
   {
     if (all_timer_queues_are_empty())
- return 1000;
+ return max_timeout;
 
     boost::posix_time::time_duration minimum_wait_duration
- = boost::posix_time::seconds(1);
+ = boost::posix_time::milliseconds(max_timeout);
 
     for (std::size_t i = 0; i < timer_queues_.size(); ++i)
     {
@@ -591,6 +600,20 @@
   // Flag to indicate whether the service has been shut down.
   long shutdown_;
 
+ enum
+ {
+ // Maximum GetQueuedCompletionStatus timeout, in milliseconds.
+ max_timeout = 1000,
+
+ // Completion key value to indicate that responsibility for dispatching
+ // timers is being cooperatively transferred from one thread to another.
+ transfer_timer_dispatching = 1,
+
+ // Completion key value to indicate that responsibility for dispatching
+ // timers should be stolen from another thread.
+ steal_timer_dispatching = 2
+ };
+
   // The thread that's currently in charge of dispatching timers.
   long timer_thread_;
 
@@ -603,9 +626,10 @@
   // The timer queues.
   std::vector<timer_queue_base*> timer_queues_;
 
- // A copy of the timer queues, used when cleaning up timers. The copy is
- // stored as a class data member to avoid unnecessary memory allocation.
- std::vector<timer_queue_base*> timer_queues_for_cleanup_;
+ // A copy of the timer queues, used when dispatching, cancelling and cleaning
+ // up timers. The copy is stored as a class data member to avoid unnecessary
+ // memory allocation.
+ std::vector<timer_queue_base*> timer_queues_copy_;
 };
 
 } // namespace detail


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk