|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r56984 - in sandbox/task: boost/task boost/task/detail libs/task/src libs/task/test
From: oliver.kowalke_at_[hidden]
Date: 2009-10-18 14:26:14
Author: olli
Date: 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
New Revision: 56984
URL: http://svn.boost.org/trac/boost/changeset/56984
Log:
- replaced semaphore by manual-reset-event for pool-shutdown
- pool_base derives from UMS
Text files modified:
sandbox/task/boost/task/detail/pool_base.hpp | 31 ++++++++-----
sandbox/task/boost/task/detail/worker.hpp | 86 ++++++---------------------------------
sandbox/task/boost/task/detail/worker_group.hpp | 6 -
sandbox/task/boost/task/spin_auto_reset_event.hpp | 4 +
sandbox/task/boost/task/spin_manual_reset_event.hpp | 2
sandbox/task/libs/task/src/spin_auto_reset_event.cpp | 9 ++++
sandbox/task/libs/task/src/spin_manual_reset_event.cpp | 17 +++++++
sandbox/task/libs/task/src/worker.cpp | 8 ---
sandbox/task/libs/task/src/worker_group.cpp | 14 ------
sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp | 12 ++++
sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp | 17 ++++++
11 files changed, 90 insertions(+), 116 deletions(-)
Modified: sandbox/task/boost/task/detail/pool_base.hpp
==============================================================================
--- sandbox/task/boost/task/detail/pool_base.hpp (original)
+++ sandbox/task/boost/task/detail/pool_base.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -29,6 +29,7 @@
#include <boost/task/handle.hpp>
#include <boost/task/poolsize.hpp>
#include <boost/task/scanns.hpp>
+#include <boost/task/spin_manual_reset_event.hpp>
#include <boost/task/stacksize.hpp>
#include <boost/task/task.hpp>
#include <boost/task/watermark.hpp>
@@ -43,23 +44,24 @@
typename Queue,
typename UMS
>
-class pool_base
+class pool_base : public UMS
{
private:
friend class worker;
- template< typename T, typename X, typename Z >
+ template< typename T, typename Z >
friend class worker_object;
typedef Queue queue_type;
typedef typename queue_type::value_type value_type;
- UMS ums_;
worker_group wg_;
shared_mutex mtx_wg_;
volatile uint32_t state_;
queue_type queue_;
volatile uint32_t idle_worker_;
+ spin_manual_reset_event shtdwn_ev_;
+ spin_manual_reset_event shtdwn_now_ev_;
void worker_entry_()
{
@@ -81,7 +83,6 @@
wg_.insert(
worker(
* this,
- ums_,
psize,
asleep,
max_scns,
@@ -108,7 +109,6 @@
wg_.insert(
worker(
* this,
- ums_,
psize,
asleep,
max_scns,
@@ -135,12 +135,13 @@
posix_time::time_duration const& asleep,
scanns const& max_scns,
stacksize const& stack_size) :
- ums_(),
wg_(),
mtx_wg_(),
state_( 0),
queue_(),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
{
if ( asleep.is_special() || asleep.is_negative() )
throw invalid_timeduration();
@@ -160,7 +161,9 @@
mtx_wg_(),
state_( 0),
queue_( hwm, lwm),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
{
if ( asleep.is_special() || asleep.is_negative() )
throw invalid_timeduration();
@@ -178,7 +181,9 @@
mtx_wg_(),
state_( 0),
queue_(),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
{
if ( asleep.is_special() || asleep.is_negative() )
throw invalid_timeduration();
@@ -199,7 +204,9 @@
mtx_wg_(),
state_( 0),
queue_( hwm, lwm),
- idle_worker_( 0)
+ idle_worker_( 0),
+ shtdwn_ev_(),
+ shtdwn_now_ev_()
{
if ( asleep.is_special() || asleep.is_negative() )
throw invalid_timeduration();
@@ -228,7 +235,7 @@
queue_.deactivate();
shared_lock< shared_mutex > lk( mtx_wg_);
- wg_.signal_shutdown_all();
+ shtdwn_ev_.set();
wg_.join_all();
}
@@ -238,7 +245,7 @@
queue_.deactivate();
shared_lock< shared_mutex > lk( mtx_wg_);
- wg_.signal_shutdown_now_all();
+ shtdwn_now_ev_.set();
wg_.interrupt_all();
wg_.join_all();
}
Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp (original)
+++ sandbox/task/boost/task/detail/worker.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -52,10 +52,6 @@
virtual bool try_steal( callable &) = 0;
- virtual void signal_shutdown() = 0;
-
- virtual void signal_shutdown_now() = 0;
-
virtual void run() = 0;
virtual void reschedule_until( function< bool() > const&) = 0;
@@ -65,7 +61,6 @@
template<
typename Pool,
- typename UMS,
typename Worker
>
class worker_object : public worker_base,
@@ -93,12 +88,9 @@
typedef shared_ptr< thread > thread_t;
Pool & pool_;
- UMS & ums_;
thread_t thrd_;
fiber::sptr_t fib_;
wsq wsq_;
- semaphore shtdwn_sem_;
- semaphore shtdwn_now_sem_;
bool shtdwn_;
posix_time::time_duration asleep_;
scanns max_scns_;
@@ -133,7 +125,6 @@
std::size_t idx( rnd_idx_() );
for ( std::size_t j( 0); j < pool_.wg_.size(); ++j)
{
- // TODO: not thread-safe -> segfault
Worker other( pool_.wg_[idx]);
if ( this_thread::get_id() == other.get_id() ) continue;
if ( ++idx >= pool_.wg_.size() ) idx = 0;
@@ -145,13 +136,13 @@
bool try_blocked_fibers_()
{
- if ( ums_.has_blocked() )
+ if ( pool_.has_blocked() )
{
fiber::sptr_t this_fib = fib_;
BOOST_ASSERT( this_fib);
- ums_.put_runnable( this_fib);
+ pool_.put_runnable( this_fib);
fiber::sptr_t blocked_fib;
- ums_.try_take_blocked( blocked_fib);
+ pool_.try_take_blocked( blocked_fib);
BOOST_ASSERT( blocked_fib);
fib_ = blocked_fib;
BOOST_ASSERT( this_fib->running() );
@@ -163,43 +154,6 @@
return false;
}
- void process_( bool all)
- {
- try_blocked_fibers_();
- callable ca;
- if ( all ? try_take_local_callable_( ca) ||
- try_steal_other_callable_( ca) ||
- try_take_global_callable_( ca)
- : try_take_local_callable_( ca) )
- {
- execute_( ca);
- scns_ = 0;
- }
- else
- {
- guard grd( pool_.idle_worker_);
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- if ( pool_.size_() > pool_.idle_worker_)
- {
- if ( take_global_callable_( ca, asleep_) )
- execute_( ca);
- }
- else if ( ! ums_.has_blocked() )
- {
- if ( take_global_callable_( ca) )
- execute_( ca);
- }
- else
- this_thread::yield();
- scns_ = 0;
- }
- else
- this_thread::yield();
- }
- }
-
void run_()
{
while ( ! shutdown_() )
@@ -224,7 +178,7 @@
if ( take_global_callable_( ca, asleep_) )
execute_( ca);
}
- else if ( ! ums_.has_blocked() )
+ else if ( ! pool_.has_blocked() )
{
try
{ this_thread::sleep( asleep_); }
@@ -241,7 +195,7 @@
bool shutdown_()
{
- if ( shutdown__() && pool_.queue_.empty() && ! ums_.has_blocked() )
+ if ( shutdown__() && pool_.queue_.empty() && ! pool_.has_blocked() )
return true;
else if ( shutdown_now__() )
return true;
@@ -251,29 +205,25 @@
bool shutdown__()
{
if ( ! shtdwn_)
- shtdwn_ = shtdwn_sem_.try_wait();
+ shtdwn_ = pool_.shtdwn_ev_.try_wait();
return shtdwn_;
}
bool shutdown_now__()
- { return shtdwn_now_sem_.try_wait(); }
+ { return pool_.shtdwn_now_ev_.try_wait(); }
public:
worker_object(
Pool & pool,
- UMS & ums,
poolsize const& psize,
posix_time::time_duration const& asleep,
scanns const& max_scns,
stacksize const& stack_size,
function< void() > const& fn) :
pool_( pool),
- ums_( ums),
thrd_( new thread( fn) ),
fib_(),
wsq_(),
- shtdwn_sem_( 0),
- shtdwn_now_sem_( 0),
shtdwn_( false),
asleep_( asleep),
max_scns_( max_scns),
@@ -292,12 +242,6 @@
interrupt() const
{ thrd_->interrupt(); }
- void signal_shutdown()
- { shtdwn_sem_.post(); }
-
- void signal_shutdown_now()
- { shtdwn_now_sem_.post(); }
-
void put( callable const& ca)
{
BOOST_ASSERT( ! ca.empty() );
@@ -313,7 +257,7 @@
fiber::convert_thread_to_fiber();
- ums_.attach();
+ pool_.attach();
fib_ = fiber::create(
bind( & worker_object::run_, this),
@@ -331,10 +275,10 @@
void block()
{
fiber::sptr_t this_fib = fib_;
- ums_.put_blocked( this_fib);
+ pool_.put_blocked( this_fib);
fiber::sptr_t runnable_fib;
- if ( ums_.has_runnable() )
- ums_.try_take_runnable( runnable_fib);
+ if ( pool_.has_runnable() )
+ pool_.try_take_runnable( runnable_fib);
else
runnable_fib = fiber::create(
bind( & worker_object::run_, this),
@@ -356,19 +300,17 @@
shared_ptr< worker_base > impl_;
public:
- template< typename Pool, typename UMS >
+ template< typename Pool >
worker(
Pool & pool,
- UMS & ums,
poolsize const& psize,
posix_time::time_duration const& asleep,
scanns const& max_scns,
stacksize const& stack_size,
function< void() > const& fn) :
impl_(
- new worker_object< Pool, UMS, worker >(
+ new worker_object< Pool, worker >(
pool,
- ums,
psize,
asleep,
max_scns,
@@ -380,8 +322,6 @@
void join() const;
void interrupt() const;
- void signal_shutdown();
- void signal_shutdown_now();
void put( callable const&);
bool try_steal( callable &);
Modified: sandbox/task/boost/task/detail/worker_group.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker_group.hpp (original)
+++ sandbox/task/boost/task/detail/worker_group.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -17,6 +17,7 @@
#include <boost/task/detail/config.hpp>
#include <boost/task/detail/worker.hpp>
+#include <boost/task/spin_manual_reset_event.hpp>
#include <boost/config/abi_prefix.hpp>
@@ -57,6 +58,7 @@
id_idx & id_idx_;
rnd_idx & rnd_idx_;
+
public:
typedef id_idx::iterator iterator;
typedef id_idx::const_iterator const_iterator;
@@ -86,10 +88,6 @@
void join_all();
void interrupt_all();
-
- void signal_shutdown_all();
-
- void signal_shutdown_now_all();
};
}}}
Modified: sandbox/task/boost/task/spin_auto_reset_event.hpp
==============================================================================
--- sandbox/task/boost/task/spin_auto_reset_event.hpp (original)
+++ sandbox/task/boost/task/spin_auto_reset_event.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -26,7 +26,7 @@
volatile uint32_t state_;
public:
- spin_auto_reset_event( bool = false);
+ explicit spin_auto_reset_event( bool = false);
void set();
@@ -37,6 +37,8 @@
template< typename TimeDuration >
bool wait( TimeDuration const& rel_time)
{ return wait( get_system_time() + rel_time); }
+
+ bool try_wait();
};
}}
Modified: sandbox/task/boost/task/spin_manual_reset_event.hpp
==============================================================================
--- sandbox/task/boost/task/spin_manual_reset_event.hpp (original)
+++ sandbox/task/boost/task/spin_manual_reset_event.hpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -43,6 +43,8 @@
template< typename TimeDuration >
bool wait( TimeDuration const& rel_time)
{ return wait( get_system_time() + rel_time); }
+
+ bool try_wait();
};
}}
Modified: sandbox/task/libs/task/src/spin_auto_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/src/spin_auto_reset_event.cpp (original)
+++ sandbox/task/libs/task/src/spin_auto_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -67,4 +67,13 @@
return true;
}
+bool
+spin_auto_reset_event::try_wait()
+{
+ uint32_t expected = static_cast< uint32_t >( SET);
+ return detail::atomic_compare_exchange_strong(
+ & state_, & expected,
+ static_cast< uint32_t >( RESET) );
+}
+
}}
Modified: sandbox/task/libs/task/src/spin_manual_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/src/spin_manual_reset_event.cpp (original)
+++ sandbox/task/libs/task/src/spin_manual_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -91,4 +91,21 @@
return true;
}
+bool
+spin_manual_reset_event::try_wait()
+{
+ {
+ spin_lock< spin_mutex > lk( enter_mtx_);
+ BOOST_ASSERT( lk);
+ detail::atomic_fetch_add( & waiters_, 1);
+ }
+
+ bool result = static_cast< uint32_t >( SET) == detail::atomic_load( & state_);
+
+ if ( 1 == detail::atomic_fetch_sub( & waiters_, 1) )
+ enter_mtx_.unlock();
+
+ return result;
+}
+
}}
Modified: sandbox/task/libs/task/src/worker.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker.cpp (original)
+++ sandbox/task/libs/task/src/worker.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -25,14 +25,6 @@
{ impl_->interrupt(); }
void
-worker::signal_shutdown()
-{ impl_->signal_shutdown(); }
-
-void
-worker::signal_shutdown_now()
-{ impl_->signal_shutdown_now(); }
-
-void
worker::put( callable const& ca)
{ impl_->put( ca); }
Modified: sandbox/task/libs/task/src/worker_group.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker_group.cpp (original)
+++ sandbox/task/libs/task/src/worker_group.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -85,18 +85,4 @@
{ w.interrupt(); }
}
-void
-worker_group::signal_shutdown_all()
-{
- BOOST_FOREACH( worker w, cont_)
- { w.signal_shutdown(); }
-}
-
-void
-worker_group::signal_shutdown_now_all()
-{
- BOOST_FOREACH( worker w, cont_)
- { w.signal_shutdown_now(); }
-}
-
}}}
Modified: sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp (original)
+++ sandbox/task/libs/task/test/test_spin_auto_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -23,8 +23,6 @@
namespace pt = boost::posix_time;
namespace tsk = boost::task;
-namespace {
-
uint32_t wait_fn( uint32_t n, tsk::spin_auto_reset_event & ev)
{
ev.wait();
@@ -186,6 +184,16 @@
}
}
+void test_case_4()
+{
+ tsk::spin_auto_reset_event ev;
+
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
+
+ ev.set();
+
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
}
boost::unit_test::test_suite * init_unit_test_suite( int, char* [])
Modified: sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp
==============================================================================
--- sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp (original)
+++ sandbox/task/libs/task/test/test_spin_manual_reset_event.cpp 2009-10-18 14:26:12 EDT (Sun, 18 Oct 2009)
@@ -23,8 +23,6 @@
namespace pt = boost::posix_time;
namespace tsk = boost::task;
-namespace {
-
uint32_t wait_fn( uint32_t n, tsk::spin_manual_reset_event & ev)
{
ev.wait();
@@ -205,6 +203,21 @@
BOOST_CHECK_EQUAL( h4.get(), n);
}
+void test_case_4()
+{
+ tsk::spin_manual_reset_event ev;
+
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
+
+ ev.set();
+
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+ ev.wait();
+ BOOST_CHECK_EQUAL( true, ev.try_wait() );
+
+ ev.reset();
+ BOOST_CHECK_EQUAL( false, ev.try_wait() );
}
boost::unit_test::test_suite * init_unit_test_suite( int, char* [])
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk