Boost logo

Boost-Commit :

From: anthony_at_[hidden]
Date: 2008-04-21 12:22:17


Author: anthonyw
Date: 2008-04-21 12:22:16 EDT (Mon, 21 Apr 2008)
New Revision: 44699
URL: http://svn.boost.org/trac/boost/changeset/44699

Log:
Revamped condition variable to try and fix swallowed-notify problems (trac issue #1834)
Text files modified:
   trunk/boost/thread/win32/condition_variable.hpp | 230 +++++++++++++++------------------------
   1 files changed, 89 insertions(+), 141 deletions(-)

Modified: trunk/boost/thread/win32/condition_variable.hpp
==============================================================================
--- trunk/boost/thread/win32/condition_variable.hpp (original)
+++ trunk/boost/thread/win32/condition_variable.hpp 2008-04-21 12:22:16 EDT (Mon, 21 Apr 2008)
@@ -14,6 +14,8 @@
 #include <boost/thread/thread_time.hpp>
 #include "interlocked_read.hpp"
 #include <boost/thread/xtime.hpp>
+#include <vector>
+#include <boost/intrusive_ptr.hpp>
 
 namespace boost
 {
@@ -27,79 +29,50 @@
 
             struct list_entry
             {
- detail::win32::handle semaphore;
- long count;
+ detail::win32::handle_manager semaphore;
+ detail::win32::handle_manager wake_sem;
+ long waiters;
                 bool notified;
+ long references;
 
                 list_entry():
- semaphore(0),count(0),notified(0)
+ semaphore(detail::win32::create_anonymous_semaphore(0,LONG_MAX)),
+ wake_sem(0),
+ waiters(1),notified(false),references(0)
                 {}
 
- void release(unsigned count_to_release=1)
+ void release(unsigned count_to_release)
                 {
+ notified=true;
                     detail::win32::ReleaseSemaphore(semaphore,count_to_release,0);
                 }
-
- };
-
- BOOST_STATIC_CONSTANT(unsigned,generation_count=3);
-
- list_entry generations[generation_count];
- detail::win32::handle wake_sem;
-
- void wake_waiters(long count_to_wake)
- {
- detail::interlocked_write_release(&total_count,total_count-count_to_wake);
- detail::win32::ReleaseSemaphore(wake_sem,count_to_wake,0);
- }
-
 
- static bool no_waiters(list_entry const& entry)
- {
- return entry.count==0;
- }
-
- void shift_generations_down()
- {
- list_entry* const last_active_entry=std::remove_if(generations,generations+generation_count,no_waiters);
- if(last_active_entry==generations+generation_count)
+ friend void intrusive_ptr_add_ref(list_entry * p)
                 {
- broadcast_entry(generations[generation_count-1]);
+ BOOST_INTERLOCKED_INCREMENT(&p->references);
                 }
- else
+
+ friend void intrusive_ptr_release(list_entry * p)
                 {
- active_generation_count=unsigned(last_active_entry-generations)+1;
+ if(!BOOST_INTERLOCKED_DECREMENT(&p->references))
+ {
+ delete p;
+ }
                 }
+ };
 
-#ifdef BOOST_MSVC
-#pragma warning(push)
-#pragma warning(disable:4996)
-#endif
- std::copy_backward(generations,generations+active_generation_count-1,generations+active_generation_count);
-#ifdef BOOST_MSVC
-#pragma warning(pop)
-#endif
- generations[0]=list_entry();
- }
+ typedef boost::intrusive_ptr<list_entry> entry_ptr;
+ typedef std::vector<entry_ptr> generation_list;
 
- void broadcast_entry(list_entry& entry)
- {
- entry.release(entry.count);
- entry.count=0;
- dispose_entry(entry);
- }
-
+ generation_list generations;
+ detail::win32::handle_manager wake_sem;
 
- void dispose_entry(list_entry& entry)
+ void wake_waiters(long count_to_wake)
             {
- if(entry.semaphore)
- {
- BOOST_VERIFY(detail::win32::CloseHandle(entry.semaphore));
- entry.semaphore=0;
- }
- entry.notified=false;
+ detail::interlocked_write_release(&total_count,total_count-count_to_wake);
+ detail::win32::ReleaseSemaphore(wake_sem,count_to_wake,0);
             }
-
+
             template<typename lock_type>
             struct relocker
             {
@@ -123,77 +96,79 @@
                     
                 }
             private:
+ relocker(relocker&);
                 void operator=(relocker&);
             };
             
 
- template<typename lock_type>
- void start_wait_loop_first_time(relocker<lock_type>& locker,
- detail::win32::handle_manager& local_wake_sem)
+ entry_ptr get_wait_entry()
             {
- detail::interlocked_write_release(&total_count,total_count+1);
- locker.unlock();
+ boost::lock_guard<boost::mutex> internal_lock(internal_mutex);
+
                 if(!wake_sem)
                 {
                     wake_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
                     BOOST_ASSERT(wake_sem);
                 }
- local_wake_sem=detail::win32::duplicate_handle(wake_sem);
-
- if(generations[0].notified)
+
+ detail::interlocked_write_release(&total_count,total_count+1);
+ if(generations.empty() || generations.back()->notified)
                 {
- shift_generations_down();
+ entry_ptr new_entry(new list_entry);
+ new_entry->wake_sem=wake_sem.duplicate();
+ generations.push_back(new_entry);
+ return new_entry;
                 }
- else if(!active_generation_count)
+ else
                 {
- active_generation_count=1;
+ BOOST_INTERLOCKED_INCREMENT(&generations.back()->waiters);
+ return generations.back();
                 }
             }
-
- void ensure_generation_present()
+
+ struct entry_manager
             {
- if(!generations[0].semaphore)
+ entry_ptr const entry;
+
+ entry_manager(entry_ptr const& entry_):
+ entry(entry_)
+ {}
+
+ ~entry_manager()
                 {
- generations[0].semaphore=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
- BOOST_ASSERT(generations[0].semaphore);
+ BOOST_INTERLOCKED_DECREMENT(&entry->waiters);
                 }
- }
-
- template<typename lock_type>
- void start_wait_loop(relocker<lock_type>& locker,
- detail::win32::handle_manager& local_wake_sem,
- detail::win32::handle_manager& sem)
- {
- boost::mutex::scoped_lock internal_lock(internal_mutex);
- if(!local_wake_sem)
+
+ list_entry* operator->()
                 {
- start_wait_loop_first_time(locker,local_wake_sem);
+ return entry.get();
                 }
- ensure_generation_present();
- ++generations[0].count;
- sem=detail::win32::duplicate_handle(generations[0].semaphore);
- }
+
+ private:
+ void operator=(entry_manager&);
+ entry_manager(entry_manager&);
+ };
+
 
         protected:
             template<typename lock_type>
             bool do_wait(lock_type& lock,timeout wait_until)
             {
- detail::win32::handle_manager local_wake_sem;
- detail::win32::handle_manager sem;
- bool woken=false;
-
                 relocker<lock_type> locker(lock);
-
+
+ entry_manager entry=get_wait_entry();
+
+ locker.unlock();
+
+ bool woken=false;
                 while(!woken)
                 {
- start_wait_loop(locker,local_wake_sem,sem);
-
- if(!this_thread::interruptible_wait(sem,wait_until))
+ if(!this_thread::interruptible_wait(entry->semaphore,wait_until))
                     {
                         return false;
                     }
                 
- unsigned long const woken_result=detail::win32::WaitForSingleObject(local_wake_sem,0);
+ unsigned long const woken_result=detail::win32::WaitForSingleObject(entry->wake_sem,0);
                     BOOST_ASSERT(woken_result==detail::win32::timeout || woken_result==0);
 
                     woken=(woken_result==0);
@@ -214,21 +189,19 @@
         
             basic_condition_variable(const basic_condition_variable& other);
             basic_condition_variable& operator=(const basic_condition_variable& other);
+
+ static bool no_waiters(entry_ptr const& entry)
+ {
+ return !detail::interlocked_read_acquire(&entry->waiters);
+ }
         public:
             basic_condition_variable():
                 total_count(0),active_generation_count(0),wake_sem(0)
             {}
             
             ~basic_condition_variable()
- {
- for(unsigned i=0;i<generation_count;++i)
- {
- dispose_entry(generations[i]);
- }
- detail::win32::CloseHandle(wake_sem);
- }
+ {}
 
-
             void notify_one()
             {
                 if(detail::interlocked_read_acquire(&total_count))
@@ -239,33 +212,14 @@
                         return;
                     }
                     wake_waiters(1);
-
- unsigned waiting_count=0;
-
- for(unsigned generation=active_generation_count;generation!=0;--generation)
- {
- list_entry& entry=generations[generation-1];
- waiting_count+=entry.count;
- if(entry.count)
- {
- entry.notified=true;
- entry.release();
- if(!--entry.count)
- {
- dispose_entry(entry);
- if(generation==active_generation_count)
- {
- --active_generation_count;
- }
- }
- }
- }
- if(waiting_count<=total_count)
+
+ for(generation_list::iterator it=generations.begin(),
+ end=generations.end();
+ it!=end;++it)
                     {
- shift_generations_down();
- ensure_generation_present();
- generations[0].release();
+ (*it)->release(1);
                     }
+ generations.erase(std::remove_if(generations.begin(),generations.end(),no_waiters),generations.end());
                 }
             }
         
@@ -274,24 +228,18 @@
                 if(detail::interlocked_read_acquire(&total_count))
                 {
                     boost::mutex::scoped_lock internal_lock(internal_mutex);
- long waiting_count=total_count;
-
- wake_waiters(total_count);
- for(unsigned generation=active_generation_count;generation!=0;--generation)
+ if(!total_count)
                     {
- list_entry& entry=generations[generation-1];
- if(entry.count)
- {
- waiting_count-=entry.count;
- broadcast_entry(entry);
- }
+ return;
                     }
- if(waiting_count)
+ wake_waiters(total_count);
+ for(generation_list::iterator it=generations.begin(),
+ end=generations.end();
+ it!=end;++it)
                     {
- ensure_generation_present();
- generations[0].release(waiting_count);
+ (*it)->release(detail::interlocked_read_acquire(&(*it)->waiters));
                     }
- active_generation_count=0;
+ wake_sem=detail::win32::handle(0);
                 }
             }
         


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk