Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r56984 - in sandbox/task: boost/task boost/task/detail libs/task/src libs/task/test
From: oliver.kowalke_at_[hidden]
Date: 2009-10-18 14:26:14


Author: olli
Date: 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
New Revision: 56984
URL: http://svn.boost.org/trac/boost/changeset/56984

Log:
- replaced semaphore by manual-reset-event for pool-shutdown
- pool_base derieves from UMS

Text files modified:
   sandbox/task/boost/task/detail/pool_base.hpp | 31 ++++++++-----
   sandbox/task/boost/task/detail/worker.hpp | 86 ++++++---------------------------------
   sandbox/task/boost/task/detail/worker_group.hpp | 6 -
   sandbox/task/boost/task/spin_auto_reset_event.hpp | 4 +
   sandbox/task/boost/task/spin_manual_reset_event.hpp | 2
   sandbox/task/libs/task/src/spin_auto_reset_event.cpp | 9 ++++
   sandbox/task/libs/task/src/spin_manual_reset_event.cpp | 17 +++++++
   sandbox/task/libs/task/src/worker.cpp | 8 ---
   sandbox/task/libs/task/src/worker_group.cpp | 14 ------
   sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp | 12 ++++
   sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp | 17 ++++++
   11 files changed, 90 insertions(+), 116 deletions(-)

Modified: sandbox/task/boost/task/detail/pool_base.hpp
==============================================================================
--- sandbox/task/boost/task/detail/pool_base.hpp (original)
+++ sandbox/task/boost/task/detail/pool_base.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -29,6 +29,7 @@
 #include <boost/task/handle.hpp>
 #include <boost/task/poolsize.hpp>
 #include <boost/task/scanns.hpp>
+#include <boost/task/spin_manual_reset_event.hpp>
 #include <boost/task/stacksize.hpp>
 #include <boost/task/task.hpp>
 #include <boost/task/watermark.hpp>
@@ -43,23 +44,24 @@
         typename Queue,
         typename UMS
>
-class pool_base
+class pool_base : public UMS
 {
 private:
         friend class worker;
 
- template< typename T, typename X, typename Z >
+ template< typename T, typename Z >
         friend class worker_object;
 
         typedef Queue queue_type;
         typedef typename queue_type::value_type value_type;
 
- UMS ums_;
         worker_group wg_;
         shared_mutex mtx_wg_;
         volatile uint32_t state_;
         queue_type queue_;
         volatile uint32_t idle_worker_;
+ spin_manual_reset_event shtdwn_ev_;
+ spin_manual_reset_event shtdwn_now_ev_;
 
         void worker_entry_()
         {
@@ -81,7 +83,6 @@
                 wg_.insert(
                         worker(
                                 * this,
- ums_,
                                 psize,
                                 asleep,
                                 max_scns,
@@ -108,7 +109,6 @@
                 wg_.insert(
                         worker(
                                 * this,
- ums_,
                                 psize,
                                 asleep,
                                 max_scns,
@@ -135,12 +135,13 @@
                         posix_time::time_duration const& asleep,
                         scanns const& max_scns,
                         stacksize const& stack_size) :
- ums_(),
                 wg_(),
                 mtx_wg_(),
                 state_( 0),
                 queue_(),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
         {
                 if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration();
@@ -160,7 +161,9 @@
                 mtx_wg_(),
                 state_( 0),
                 queue_( hwm, lwm),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
         {
                 if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration();
@@ -178,7 +181,9 @@
                 mtx_wg_(),
                 state_( 0),
                 queue_(),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
         {
                 if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration();
@@ -199,7 +204,9 @@
                 mtx_wg_(),
                 state_( 0),
                 queue_( hwm, lwm),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
         {
                 if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration();
@@ -228,7 +235,7 @@
 
                 queue_.deactivate();
                 shared_lock< shared_mutex > lk( mtx_wg_);
- wg_.signal_shutdown_all();
+ shtdwn_ev_.set();
                 wg_.join_all();
         }
 
@@ -238,7 +245,7 @@
 
                 queue_.deactivate();
                 shared_lock< shared_mutex > lk( mtx_wg_);
- wg_.signal_shutdown_now_all();
+ shtdwn_now_ev_.set();
                 wg_.interrupt_all();
                 wg_.join_all();
         }

Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp (original)
+++ sandbox/task/boost/task/detail/worker.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -52,10 +52,6 @@
 
         virtual bool try_steal( callable &) = 0;
 
- virtual void signal_shutdown() = 0;
-
- virtual void signal_shutdown_now() = 0;
-
         virtual void run() = 0;
 
         virtual void reschedule_until( function< bool() > const&) = 0;
@@ -65,7 +61,6 @@
 
 template<
         typename Pool,
- typename UMS,
         typename Worker
>
 class worker_object : public worker_base,
@@ -93,12 +88,9 @@
         typedef shared_ptr< thread > thread_t;
 
         Pool & pool_;
- UMS & ums_;
         thread_t thrd_;
         fiber::sptr_t fib_;
         wsq wsq_;
- semaphore shtdwn_sem_;
- semaphore shtdwn_now_sem_;
         bool shtdwn_;
         posix_time::time_duration asleep_;
         scanns max_scns_;
@@ -133,7 +125,6 @@
                 std::size_t idx( rnd_idx_() );
                 for ( std::size_t j( 0); j < pool_.wg_.size(); ++j)
                 {
- // TODO: not thread-safe -> segfault
                         Worker other( pool_.wg_[idx]);
                         if ( this_thread::get_id() == other.get_id() ) continue;
                         if ( ++idx >= pool_.wg_.size() ) idx = 0;
@@ -145,13 +136,13 @@
 
         bool try_blocked_fibers_()
         {
- if ( ums_.has_blocked() )
+ if ( pool_.has_blocked() )
                 {
                         fiber::sptr_t this_fib = fib_;
                         BOOST_ASSERT( this_fib);
- ums_.put_runnable( this_fib);
+ pool_.put_runnable( this_fib);
                         fiber::sptr_t blocked_fib;
- ums_.try_take_blocked( blocked_fib);
+ pool_.try_take_blocked( blocked_fib);
                         BOOST_ASSERT( blocked_fib);
                         fib_ = blocked_fib;
                         BOOST_ASSERT( this_fib->running() );
@@ -163,43 +154,6 @@
                 return false;
         }
 
- void process_( bool all)
- {
- try_blocked_fibers_();
- callable ca;
- if ( all ? try_take_local_callable_( ca) ||
- try_steal_other_callable_( ca) ||
- try_take_global_callable_( ca)
- : try_take_local_callable_( ca) )
- {
- execute_( ca);
- scns_ = 0;
- }
- else
- {
- guard grd( pool_.idle_worker_);
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- if ( pool_.size_() > pool_.idle_worker_)
- {
- if ( take_global_callable_( ca, asleep_) )
- execute_( ca);
- }
- else if ( ! ums_.has_blocked() )
- {
- if ( take_global_callable_( ca) )
- execute_( ca);
- }
- else
- this_thread::yield();
- scns_ = 0;
- }
- else
- this_thread::yield();
- }
- }
-
         void run_()
         {
                 while ( ! shutdown_() )
@@ -224,7 +178,7 @@
                                                 if ( take_global_callable_( ca, asleep_) )
                                                         execute_( ca);
                                         }
- else if ( ! ums_.has_blocked() )
+ else if ( ! pool_.has_blocked() )
                                         {
                                                 try
                                                 { this_thread::sleep( asleep_); }
@@ -241,7 +195,7 @@
 
         bool shutdown_()
         {
- if ( shutdown__() && pool_.queue_.empty() && ! ums_.has_blocked() )
+ if ( shutdown__() && pool_.queue_.empty() && ! pool_.has_blocked() )
                         return true;
                 else if ( shutdown_now__() )
                         return true;
@@ -251,29 +205,25 @@
         bool shutdown__()
         {
                 if ( ! shtdwn_)
- shtdwn_ = shtdwn_sem_.try_wait();
+ shtdwn_ = pool_.shtdwn_ev_.try_wait();
                 return shtdwn_;
         }
         
         bool shutdown_now__()
- { return shtdwn_now_sem_.try_wait(); }
+ { return pool_.shtdwn_now_ev_.try_wait(); }
 
 public:
         worker_object(
                         Pool & pool,
- UMS & ums,
                         poolsize const& psize,
                         posix_time::time_duration const& asleep,
                         scanns const& max_scns,
                         stacksize const& stack_size,
                         function< void() > const& fn) :
                 pool_( pool),
- ums_( ums),
                 thrd_( new thread( fn) ),
                 fib_(),
                 wsq_(),
- shtdwn_sem_( 0),
- shtdwn_now_sem_( 0),
                 shtdwn_( false),
                 asleep_( asleep),
                 max_scns_( max_scns),
@@ -292,12 +242,6 @@
         interrupt() const
         { thrd_->interrupt(); }
 
- void signal_shutdown()
- { shtdwn_sem_.post(); }
-
- void signal_shutdown_now()
- { shtdwn_now_sem_.post(); }
-
         void put( callable const& ca)
         {
                 BOOST_ASSERT( ! ca.empty() );
@@ -313,7 +257,7 @@
 
                 fiber::convert_thread_to_fiber();
 
- ums_.attach();
+ pool_.attach();
 
                 fib_ = fiber::create(
                         bind( & worker_object::run_, this),
@@ -331,10 +275,10 @@
         void block()
         {
                 fiber::sptr_t this_fib = fib_;
- ums_.put_blocked( this_fib);
+ pool_.put_blocked( this_fib);
                 fiber::sptr_t runnable_fib;
- if ( ums_.has_runnable() )
- ums_.try_take_runnable( runnable_fib);
+ if ( pool_.has_runnable() )
+ pool_.try_take_runnable( runnable_fib);
                 else
                         runnable_fib = fiber::create(
                                 bind( & worker_object::run_, this),
@@ -356,19 +300,17 @@
         shared_ptr< worker_base > impl_;
 
 public:
- template< typename Pool, typename UMS >
+ template< typename Pool >
         worker(
                         Pool & pool,
- UMS & ums,
                         poolsize const& psize,
                         posix_time::time_duration const& asleep,
                         scanns const& max_scns,
                         stacksize const& stack_size,
                         function< void() > const& fn) :
                 impl_(
- new worker_object< Pool, UMS, worker >(
+ new worker_object< Pool, worker >(
                                 pool,
- ums,
                                 psize,
                                 asleep,
                                 max_scns,
@@ -380,8 +322,6 @@
 
         void join() const;
         void interrupt() const;
- void signal_shutdown();
- void signal_shutdown_now();
 
         void put( callable const&);
         bool try_steal( callable &);

Modified: sandbox/task/boost/task/detail/worker_group.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker_group.hpp (original)
+++ sandbox/task/boost/task/detail/worker_group.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -17,6 +17,7 @@
 
 #include <boost/task/detail/config.hpp>
 #include <boost/task/detail/worker.hpp>
+#include <boost/task/spin_manual_reset_event.hpp>
 
 #include <boost/config/abi_prefix.hpp>
 
@@ -57,6 +58,7 @@
         id_idx & id_idx_;
         rnd_idx & rnd_idx_;
 
+
 public:
         typedef id_idx::iterator iterator;
         typedef id_idx::const_iterator const_iterator;
@@ -86,10 +88,6 @@
         void join_all();
 
         void interrupt_all();
-
- void signal_shutdown_all();
-
- void signal_shutdown_now_all();
 };
 
 }}}

Modified: sandbox/task/boost/task/spin_auto_reset_event.hpp
==============================================================================
--- sandbox/task/boost/task/spin_auto_reset_event.hpp (original)
+++ sandbox/task/boost/task/spin_auto_reset_event.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -26,7 +26,7 @@
         volatile uint32_t state_;
 
 public:
- spin_auto_reset_event( bool = false);
+ explicit spin_auto_reset_event( bool = false);
 
         void set();
 
@@ -37,6 +37,8 @@
         template< typename TimeDuration >
         bool wait( TimeDuration const& rel_time)
         { return wait( get_system_time() + rel_time); }
+
+ bool try_wait();
 };
 
 }}

Modified: sandbox/task/boost/task/spin_manual_reset_event.hpp
==============================================================================
--- sandbox/task/boost/task/spin_manual_reset_event.hpp (original)
+++ sandbox/task/boost/task/spin_manual_reset_event.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -43,6 +43,8 @@
         template< typename TimeDuration >
         bool wait( TimeDuration const& rel_time)
         { return wait( get_system_time() + rel_time); }
+
+ bool try_wait();
 };
 
 }}

Modified: sandbox/task/libs/task/src/spin_auto_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/src/spin_auto_reset_event.cpp (original)
+++ sandbox/task/libs/task/src/spin_auto_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -67,4 +67,13 @@
         return true;
 }
 
+bool
+spin_auto_reset_event::try_wait()
+{
+ uint32_t expected = static_cast< uint32_t >( SET);
+ return detail::atomic_compare_exchange_strong(
+ & state_, & expected,
+ static_cast< uint32_t >( RESET) );
+}
+
 }}

Modified: sandbox/task/libs/task/src/spin_manual_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/src/spin_manual_reset_event.cpp (original)
+++ sandbox/task/libs/task/src/spin_manual_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -91,4 +91,21 @@
         return true;
 }
 
+bool
+spin_manual_reset_event::try_wait()
+{
+ {
+ spin_lock< spin_mutex > lk( enter_mtx_);
+ BOOST_ASSERT( lk);
+ detail::atomic_fetch_add( & waiters_, 1);
+ }
+
+ bool result = static_cast< uint32_t >( SET) == detail::atomic_load( & state_);
+
+ if ( 1 == detail::atomic_fetch_sub( & waiters_, 1) )
+ enter_mtx_.unlock();
+
+ return result;
+}
+
 }}

Modified: sandbox/task/libs/task/src/worker.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker.cpp (original)
+++ sandbox/task/libs/task/src/worker.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -25,14 +25,6 @@
 { impl_->interrupt(); }
 
 void
-worker::signal_shutdown()
-{ impl_->signal_shutdown(); }
-
-void
-worker::signal_shutdown_now()
-{ impl_->signal_shutdown_now(); }
-
-void
 worker::put( callable const& ca)
 { impl_->put( ca); }
 

Modified: sandbox/task/libs/task/src/worker_group.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker_group.cpp (original)
+++ sandbox/task/libs/task/src/worker_group.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -85,18 +85,4 @@
         { w.interrupt(); }
 }
 
-void
-worker_group::signal_shutdown_all()
-{
- BOOST_FOREACH( worker w, cont_)
- { w.signal_shutdown(); }
-}
-
-void
-worker_group::signal_shutdown_now_all()
-{
- BOOST_FOREACH( worker w, cont_)
- { w.signal_shutdown_now(); }
-}
-
 }}}

Modified: sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp (original)
+++ sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -23,8 +23,6 @@
 namespace pt = boost::posix_time;
 namespace tsk = boost::task;
 
-namespace {
-
 uint32_t wait_fn( uint32_t n, tsk::spin_auto_reset_event & ev)
 {
         ev.wait();
@@ -186,6 +184,16 @@
         }
 }
 
+void test_case_4()
+{
+ tsk::spin_auto_reset_event ev;
+
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
+
+ ev.set();
+
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
 }
 
 boost::unit_test::test_suite * init_unit_test_suite( int, char* [])

Modified: sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp (original)
+++ sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -23,8 +23,6 @@
 namespace pt = boost::posix_time;
 namespace tsk = boost::task;
 
-namespace {
-
 uint32_t wait_fn( uint32_t n, tsk::spin_manual_reset_event & ev)
 {
         ev.wait();
@@ -205,6 +203,21 @@
         BOOST_CHECK_EQUAL( h4.get(), n);
 }
 
+void test_case_4()
+{
+ tsk::spin_manual_reset_event ev;
+
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
+
+ ev.set();
+
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+ ev.wait();
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+
+ ev.reset();
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
 }
 
 boost::unit_test::test_suite * init_unit_test_suite( int, char* [])


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk