Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r52298 - in sandbox/threadpool: boost/tp boost/tp/detail libs/tp/build libs/tp/src
From: oliver.kowalke_at_[hidden]
Date: 2009-04-09 12:04:40


Author: olli
Date: 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
New Revision: 52298
URL: http://svn.boost.org/trac/boost/changeset/52298

Log:
* restructuring of pool, worker
* introduction of worker_group

Added:
   sandbox/threadpool/boost/tp/detail/worker_group.hpp (contents, props changed)
   sandbox/threadpool/libs/tp/src/worker_group.cpp (contents, props changed)
Text files modified:
   sandbox/threadpool/boost/tp/detail/worker.hpp | 297 +++++++++++++++++++++++++++--------
   sandbox/threadpool/boost/tp/pool.hpp | 330 +++++++++------------------------------
   sandbox/threadpool/libs/tp/build/Jamfile.v2 | 2
   sandbox/threadpool/libs/tp/src/worker.cpp | 157 +-----------------
   4 files changed, 320 insertions(+), 466 deletions(-)

Modified: sandbox/threadpool/boost/tp/detail/worker.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/worker.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/worker.hpp 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -11,13 +11,17 @@
 #include <boost/assert.hpp>
 #include <boost/function.hpp>
 #include <boost/interprocess/sync/interprocess_semaphore.hpp>
+#include <boost/random.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
 #include <boost/utility.hpp>
 
 #include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/guard.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 #include <boost/tp/detail/wsq.hpp>
+#include <boost/tp/poolsize.hpp>
+#include <boost/tp/scanns.hpp>
 
 namespace boost {
 namespace tp {
@@ -28,71 +32,237 @@
 private:
         static thread_specific_ptr< worker > tss_;
 
- class impl : private noncopyable
+ struct impl
         {
- private:
- typedef std::pair< callable, interrupter > item;
- shared_ptr< thread > thrd_;
- wsq< item > wsq_;
- interprocess::interprocess_semaphore shtdwn_sem_;
- interprocess::interprocess_semaphore shtdwn_now_sem_;
- bool shtdwn_;
- std::size_t scns_;
-
- public:
- impl( function< void() > const&);
-
                 virtual ~impl() {}
 
- const shared_ptr< thread > thrd() const;
-
- const thread::id get_id() const;
+ virtual const thread::id get_id() const = 0;
 
- void join() const;
+ virtual void join() const = 0;
 
- void interrupt() const;
+ virtual void interrupt() const = 0;
 
- void put( callable const&, interrupter const&);
+ virtual void put( callable const&, interrupter const&) = 0;
 
- bool try_take( callable &, interrupter &);
+ virtual bool try_take( callable &, interrupter &) = 0;
 
- bool try_steal( callable &, interrupter &);
+ virtual bool try_steal( callable &, interrupter &) = 0;
 
- bool empty();
+ virtual void signal_shutdown() = 0;
 
- void signal_shutdown();
+ virtual void signal_shutdown_now() = 0;
 
- void signal_shutdown_now();
+ virtual void schedule_until( function< bool() > const&) = 0;
 
- bool shutdown();
-
- bool shutdown_now();
-
- std::size_t scanns() const;
-
- void increment_scanns();
-
- void reset_scanns();
-
- virtual void reschedule_until( function< bool() > const&) = 0;
+ virtual void run() = 0;
         };
 
         template< typename Pool >
- class impl_pool : public impl
+ class impl_pool : public impl,
+ private noncopyable
         {
         private:
- Pool & p_;
+ class random_idx
+ {
+ private:
+ rand48 rng_;
+ uniform_int<> six_;
+ variate_generator< rand48 &, uniform_int<> > die_;
+
+ public:
+ random_idx( std::size_t size)
+ :
+ rng_(),
+ six_( 0, size - 1),
+ die_( rng_, six_)
+ {}
+
+ std::size_t operator()()
+ { return die_(); }
+ };
 
- public:
- impl_pool( Pool & p, function< void() > const& fn)
- : impl( fn), p_( p)
- {}
+ typedef std::pair< callable, interrupter > item;
 
- void reschedule_until( function< bool() > const& pred)
- { p_.reschedule_until_( pred); }
+ Pool & pool_;
+ shared_ptr< thread > thrd_;
+ wsq< item > wsq_;
+ interprocess::interprocess_semaphore shtdwn_sem_;
+ interprocess::interprocess_semaphore shtdwn_now_sem_;
+ bool shtdwn_;
+ posix_time::time_duration asleep_;
+ scanns max_scns_;
+ std::size_t scns_;
+ random_idx rnd_idx_;
+
+ void execute_(
+ callable & ca,
+ interrupter & intr)
+ {
+ BOOST_ASSERT( ! ca.empty() );
+ guard grd( get_pool().active_worker_);
+ shared_ptr< void > ig(
+ static_cast< void * >( 0),
+ boost::bind(
+ & interrupter::reset,
+ intr) );
+ intr.set( thrd_);
+ ca();
+ ca.clear();
+ BOOST_ASSERT( ca.empty() );
+ }
+
+ void next_callable_( callable & ca, interrupter & intr)
+ {
+ if ( ! try_take( ca, intr) )
+ {
+ if ( ! get_pool().channel_.try_take( ca, intr) )
+ {
+ std::size_t idx( rnd_idx_() );
+ for ( std::size_t j( 0); j < get_pool().wg_.size(); ++j)
+ {
+ worker other( get_pool().wg_[idx]);
+ if ( this_thread::get_id() == other.get_id() ) continue;
+ if ( ++idx >= get_pool().wg_.size() ) idx = 0;
+ if ( other.try_steal( ca, intr) ) break;
+ }
+
+ if ( ca.empty() )
+ {
+ guard grd( get_pool().idle_worker_);
+ if ( shutdown_() ) return;
+ ++scns_;
+ if ( scns_ >= max_scns_)
+ {
+ if ( get_pool().size_() == get_pool().idle_worker_)
+ get_pool().channel_.take( ca, intr, asleep_);
+ else
+ this_thread::sleep( asleep_);
+ scns_ = 0;
+ }
+ else
+ this_thread::yield();
+ }
+ }
+ }
+ }
+
+ bool shutdown_()
+ {
+ if ( shutdown__() && get_pool().channel_.empty() )
+ return true;
+ else if ( shutdown_now__() )
+ return true;
+ return false;
+ }
+
+ bool shutdown__()
+ {
+ if ( ! shtdwn_)
+ shtdwn_ = shtdwn_sem_.try_wait();
+ return shtdwn_;
+ }
+
+ bool shutdown_now__()
+ { return shtdwn_now_sem_.try_wait(); }
+
+ public:
+ impl_pool(
+ Pool & pool,
+ poolsize const& psize,
+ posix_time::time_duration const& asleep,
+ scanns const& max_scns,
+ function< void() > const& fn)
+ :
+ pool_( pool),
+ thrd_( new thread( fn) ),
+ wsq_(),
+ shtdwn_sem_( 0),
+ shtdwn_now_sem_( 0),
+ shtdwn_( false),
+ asleep_( asleep),
+ max_scns_( max_scns),
+ scns_( 0),
+ rnd_idx_( psize)
+ { BOOST_ASSERT( ! fn.empty() ); }
+
+ const thread::id get_id() const
+ { return thrd_->get_id(); }
+
+ void join() const
+ { thrd_->join(); }
+
+ void
+ interrupt() const
+ { thrd_->interrupt(); }
+
+ void signal_shutdown()
+ { shtdwn_sem_.post(); }
+
+ void signal_shutdown_now()
+ { shtdwn_now_sem_.post(); }
+
+ void put(
+ callable const& ca,
+ interrupter const& intr)
+ {
+ BOOST_ASSERT( ! ca.empty() );
+ wsq_.put( std::make_pair( ca, intr) );
+ }
+
+ bool try_take(
+ callable & ca,
+ interrupter & intr)
+ {
+ item itm;
+ bool result( wsq_.try_take( itm) );
+ if ( result)
+ {
+ ca = itm.first;
+ intr = itm.second;
+ }
+ return result;
+ }
+
+ bool try_steal(
+ callable & ca,
+ interrupter & intr)
+ {
+ item itm;
+ bool result( wsq_.try_steal( itm) );
+ if ( result)
+ {
+ ca = itm.first;
+ intr = itm.second;
+ }
+ return result;
+ }
 
                 Pool & get_pool() const
- { return p_; }
+ { return pool_; }
+
+ void run()
+ {
+ BOOST_ASSERT( get_id() == this_thread::get_id() );
+
+ schedule_until(
+ bind( & impl_pool::shutdown_, this) );
+ }
+
+ void schedule_until( function< bool() > const& pred)
+ {
+ callable ca;
+ interrupter intr;
+
+ while ( ! pred() )
+ {
+ next_callable_( ca, intr);
+ if( ! ca.empty() )
+ {
+ execute_( ca, intr);
+ scns_ = 0;
+ }
+ }
+ }
         };
 
         shared_ptr< impl > impl_;
@@ -101,40 +271,31 @@
         template< typename Pool >
         worker(
                 Pool & pool,
+ poolsize const& psize,
+ posix_time::time_duration const& asleep,
+ scanns const& max_scns,
                 function< void() > const& fn)
- : impl_( new impl_pool< Pool >( pool, fn) )
+ :
+ impl_(
+ new impl_pool< Pool >(
+ pool,
+ psize,
+ asleep,
+ max_scns,
+ fn) )
         {}
 
- const shared_ptr< thread > thrd() const;
-
         const thread::id get_id() const;
 
         void join() const;
-
         void interrupt() const;
+ void signal_shutdown();
+ void signal_shutdown_now();
 
         void put( callable const&, interrupter const&);
-
         bool try_take( callable &, interrupter &);
-
         bool try_steal( callable &, interrupter &);
 
- bool empty() const;
-
- void signal_shutdown();
-
- void signal_shutdown_now();
-
- bool shutdown();
-
- bool shutdown_now();
-
- std::size_t scanns() const;
-
- void increment_scanns();
-
- void reset_scanns();
-
         void reschedule_until( function< bool() > const&);
 
         template< typename Pool >
@@ -145,9 +306,9 @@
                 return p->get_pool();
         }
 
- static worker * tss_get();
+ void run();
 
- static void tss_reset( worker * w);
+ static worker * tss_get();
 };
 
 } } }

Added: sandbox/threadpool/boost/tp/detail/worker_group.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker_group.hpp 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -0,0 +1,82 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_GROUP_H
+#define BOOST_TP_DETAIL_WORKER_GROUP_H
+
+#include <cstddef>
+
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/mem_fun.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/random_access_index.hpp>
+#include <boost/thread.hpp>
+
+#include <boost/tp/detail/worker.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+class worker_group
+{
+private:
+ struct id_idx_tag {};
+ struct rnd_idx_tag {};
+
+ typedef multi_index::multi_index_container<
+ worker,
+ multi_index::indexed_by<
+ multi_index::ordered_unique<
+ multi_index::tag< id_idx_tag >,
+ multi_index::const_mem_fun<
+ worker,
+ const thread::id,
+ & worker::get_id
+ >
+ >,
+ multi_index::random_access< multi_index::tag< rnd_idx_tag > >
+ >
+ > container;
+
+ typedef container::index< id_idx_tag >::type id_idx;
+ typedef container::index< rnd_idx_tag >::type rnd_idx;
+
+ container cont_;
+ id_idx & id_idx_;
+ rnd_idx & rnd_idx_;
+
+public:
+ typedef id_idx::iterator iterator;
+ typedef id_idx::const_iterator const_iterator;
+
+ worker_group();
+
+ const worker operator[]( std::size_t pos) const;
+
+ std::size_t size() const;
+
+ const iterator begin();
+
+ const const_iterator begin() const;
+
+ const iterator end();
+
+ const const_iterator end() const;
+
+ const const_iterator find( thread::id const& id) const;
+
+ void join_all();
+
+ void interrupt_all();
+
+ void insert( worker const& w);
+
+ void signal_shutdown_all();
+
+ void signal_shutdown_now_all();
+};
+} } }
+
+#endif // BOOST_TP_DETAIL_WORKER_GROUP_H
+

Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -16,12 +16,7 @@
 #include <boost/bind.hpp>
 #include <boost/foreach.hpp>
 #include <boost/function.hpp>
-#include <boost/multi_index_container.hpp>
-#include <boost/multi_index/mem_fun.hpp>
-#include <boost/multi_index/ordered_index.hpp>
-#include <boost/multi_index/random_access_index.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/random.hpp>
 #include <boost/ref.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
@@ -30,12 +25,12 @@
 
 #include <boost/tp/detail/atomic.hpp>
 #include <boost/tp/detail/callable.hpp>
-#include <boost/tp/detail/guard.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 #ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
 #include <boost/tp/detail/bind_processor.hpp>
 #endif
 #include <boost/tp/detail/worker.hpp>
+#include <boost/tp/detail/worker_group.hpp>
 #include <boost/tp/exceptions.hpp>
 #include <boost/tp/poolsize.hpp>
 #include <boost/tp/scanns.hpp>
@@ -59,199 +54,62 @@
         typedef Channel channel;
         typedef typename channel::item channel_item;
 
- enum state
- {
- active_state,
- terminateing_state,
- terminated_state
- };
-
- struct id_idx_tag {};
- struct rnd_idx_tag {};
-
- typedef multi_index::multi_index_container<
- detail::worker,
- multi_index::indexed_by<
- multi_index::ordered_unique<
- multi_index::tag< id_idx_tag >,
- multi_index::const_mem_fun<
- detail::worker,
- const thread::id,
- & detail::worker::get_id
- >
- >,
- multi_index::random_access< multi_index::tag< rnd_idx_tag > >
- >
- > worker_list;
-
- typedef typename worker_list::template index<
- id_idx_tag >::type id_idx;
- typedef typename worker_list::template index<
- rnd_idx_tag >::type rnd_idx;
-
- class random_idx
- {
- private:
- rand48 rng_;
- uniform_int<> six_;
- variate_generator< rand48 &, uniform_int<> > die_;
-
- public:
- random_idx( std::size_t size)
- :
- rng_(),
- six_( 0, size - 1),
- die_( rng_, six_)
- {}
-
- std::size_t operator()()
- { return die_(); }
- };
-
- static thread_specific_ptr< random_idx > tss_rnd_idx_;
 
- worker_list worker_;
- shared_mutex mtx_worker_;
+ detail::worker_group wg_;
+ shared_mutex mtx_wg_;
         unsigned int state_;
         channel channel_;
- posix_time::time_duration asleep_;
- scanns scns_;
         unsigned int active_worker_;
         unsigned int idle_worker_;
- unsigned int running_worker_;
-
- void execute_(
- detail::callable & ca,
- detail::interrupter & intr,
- shared_ptr< thread > const& thrd)
- {
- BOOST_ASSERT( ! ca.empty() );
- detail::guard grd( active_worker_);
- shared_ptr< void > ig(
- static_cast< void * >( 0),
- boost::bind(
- & detail::interrupter::reset,
- intr) );
- intr.set( thrd);
- ca();
- ca.clear();
- BOOST_ASSERT( ca.empty() );
- }
-
- void next_callable_( detail::worker & w, detail::callable & ca, detail::interrupter & intr)
- {
- rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
- if ( ! w.try_take( ca, intr) )
- {
- if ( ! channel_.try_take( ca, intr) )
- {
- std::size_t idx( ( * tss_rnd_idx_)() );
- for ( std::size_t j( 0); j < worker_.size(); ++j)
- {
- detail::worker other( ridx[idx]);
- if ( this_thread::get_id() == other.get_id() ) continue;
- if ( ++idx >= worker_.size() ) idx = 0;
- if ( other.try_steal( ca, intr) ) break;
- }
-
- if ( ca.empty() )
- {
- detail::guard grd( idle_worker_);
- if ( shutdown_( w) ) return;
- w.increment_scanns();
- if ( w.scanns() >= scns_)
- {
- if ( size_() == idle_worker_)
- channel_.take( ca, intr, asleep_);
- else
- this_thread::sleep( asleep_);
- w.reset_scanns();
- }
- else
- this_thread::yield();
- }
- }
- }
- }
 
- void reschedule_until_( function< bool() > const& pred)
+ void worker_entry_()
         {
- detail::worker * w( detail::worker::tss_get() );
- BOOST_ASSERT( w);
- BOOST_ASSERT( w->get_id() == this_thread::get_id() );
- shared_ptr< thread > thrd( w->thrd() );
- BOOST_ASSERT( thrd);
- detail::interrupter intr;
- detail::callable ca;
- while ( ! pred() )
- {
- next_callable_( * w, ca, intr);
- if( ! ca.empty() )
- {
- execute_( ca, intr, thrd);
- w->reset_scanns();
- }
- }
- }
-
- void entry_()
- {
- shared_lock< shared_mutex > lk( mtx_worker_);
- id_idx & iidx( worker_.get< id_idx_tag >() );
- typename id_idx::iterator i( iidx.find( this_thread::get_id() ) );
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ typename detail::worker_group::iterator i( wg_.find( this_thread::get_id() ) );
                 lk.unlock();
- BOOST_ASSERT( i != iidx.end() );
- detail::worker::tss_reset( new detail::worker( * i) );
-
- detail::worker * w( detail::worker::tss_get() );
- BOOST_ASSERT( w);
- BOOST_ASSERT( w->get_id() == this_thread::get_id() );
- shared_ptr< thread > thrd( w->thrd() );
- BOOST_ASSERT( thrd);
- detail::callable ca;
- detail::interrupter intr;
-
- tss_rnd_idx_.reset( new random_idx( worker_.size() ) );
+ BOOST_ASSERT( i != wg_.end() );
 
- detail::guard grd( running_worker_);
-
- while ( ! shutdown_( * w) )
- {
- next_callable_( * w, ca, intr);
- if( ! ca.empty() )
- {
- execute_( ca, intr, thrd);
- w->reset_scanns();
- }
- }
+ detail::worker w( * i);
+ w.run();
         }
-
- void create_worker_()
+
+ void create_worker_(
+ poolsize const& psize,
+ posix_time::time_duration const& asleep,
+ scanns const& max_scns)
         {
- BOOST_ASSERT( ! terminateing_() && ! terminated_() );
- worker_.insert(
+ wg_.insert(
                         detail::worker(
                                 * this,
+ psize,
+ asleep,
+ max_scns,
                                 boost::bind(
- & pool::entry_,
+ & pool::worker_entry_,
                                         this) ) );
         }
 
 #ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
- void entry_( std::size_t n)
+ void worker_entry_( std::size_t n)
         {
                 this_thread::bind_to_processor( n);
- entry_();
+ worker_entry_();
         }
 
- void create_worker_( std::size_t n)
+ void create_worker_(
+ poolsize const& psize,
+ posix_time::time_duration const& asleep,
+ scanns const& max_scns,
+ std::size_t n)
         {
- BOOST_ASSERT( ! terminateing_() && ! terminated_() );
- worker_.insert(
+ wg_.insert(
                         detail::worker(
                                 * this,
+ psize,
+ asleep,
+ max_scns,
                                 boost::bind(
- & pool::entry_,
+ & pool::worker_entry_,
                                         this,
                                         n) ) );
         }
@@ -264,22 +122,7 @@
         { return size_() - active_(); }
 
         std::size_t size_() const
- { return worker_.size(); }
-
- bool terminated_() const
- { return state_ == terminated_state; }
-
- bool terminateing_() const
- { return state_ == terminateing_state; }
-
- bool shutdown_( detail::worker & w)
- {
- if ( w.shutdown() && channel_.empty() )
- return true;
- else if ( w.shutdown_now() )
- return true;
- return false;
- }
+ { return wg_.size(); }
 
         bool closed_() const
         { return state_ > 0; }
@@ -291,24 +134,21 @@
         explicit pool(
                 poolsize const& psize,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 10),
- scanns const& scns = scanns( 20) )
+ scanns const& max_scns = scanns( 20) )
         :
- worker_(),
- mtx_worker_(),
+ wg_(),
+ mtx_wg_(),
         state_( 0),
         channel_(),
- asleep_( asleep),
- scns_( scns),
         active_worker_( 0),
- idle_worker_( 0),
- running_worker_( 0)
+ idle_worker_( 0)
         {
- if ( asleep_.is_special() || asleep_.is_negative() )
+ if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 channel_.activate();
- unique_lock< shared_mutex > lk( mtx_worker_);
+ unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
- create_worker_();
+ create_worker_( psize, asleep, max_scns);
                 lk.unlock();
         }
 
@@ -317,52 +157,46 @@
                 high_watermark const& hwm,
                 low_watermark const& lwm,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 100),
- scanns const& scns = scanns( 20) )
+ scanns const& max_scns = scanns( 20) )
         :
- worker_(),
- mtx_worker_(),
+ wg_(),
+ mtx_wg_(),
         state_( 0),
         channel_(
                 hwm,
                 lwm),
- asleep_( asleep),
- scns_( scns),
         active_worker_( 0),
- idle_worker_( 0),
- running_worker_( 0)
+ idle_worker_( 0)
         {
- if ( asleep_.is_special() || asleep_.is_negative() )
+ if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 channel_.activate();
- unique_lock< shared_mutex > lk( mtx_worker_);
+ unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
- create_worker_();
+ create_worker_( psize, asleep, max_scns);
                 lk.unlock();
         }
 
 #ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
         explicit pool(
                 posix_time::time_duration const& asleep = posix_time::microseconds( 10),
- scanns const& scns = scanns( 20) )
+ scanns const& max_scns = scanns( 20) )
         :
- worker_(),
- mtx_worker_(),
+ wg_(),
+ mtx_wg_(),
         state_( 0),
         channel_(),
- asleep_( asleep),
- scns_( scns),
         active_worker_( 0),
- idle_worker_( 0),
- running_worker_( 0)
+ idle_worker_( 0)
         {
- if ( asleep_.is_special() || asleep_.is_negative() )
+ if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 std::size_t psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
- unique_lock< shared_mutex > lk( mtx_worker_);
+ unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( i);
+ create_worker_( psize, asleep, max_scns, i);
                 lk.unlock();
         }
 
@@ -370,28 +204,25 @@
                 high_watermark const& hwm,
                 low_watermark const& lwm,
                 posix_time::time_duration const& asleep = posix_time::microseconds( 100),
- scanns const& scns = scanns( 20) )
+ scanns const& max_scns = scanns( 20) )
         :
- worker_(),
- mtx_worker_(),
+ wg_(),
+ mtx_wg_(),
         state_( 0),
         channel_(
                 hwm,
                 lwm),
- asleep_( asleep),
- scns_( scns),
         active_worker_( 0),
- idle_worker_( 0),
- running_worker_( 0)
+ idle_worker_( 0)
         {
- if ( asleep_.is_special() || asleep_.is_negative() )
+ if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
                 std::size_t psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
- unique_lock< shared_mutex > lk( mtx_worker_);
+ unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( i);
+ create_worker_( psize, asleep, max_scns, i);
                 lk.unlock();
         }
 #endif
@@ -401,26 +232,24 @@
 
         std::size_t active()
         {
- shared_lock< shared_mutex > lk( mtx_worker_);
+ shared_lock< shared_mutex > lk( mtx_wg_);
                 return active_();
         }
 
         std::size_t idle()
         {
- shared_lock< shared_mutex > lk( mtx_worker_);
+ shared_lock< shared_mutex > lk( mtx_wg_);
                 return idle_();
         }
-
+
         void shutdown()
         {
                 if ( closed_() || close_() > 1) return;
 
                 channel_.deactivate();
- shared_lock< shared_mutex > lk( mtx_worker_);
- BOOST_FOREACH( detail::worker w, worker_)
- { w.signal_shutdown(); }
- BOOST_FOREACH( detail::worker w, worker_)
- { w.join(); }
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ wg_.signal_shutdown_all();
+ wg_.join_all();
                 lk.unlock();
         }
 
@@ -430,14 +259,10 @@
                         return std::vector< detail::callable >();
 
                 channel_.deactivate_now();
- shared_lock< shared_mutex > lk( mtx_worker_);
- BOOST_FOREACH( detail::worker w, worker_)
- {
- w.signal_shutdown_now();
- w.interrupt();
- }
- BOOST_FOREACH( detail::worker w, worker_)
- { w.join(); }
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ wg_.signal_shutdown_now_all();
+ wg_.interrupt_all();
+ wg_.join_all();
                 lk.unlock();
                 std::vector< detail::callable > drain( channel_.drain() );
 
@@ -446,7 +271,7 @@
 
         std::size_t size()
         {
- shared_lock< shared_mutex > lk( mtx_worker_);
+ shared_lock< shared_mutex > lk( mtx_wg_);
                 return size_();
         }
 
@@ -490,8 +315,8 @@
                                         f) );
                         tsk.set_wait_callback(
                                 bind(
- ( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
- this,
+ ( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
+ w,
                                         wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
@@ -528,8 +353,8 @@
                                         f) );
                         tsk.set_wait_callback(
                                 bind(
- ( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
- this,
+ ( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
+ w,
                                         wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
@@ -545,11 +370,6 @@
                 }
         }
 };
-
-template< typename Channel >
-thread_specific_ptr< typename pool< Channel >::random_idx >
-pool< Channel >::tss_rnd_idx_;
-
 }}
 
 #endif // BOOST_TP_POOL_H

Modified: sandbox/threadpool/libs/tp/build/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/build/Jamfile.v2 (original)
+++ sandbox/threadpool/libs/tp/build/Jamfile.v2 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -14,7 +14,7 @@
       <link>static:<define>BOOST_THREADPOOL_STATIC_LINK=1
     ;
 
-SOURCES = callable default_pool guard interrupter poolsize scanns watermark worker ;
+SOURCES = callable default_pool guard interrupter poolsize scanns watermark worker worker_group ;
 
 lib boost_threadpool
    : $(SOURCES).cpp

Modified: sandbox/threadpool/libs/tp/src/worker.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/worker.cpp (original)
+++ sandbox/threadpool/libs/tp/src/worker.cpp 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -6,111 +6,6 @@
 {
 thread_specific_ptr< worker > worker::tss_;
 
-worker::impl::impl( function< void() > const& fn)
-:
-thrd_( new thread( fn) ),
-wsq_(),
-shtdwn_sem_( 0),
-shtdwn_now_sem_( 0),
-shtdwn_( false),
-scns_( 0)
-{ BOOST_ASSERT( ! fn.empty() ); }
-
-const shared_ptr< thread >
-worker::impl::thrd() const
-{ return thrd_; }
-
-const thread::id
-worker::impl::get_id() const
-{ return thrd_->get_id(); }
-
-void
-worker::impl::join() const
-{ thrd_->join(); }
-
-void
-worker::impl::interrupt() const
-{ thrd_->interrupt(); }
-
-void
-worker::impl::put(
- callable const& ca,
- interrupter const& intr)
-{
- BOOST_ASSERT( ! ca.empty() );
- wsq_.put( std::make_pair( ca, intr) );
-}
-
-bool
-worker::impl::try_take(
- callable & ca,
- interrupter & intr)
-{
- item itm;
- bool result( wsq_.try_take( itm) );
- if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
- return result;
-}
-
-bool
-worker::impl::try_steal(
- callable & ca,
- interrupter & intr)
-{
- item itm;
- bool result( wsq_.try_steal( itm) );
- if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
- return result;
-}
-
-bool
-worker::impl::empty()
-{ return wsq_.empty(); }
-
-void
-worker::impl::signal_shutdown()
-{ shtdwn_sem_.post(); }
-
-void
-worker::impl::signal_shutdown_now()
-{ shtdwn_now_sem_.post(); }
-
-bool
-worker::impl::shutdown()
-{
- if ( ! shtdwn_)
- shtdwn_ = shtdwn_sem_.try_wait();
- return shtdwn_;
-}
-
-bool
-worker::impl::shutdown_now()
-{ return shtdwn_now_sem_.try_wait(); }
-
-std::size_t
-worker::impl::scanns() const
-{ return scns_; }
-
-void
-worker::impl::increment_scanns()
-{ ++scns_; }
-
-void
-worker::impl::reset_scanns()
-{ scns_ = 0; }
-
-const shared_ptr< thread >
-worker::thrd() const
-{ return impl_->thrd(); }
-
 const thread::id
 worker::get_id() const
 { return impl_->get_id(); }
@@ -124,6 +19,14 @@
 { impl_->interrupt(); }
 
 void
+worker::signal_shutdown()
+{ impl_->signal_shutdown(); }
+
+void
+worker::signal_shutdown_now()
+{ impl_->signal_shutdown_now(); }
+
+void
 worker::put(
         callable const& ca,
         interrupter const& intr)
@@ -141,49 +44,19 @@
         interrupter & intr)
 { return impl_->try_steal( ca, intr); }
 
-bool
-worker::empty() const
-{ return impl_->empty(); }
-
-void
-worker::signal_shutdown()
-{ impl_->signal_shutdown(); }
-
 void
-worker::signal_shutdown_now()
-{ impl_->signal_shutdown_now(); }
-
-bool
-worker::shutdown()
-{ return impl_->shutdown(); }
-
-bool
-worker::shutdown_now()
-{ return impl_->shutdown_now(); }
-
-std::size_t
-worker::scanns() const
-{ return impl_->scanns(); }
-
-void
-worker::increment_scanns()
-{ impl_->increment_scanns(); }
-
-void
-worker::reset_scanns()
-{ impl_->reset_scanns(); }
+worker::reschedule_until( function< bool() > const& pred)
+{ return impl_->schedule_until( pred); }
 
 void
-worker::reschedule_until( function< bool() > const& pred)
-{ return impl_->reschedule_until( pred); }
+worker::run()
+{
+ worker::tss_.reset( new worker( * this) );
+ impl_->run();
+}
 
 worker *
 worker::tss_get()
 { return worker::tss_.get(); }
-
-void
-worker::tss_reset( worker * w)
-{ worker::tss_.reset( w); }
-
 } } }
 

Added: sandbox/threadpool/libs/tp/src/worker_group.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/worker_group.cpp 2009-04-09 12:04:39 EDT (Thu, 09 Apr 2009)
@@ -0,0 +1,79 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/tp/detail/worker_group.hpp"
+
+#include <boost/utility.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+worker_group::worker_group()
+:
+cont_(),
+id_idx_( cont_.get< id_idx_tag >() ),
+rnd_idx_( cont_.get< rnd_idx_tag >() )
+{}
+
+const worker
+worker_group::operator[]( std::size_t pos) const
+{ return rnd_idx_[pos]; }
+
+std::size_t
+worker_group::size() const
+{ return cont_.size(); }
+
+const worker_group::iterator
+worker_group::begin()
+{ return id_idx_.begin(); }
+
+const worker_group::const_iterator
+worker_group::begin() const
+{ return id_idx_.begin(); }
+
+const worker_group::iterator
+worker_group::end()
+{ return id_idx_.end(); }
+
+const worker_group::const_iterator
+worker_group::end() const
+{ return id_idx_.end(); }
+
+const worker_group::const_iterator
+worker_group::find( thread::id const& id) const
+{ return id_idx_.find( id); }
+
+void
+worker_group::join_all()
+{
+ BOOST_FOREACH( worker w, cont_)
+ { w.join(); }
+}
+
+void
+worker_group::interrupt_all()
+{
+ BOOST_FOREACH( worker w, cont_)
+ { w.interrupt(); }
+}
+
+void
+worker_group::insert( worker const& w)
+{ cont_.insert( w); }
+
+void
+worker_group::signal_shutdown_all()
+{
+ BOOST_FOREACH( worker w, cont_)
+ { w.signal_shutdown(); }
+}
+
+void
+worker_group::signal_shutdown_now_all()
+{
+ BOOST_FOREACH( worker w, cont_)
+ { w.signal_shutdown_now(); }
+}
+} } }
+


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk