Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r51614 - in sandbox/threadpool: boost/tp boost/tp/detail libs/tp/build libs/tp/examples libs/tp/src
From: oliver.kowalke_at_[hidden]
Date: 2009-03-04 17:52:44


Author: olli
Date: 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
New Revision: 51614
URL: http://svn.boost.org/trac/boost/changeset/51614

Log:
* worker moved from pool.hpp into worker.hpp
* dependencyy between pool and worker reduced
* this_task::reschedule_until() expects an callable object
* this_task::is_valid() returns if current thread is a valid worker thread

Added:
   sandbox/threadpool/boost/tp/detail/worker.hpp (contents, props changed)
   sandbox/threadpool/libs/tp/src/worker.cpp (contents, props changed)
Text files modified:
   sandbox/threadpool/boost/tp/pool.hpp | 345 ++++++++-------------------------------
   sandbox/threadpool/libs/tp/build/Jamfile.v2 | 2
   sandbox/threadpool/libs/tp/examples/reschedule_until.cpp | 31 ++
   3 files changed, 96 insertions(+), 282 deletions(-)

Added: sandbox/threadpool/boost/tp/detail/worker.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker.hpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -0,0 +1,156 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_H
+#define BOOST_TP_DETAIL_WORKER_H
+
+#include <cstddef>
+#include <utility>
+
+#include <boost/assert.hpp>
+#include <boost/function.hpp>
+#include <boost/interprocess/sync/interprocess_semaphore.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/wsq.hpp>
+
+namespace boost {
+namespace tp {
+namespace detail
+{
+class worker
+{
+private:
+ static thread_specific_ptr< worker > tss_;
+
+ class impl : private noncopyable
+ {
+ private:
+ typedef std::pair< callable, interrupter > item;
+ shared_ptr< thread > thrd_;
+ wsq< item > wsq_;
+ interprocess::interprocess_semaphore shtdwn_sem_;
+ interprocess::interprocess_semaphore shtdwn_now_sem_;
+ bool shtdwn_;
+ std::size_t scns_;
+
+ public:
+ impl( function< void() > const&);
+
+ virtual ~impl() {}
+
+ const shared_ptr< thread > thrd() const;
+
+ const thread::id get_id() const;
+
+ void join() const;
+
+ void interrupt() const;
+
+ void put( callable const&, interrupter const&);
+
+ bool try_take( callable &, interrupter &);
+
+ bool try_steal( callable &, interrupter &);
+
+ bool empty();
+
+ void signal_shutdown();
+
+ void signal_shutdown_now();
+
+ bool shutdown();
+
+ bool shutdown_now();
+
+ std::size_t scanns() const;
+
+ void increment_scanns();
+
+ void reset_scanns();
+
+ virtual void reschedule_until( function< bool() > const&) = 0;
+ };
+
+ template< typename Pool >
+ class impl_pool : public impl
+ {
+ private:
+ Pool & p_;
+
+ public:
+ impl_pool( Pool & p, function< void() > const& fn)
+ : impl( fn), p_( p)
+ {}
+
+ void reschedule_until( function< bool() > const& pred)
+ { p_.reschedule_until_( pred); }
+
+ Pool & get_pool() const
+ { return p_; }
+ };
+
+ shared_ptr< impl > impl_;
+
+public:
+ template< typename Pool >
+ worker(
+ Pool & pool,
+ function< void() > const& fn)
+ : impl_( new impl_pool< Pool >( pool, fn) )
+ {}
+
+ const shared_ptr< thread > thrd() const;
+
+ const thread::id get_id() const;
+
+ void join() const;
+
+ void interrupt() const;
+
+ void put( callable const&, interrupter const&);
+
+ bool try_take( callable &, interrupter &);
+
+ bool try_steal( callable &, interrupter &);
+
+ bool empty() const;
+
+ void signal_shutdown();
+
+ void signal_shutdown_now();
+
+ bool shutdown();
+
+ bool shutdown_now();
+
+ std::size_t scanns() const;
+
+ void increment_scanns();
+
+ void reset_scanns();
+
+ void reschedule_until( function< bool() > const&);
+
+ template< typename Pool >
+ Pool & get_thread_pool() const
+ {
+ impl_pool< Pool > * p( dynamic_cast< impl_pool< Pool > * >( impl_.get() ) );
+ BOOST_ASSERT( p);
+ return p->get_pool();
+ }
+
+ static worker * tss_get();
+
+ static void tss_reset( worker * w);
+};
+
+} } }
+
+#endif // BOOST_TP_DETAIL_WORKER_H
+

Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -16,7 +16,6 @@
 #include <boost/bind.hpp>
 #include <boost/foreach.hpp>
 #include <boost/function.hpp>
-#include <boost/interprocess/sync/interprocess_semaphore.hpp>
 #include <boost/multi_index_container.hpp>
 #include <boost/multi_index/mem_fun.hpp>
 #include <boost/multi_index/ordered_index.hpp>
@@ -24,13 +23,8 @@
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <boost/random.hpp>
 #include <boost/ref.hpp>
-#include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
-#include <boost/thread/condition.hpp>
-#include <boost/thread/locks.hpp>
-#include <boost/thread/shared_mutex.hpp>
-#include <boost/thread/tss.hpp>
 #include <boost/utility.hpp>
 #include <boost/utility/result_of.hpp>
 
@@ -40,7 +34,7 @@
 #ifdef BOOST_BIND_WORKER_TO_PROCESSORS
 #include <boost/tp/detail/bind_processor.hpp>
 #endif
-#include <boost/tp/detail/wsq.hpp>
+#include <boost/tp/detail/worker.hpp>
 #include <boost/tp/exceptions.hpp>
 #include <boost/tp/poolsize.hpp>
 #include <boost/tp/scanns.hpp>
@@ -48,38 +42,27 @@
 #include <boost/tp/watermark.hpp>
 
 namespace boost {
-
 namespace this_task
 {
-template<
- typename Pool,
- typename R
->
-void reschedule_until( unique_future< R > const& f)
-{
- typename Pool::worker * w( Pool::tss_worker_.get() );
- BOOST_ASSERT( w);
- w->reschedule_until( f);
-}
-
-template<
- typename Pool,
- typename R
->
-void reschedule_until( shared_future< R > const& f)
+template< typename Pred >
+void reschedule_until( Pred const& pred)
 {
- typename Pool::worker * w( Pool::tss_worker_.get() );
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
         BOOST_ASSERT( w);
- w->reschedule_until( f);
+ w->reschedule_until( pred);
 }
 
 template< typename Pool >
 Pool & get_thread_pool()
 {
- typename Pool::worker * w( Pool::tss_worker_.get() );
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
         BOOST_ASSERT( w);
- return w->get_thread_pool();
+ return w->get_thread_pool< Pool >();
 }
+
+inline
+bool is_valid()
+{ return tp::detail::worker::tss_get() != 0; }
 }
 
 namespace tp
@@ -88,21 +71,14 @@
 class pool : private noncopyable
 {
 private:
- template<
- typename Pool,
- typename R
- >
- friend void this_task::reschedule_until( unique_future< R > const&);
-
- template<
- typename Pool,
- typename R
- >
- friend void this_task::reschedule_until( shared_future< R > const&);
+ template< typename Pred >
+ friend void this_task::reschedule_until( Pred const&);
 
         template< typename Pool >
         friend Pool & this_task::get_thread_pool();
 
+ friend class detail::worker;
+
         typedef Channel channel;
         typedef typename channel::item channel_item;
 
@@ -113,200 +89,18 @@
                 terminated_state
         };
 
- class worker
- {
- private:
- class wimpl : private noncopyable
- {
- private:
- typedef std::pair< detail::callable, detail::interrupter > item;
-
- pool * pool_ptr_;
- shared_ptr< thread > thrd_;
- detail::wsq< item > wsq_;
- interprocess::interprocess_semaphore shtdwn_sem_;
- interprocess::interprocess_semaphore shtdwn_now_sem_;
- bool shtdwn_;
- std::size_t scns_;
-
- public:
- wimpl(
- pool * pool_ptr,
- function< void() > const& fn)
- :
- pool_ptr_( pool_ptr),
- thrd_( new thread( fn) ),
- wsq_(),
- shtdwn_sem_( 0),
- shtdwn_now_sem_( 0),
- shtdwn_( false),
- scns_( 0)
- { BOOST_ASSERT( ! fn.empty() ); }
-
- const shared_ptr< thread > thrd() const
- { return thrd_; }
-
- const thread::id get_id() const
- { return thrd_->get_id(); }
-
- void join() const
- { thrd_->join(); }
-
- void interrupt() const
- { thrd_->interrupt(); }
-
- void put(
- detail::callable const& ca,
- detail::interrupter const& intr)
- {
- BOOST_ASSERT( ! ca.empty() );
- wsq_.put( std::make_pair( ca, intr) );
- }
-
- bool try_take(
- detail::callable & ca,
- detail::interrupter & intr)
- {
- item itm;
- bool result( wsq_.try_take( itm) );
- if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
- return result;
- }
-
- bool try_steal(
- detail::callable & ca,
- detail::interrupter & intr)
- {
- item itm;
- bool result( wsq_.try_steal( itm) );
- if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
- return result;
- }
-
- bool empty()
- { return wsq_.empty(); }
-
- void signal_shutdown()
- { shtdwn_sem_.post(); }
-
- void signal_shutdown_now()
- { shtdwn_now_sem_.post(); }
-
- bool shutdown()
- {
- if ( ! shtdwn_)
- shtdwn_ = shtdwn_sem_.try_wait();
- return shtdwn_;
- }
-
- bool shutdown_now()
- { return shtdwn_now_sem_.try_wait(); }
-
- std::size_t scanns() const
- { return scns_; }
-
- void increment_scanns()
- { ++scns_; }
-
- void reset_scanns()
- { scns_ = 0; }
-
- template< typename F >
- void reschedule_until( F const& f)
- { pool_ptr_->reschedule_until_( f); }
-
- pool & get_thread_pool()
- { return * pool_ptr_; }
- };
-
- shared_ptr< wimpl > impl_;
-
- public:
- worker(
- pool * pool_ptr,
- function< void() > const& fn)
- : impl_( new wimpl( pool_ptr, fn) )
- {}
-
- const shared_ptr< thread > thrd() const
- { return impl_->thrd(); }
-
- const thread::id get_id() const
- { return impl_->get_id(); }
-
- void join() const
- { impl_->join(); }
-
- void interrupt() const
- { impl_->interrupt(); }
-
- void put(
- detail::callable const& ca,
- detail::interrupter const& intr)
- { impl_->put( ca, intr); }
-
- bool try_take(
- detail::callable & ca,
- detail::interrupter & intr)
- { return impl_->try_take( ca, intr); }
-
- bool try_steal(
- detail::callable & ca,
- detail::interrupter & intr)
- { return impl_->try_steal( ca, intr); }
-
- bool empty() const
- { return impl_->empty(); }
-
- void signal_shutdown()
- { impl_->signal_shutdown(); }
-
- void signal_shutdown_now()
- { impl_->signal_shutdown_now(); }
-
- bool shutdown()
- { return impl_->shutdown(); }
-
- bool shutdown_now()
- { return impl_->shutdown_now(); }
-
- std::size_t scanns() const
- { return impl_->scanns(); }
-
- void increment_scanns()
- { impl_->increment_scanns(); }
-
- void reset_scanns()
- { impl_->reset_scanns(); }
-
- template< typename F >
- void reschedule_until( F const& f)
- { return impl_->reschedule_until( f); }
-
- pool & get_thread_pool()
- { return impl_->get_thread_pool(); }
- };
-
         struct id_idx_tag {};
         struct rnd_idx_tag {};
 
         typedef multi_index::multi_index_container<
- worker,
+ detail::worker,
                 multi_index::indexed_by<
                         multi_index::ordered_unique<
                                 multi_index::tag< id_idx_tag >,
                                 multi_index::const_mem_fun<
- worker,
+ detail::worker,
                                         const thread::id,
- & worker::get_id
+ & detail::worker::get_id
>
>,
                         multi_index::random_access< multi_index::tag< rnd_idx_tag > >
@@ -326,10 +120,10 @@
                 variate_generator< rand48 &, uniform_int<> > die_;
 
         public:
- random_idx( worker_list & lst)
+ random_idx( std::size_t size)
                 :
                 rng_(),
- six_( 0, lst.size() - 1),
+ six_( 0, size - 1),
                 die_( rng_, six_)
                 {}
 
@@ -337,19 +131,18 @@
                 { return die_(); }
         };
 
- static thread_specific_ptr< worker > tss_worker_;
         static thread_specific_ptr< random_idx > tss_rnd_idx_;
 
- worker_list worker_;
- shared_mutex mtx_worker_;
- state state_;
- shared_mutex mtx_state_;
- channel channel_;
- posix_time::time_duration asleep_;
- scanns scns_;
- volatile uint32_t active_worker_;
- volatile uint32_t idle_worker_;
- volatile uint32_t running_worker_;
+ worker_list worker_;
+ shared_mutex mtx_worker_;
+ state state_;
+ shared_mutex mtx_state_;
+ channel channel_;
+ posix_time::time_duration asleep_;
+ scanns scns_;
+ volatile uint32_t active_worker_;
+ volatile uint32_t idle_worker_;
+ volatile uint32_t running_worker_;
 
         void execute_(
                 detail::callable & ca,
@@ -369,7 +162,7 @@
                 BOOST_ASSERT( ca.empty() );
         }
 
- void next_callable_( worker & w, detail::callable & ca, detail::interrupter & intr)
+ void next_callable_( detail::worker & w, detail::callable & ca, detail::interrupter & intr)
         {
                 rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
                 if ( ! w.try_take( ca, intr) )
@@ -379,7 +172,7 @@
                                 std::size_t idx( ( * tss_rnd_idx_)() );
                                 for ( std::size_t j( 0); j < worker_.size(); ++j)
                                 {
- worker other( ridx[idx]);
+ detail::worker other( ridx[idx]);
                                         if ( this_thread::get_id() == other.get_id() ) continue;
                                         if ( ++idx >= worker_.size() ) idx = 0;
                                         if ( other.try_steal( ca, intr) ) break;
@@ -405,16 +198,16 @@
                 }
         }
 
- template< typename F >
- void reschedule_until_( F const& f)
+ void reschedule_until_( function< bool() > const& pred)
         {
- worker * w( tss_worker_.get() );
+ detail::worker * w( detail::worker::tss_get() );
                 BOOST_ASSERT( w);
+ BOOST_ASSERT( w->get_id() == this_thread::get_id() );
                 shared_ptr< thread > thrd( w->thrd() );
                 BOOST_ASSERT( thrd);
                 detail::interrupter intr;
                 detail::callable ca;
- while ( ! f.is_ready() )
+ while ( ! pred() )
                 {
                         next_callable_( * w, ca, intr);
                         if( ! ca.empty() )
@@ -432,26 +225,27 @@
                 typename id_idx::iterator i( iidx.find( this_thread::get_id() ) );
                 lk.unlock();
                 BOOST_ASSERT( i != iidx.end() );
-
- worker w( * i);
- BOOST_ASSERT( w.get_id() == this_thread::get_id() );
- tss_worker_.reset( new worker( w) );
- shared_ptr< thread > thrd( w.thrd() );
+ detail::worker::tss_reset( new detail::worker( * i) );
+
+ detail::worker * w( detail::worker::tss_get() );
+ BOOST_ASSERT( w);
+ BOOST_ASSERT( w->get_id() == this_thread::get_id() );
+ shared_ptr< thread > thrd( w->thrd() );
                 BOOST_ASSERT( thrd);
                 detail::callable ca;
                 detail::interrupter intr;
 
- pool::tss_rnd_idx_.reset( new random_idx( worker_) );
+ tss_rnd_idx_.reset( new random_idx( worker_.size() ) );
 
                 detail::guard grd( running_worker_);
 
- while ( ! shutdown_( w) )
+ while ( ! shutdown_( * w) )
                 {
- next_callable_( w, ca, intr);
+ next_callable_( * w, ca, intr);
                         if( ! ca.empty() )
                         {
                                 execute_( ca, intr, thrd);
- w.reset_scanns();
+ w->reset_scanns();
                         }
                 }
         }
@@ -460,8 +254,8 @@
         {
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
                 worker_.insert(
- worker(
- this,
+ detail::worker(
+ * this,
                                 boost::bind(
                                         & pool::entry_,
                                         this) ) );
@@ -477,14 +271,13 @@
         void create_worker_( std::size_t n)
         {
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
- worker w(
- this,
+ worker_.insert(
+ detail::worker(
+ * this,
                                 boost::bind(
                                         & pool::entry_,
                                         this,
- n) );
- worker_.insert(
- w );
+ n) ) );
         }
 #endif
 
@@ -503,7 +296,7 @@
         bool terminateing_() const
         { return state_ == terminateing_state; }
 
- bool shutdown_( worker & w)
+ bool shutdown_( detail::worker & w)
         {
                 if ( w.shutdown() && channel_.empty() )
                         return true;
@@ -649,9 +442,9 @@
 
                 channel_.deactivate();
                 shared_lock< shared_mutex > lk2( mtx_worker_);
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
                 { w.signal_shutdown(); }
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
                 { w.join(); }
                 lk2.unlock();
 
@@ -668,12 +461,12 @@
 
                 channel_.deactivate_now();
                 shared_lock< shared_mutex > lk2( mtx_worker_);
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
                 {
                         w.signal_shutdown_now();
                         w.interrupt();
                 }
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
                 { w.join(); }
                 lk2.unlock();
                 std::vector< detail::callable > drain( channel_.drain() );
@@ -731,14 +524,18 @@
                 detail::interrupter intr;
                 packaged_task< R > tsk( act);
                 shared_future< R > f( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker * w( detail::worker::tss_get() );
                 if ( w)
                 {
+ function< bool() > wcb(
+ bind(
+ & shared_future< R >::is_ready,
+ f) );
                         tsk.set_wait_callback(
                                 bind(
- ( void ( pool::*)( shared_future< R > const&) ) & pool::reschedule_until_,
+ ( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
                                         this,
- f) );
+ wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
                 }
@@ -768,14 +565,18 @@
                 detail::interrupter intr;
                 packaged_task< R > tsk( act);
                 shared_future< R > f( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker * w( detail::worker::tss_get() );
                 if ( w)
                 {
+ function< bool() > wcb(
+ bind(
+ & shared_future< R >::is_ready,
+ f) );
                         tsk.set_wait_callback(
                                 bind(
- ( void ( pool::*)( shared_future< R > const&) ) & pool::reschedule_until_,
+ ( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
                                         this,
- f) );
+ wcb) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( f, intr);
                 }
@@ -795,10 +596,6 @@
 };
 
 template< typename Channel >
-thread_specific_ptr< typename pool< Channel >::worker >
-pool< Channel >::tss_worker_;
-
-template< typename Channel >
 thread_specific_ptr< typename pool< Channel >::random_idx >
 pool< Channel >::tss_rnd_idx_;
 

Modified: sandbox/threadpool/libs/tp/build/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/build/Jamfile.v2 (original)
+++ sandbox/threadpool/libs/tp/build/Jamfile.v2 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -14,7 +14,7 @@
       <link>static:<define>BOOST_THREADPOOL_STATIC_LINK=1
     ;
 
-SOURCES = callable default_pool guard interrupter poolsize scanns watermark ;
+SOURCES = callable default_pool guard interrupter poolsize scanns watermark worker ;
 
 lib boost_threadpool
    : $(SOURCES).cpp

Modified: sandbox/threadpool/libs/tp/examples/reschedule_until.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/reschedule_until.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/reschedule_until.cpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -21,9 +21,23 @@
 class fibo
 {
 private:
- jss::shared_future< void > f_;
+ boost::shared_future< void > f_;
         int offset_;
 
+ class holder
+ {
+ private:
+ boost::shared_future< void > f_;
+
+ public:
+ holder( boost::shared_future< void > const& f)
+ : f_( f)
+ {}
+
+ bool operator()()
+ { return f_.is_ready(); }
+ };
+
         int seq_( int n)
         {
                 if ( n <= 1) return n;
@@ -36,7 +50,10 @@
                 else
                 {
                         if ( n == 7)
- boost::this_task::reschedule_until< pool_type >( f_);
+ {
+ holder hldr( f_);
+ boost::this_task::reschedule_until( hldr);
+ }
 
                         boost::function< int() > fn1 = boost::bind(
                                                 & fibo::par_,
@@ -53,13 +70,13 @@
                                 boost::this_task::get_thread_pool< pool_type >().submit(
                                         fn2) );
 
- return t1.get() + t2.get();
+ return t1.result().get() + t2.result().get();
                 }
         }
 
 public:
         fibo(
- jss::shared_future< void > f,
+ boost::shared_future< void > f,
                 int offset)
         : f_( f), offset_( offset)
         {}
@@ -78,8 +95,8 @@
         try
         {
                 pool_type pool( tp::poolsize( 1) );
- jss::packaged_task< void > tsk( boost::bind( f) );
- jss::shared_future< void > f( tsk.get_future() );
+ boost::packaged_task< void > tsk( boost::bind( f) );
+ boost::shared_future< void > f( tsk.get_future() );
                 fibo fib( f, 3);
                 std::vector< tp::task< int > > results;
                 results.reserve( 40);
@@ -103,7 +120,7 @@
                         std::vector< tp::task< int > >::iterator i( results.begin() );
                         i != e;
                         ++i)
- std::cout << "fibonacci " << k++ << " == " << i->get() << std::endl;
+ std::cout << "fibonacci " << k++ << " == " << i->result().get() << std::endl;
 
                 pt::ptime stop( pt::microsec_clock::universal_time() );
                 std::cout << ( stop - start).total_milliseconds() << " milli seconds" << std::endl;

Added: sandbox/threadpool/libs/tp/src/worker.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/worker.cpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -0,0 +1,189 @@
+#include "boost/tp/detail/worker.hpp"
+
+namespace boost {
+namespace tp {
+namespace detail
+{
+thread_specific_ptr< worker > worker::tss_;
+
+worker::impl::impl( function< void() > const& fn)
+:
+thrd_( new thread( fn) ),
+wsq_(),
+shtdwn_sem_( 0),
+shtdwn_now_sem_( 0),
+shtdwn_( false),
+scns_( 0)
+{ BOOST_ASSERT( ! fn.empty() ); }
+
+const shared_ptr< thread >
+worker::impl::thrd() const
+{ return thrd_; }
+
+const thread::id
+worker::impl::get_id() const
+{ return thrd_->get_id(); }
+
+void
+worker::impl::join() const
+{ thrd_->join(); }
+
+void
+worker::impl::interrupt() const
+{ thrd_->interrupt(); }
+
+void
+worker::impl::put(
+ callable const& ca,
+ interrupter const& intr)
+{
+ BOOST_ASSERT( ! ca.empty() );
+ wsq_.put( std::make_pair( ca, intr) );
+}
+
+bool
+worker::impl::try_take(
+ callable & ca,
+ interrupter & intr)
+{
+ item itm;
+ bool result( wsq_.try_take( itm) );
+ if ( result)
+ {
+ ca = itm.first;
+ intr = itm.second;
+ }
+ return result;
+}
+
+bool
+worker::impl::try_steal(
+ callable & ca,
+ interrupter & intr)
+{
+ item itm;
+ bool result( wsq_.try_steal( itm) );
+ if ( result)
+ {
+ ca = itm.first;
+ intr = itm.second;
+ }
+ return result;
+}
+
+bool
+worker::impl::empty()
+{ return wsq_.empty(); }
+
+void
+worker::impl::signal_shutdown()
+{ shtdwn_sem_.post(); }
+
+void
+worker::impl::signal_shutdown_now()
+{ shtdwn_now_sem_.post(); }
+
+bool
+worker::impl::shutdown()
+{
+ if ( ! shtdwn_)
+ shtdwn_ = shtdwn_sem_.try_wait();
+ return shtdwn_;
+}
+
+bool
+worker::impl::shutdown_now()
+{ return shtdwn_now_sem_.try_wait(); }
+
+std::size_t
+worker::impl::scanns() const
+{ return scns_; }
+
+void
+worker::impl::increment_scanns()
+{ ++scns_; }
+
+void
+worker::impl::reset_scanns()
+{ scns_ = 0; }
+
+const shared_ptr< thread >
+worker::thrd() const
+{ return impl_->thrd(); }
+
+const thread::id
+worker::get_id() const
+{ return impl_->get_id(); }
+
+void
+worker::join() const
+{ impl_->join(); }
+
+void
+worker::interrupt() const
+{ impl_->interrupt(); }
+
+void
+worker::put(
+ callable const& ca,
+ interrupter const& intr)
+{ impl_->put( ca, intr); }
+
+bool
+worker::try_take(
+ callable & ca,
+ interrupter & intr)
+{ return impl_->try_take( ca, intr); }
+
+bool
+worker::try_steal(
+ callable & ca,
+ interrupter & intr)
+{ return impl_->try_steal( ca, intr); }
+
+bool
+worker::empty() const
+{ return impl_->empty(); }
+
+void
+worker::signal_shutdown()
+{ impl_->signal_shutdown(); }
+
+void
+worker::signal_shutdown_now()
+{ impl_->signal_shutdown_now(); }
+
+bool
+worker::shutdown()
+{ return impl_->shutdown(); }
+
+bool
+worker::shutdown_now()
+{ return impl_->shutdown_now(); }
+
+std::size_t
+worker::scanns() const
+{ return impl_->scanns(); }
+
+void
+worker::increment_scanns()
+{ impl_->increment_scanns(); }
+
+void
+worker::reset_scanns()
+{ impl_->reset_scanns(); }
+
+void
+worker::reschedule_until( function< bool() > const& pred)
+{ return impl_->reschedule_until( pred); }
+
+worker *
+worker::tss_get()
+{ return worker::tss_.get(); }
+
+void
+worker::tss_reset( worker * w)
+{ worker::tss_.reset( w); }
+
+} } }
+


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk