|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r51614 - in sandbox/threadpool: boost/tp boost/tp/detail libs/tp/build libs/tp/examples libs/tp/src
From: oliver.kowalke_at_[hidden]
Date: 2009-03-04 17:52:44
Author: olli
Date: 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
New Revision: 51614
URL: http://svn.boost.org/trac/boost/changeset/51614
Log:
* worker moved from pool.hpp into worker.hpp
* dependencyy between pool and worker reduced
* this_task::reschedule_until() expects an callable object
* this_task::is_valid() returns if current thread is a valid worker thread
Added:
sandbox/threadpool/boost/tp/detail/worker.hpp (contents, props changed)
sandbox/threadpool/libs/tp/src/worker.cpp (contents, props changed)
Text files modified:
sandbox/threadpool/boost/tp/pool.hpp | 345 ++++++++-------------------------------
sandbox/threadpool/libs/tp/build/Jamfile.v2 | 2
sandbox/threadpool/libs/tp/examples/reschedule_until.cpp | 31 ++
3 files changed, 96 insertions(+), 282 deletions(-)
Added: sandbox/threadpool/boost/tp/detail/worker.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker.hpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -0,0 +1,156 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_H
+#define BOOST_TP_DETAIL_WORKER_H
+
+#include <cstddef>
+#include <utility>
+
+#include <boost/assert.hpp>
+#include <boost/function.hpp>
+#include <boost/interprocess/sync/interprocess_semaphore.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/wsq.hpp>
+
+namespace boost {
+namespace tp {
+namespace detail
+{
+class worker
+{
+private:
+ static thread_specific_ptr< worker > tss_;
+
+ class impl : private noncopyable
+ {
+ private:
+ typedef std::pair< callable, interrupter > item;
+ shared_ptr< thread > thrd_;
+ wsq< item > wsq_;
+ interprocess::interprocess_semaphore shtdwn_sem_;
+ interprocess::interprocess_semaphore shtdwn_now_sem_;
+ bool shtdwn_;
+ std::size_t scns_;
+
+ public:
+ impl( function< void() > const&);
+
+ virtual ~impl() {}
+
+ const shared_ptr< thread > thrd() const;
+
+ const thread::id get_id() const;
+
+ void join() const;
+
+ void interrupt() const;
+
+ void put( callable const&, interrupter const&);
+
+ bool try_take( callable &, interrupter &);
+
+ bool try_steal( callable &, interrupter &);
+
+ bool empty();
+
+ void signal_shutdown();
+
+ void signal_shutdown_now();
+
+ bool shutdown();
+
+ bool shutdown_now();
+
+ std::size_t scanns() const;
+
+ void increment_scanns();
+
+ void reset_scanns();
+
+ virtual void reschedule_until( function< bool() > const&) = 0;
+ };
+
+ template< typename Pool >
+ class impl_pool : public impl
+ {
+ private:
+ Pool & p_;
+
+ public:
+ impl_pool( Pool & p, function< void() > const& fn)
+ : impl( fn), p_( p)
+ {}
+
+ void reschedule_until( function< bool() > const& pred)
+ { p_.reschedule_until_( pred); }
+
+ Pool & get_pool() const
+ { return p_; }
+ };
+
+ shared_ptr< impl > impl_;
+
+public:
+ template< typename Pool >
+ worker(
+ Pool & pool,
+ function< void() > const& fn)
+ : impl_( new impl_pool< Pool >( pool, fn) )
+ {}
+
+ const shared_ptr< thread > thrd() const;
+
+ const thread::id get_id() const;
+
+ void join() const;
+
+ void interrupt() const;
+
+ void put( callable const&, interrupter const&);
+
+ bool try_take( callable &, interrupter &);
+
+ bool try_steal( callable &, interrupter &);
+
+ bool empty() const;
+
+ void signal_shutdown();
+
+ void signal_shutdown_now();
+
+ bool shutdown();
+
+ bool shutdown_now();
+
+ std::size_t scanns() const;
+
+ void increment_scanns();
+
+ void reset_scanns();
+
+ void reschedule_until( function< bool() > const&);
+
+ template< typename Pool >
+ Pool & get_thread_pool() const
+ {
+ impl_pool< Pool > * p( dynamic_cast< impl_pool< Pool > * >( impl_.get() ) );
+ BOOST_ASSERT( p);
+ return p->get_pool();
+ }
+
+ static worker * tss_get();
+
+ static void tss_reset( worker * w);
+};
+
+} } }
+
+#endif // BOOST_TP_DETAIL_WORKER_H
+
Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -16,7 +16,6 @@
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
-#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/ordered_index.hpp>
@@ -24,13 +23,8 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/random.hpp>
#include <boost/ref.hpp>
-#include <boost/scoped_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
-#include <boost/thread/condition.hpp>
-#include <boost/thread/locks.hpp>
-#include <boost/thread/shared_mutex.hpp>
-#include <boost/thread/tss.hpp>
#include <boost/utility.hpp>
#include <boost/utility/result_of.hpp>
@@ -40,7 +34,7 @@
#ifdef BOOST_BIND_WORKER_TO_PROCESSORS
#include <boost/tp/detail/bind_processor.hpp>
#endif
-#include <boost/tp/detail/wsq.hpp>
+#include <boost/tp/detail/worker.hpp>
#include <boost/tp/exceptions.hpp>
#include <boost/tp/poolsize.hpp>
#include <boost/tp/scanns.hpp>
@@ -48,38 +42,27 @@
#include <boost/tp/watermark.hpp>
namespace boost {
-
namespace this_task
{
-template<
- typename Pool,
- typename R
->
-void reschedule_until( unique_future< R > const& f)
-{
- typename Pool::worker * w( Pool::tss_worker_.get() );
- BOOST_ASSERT( w);
- w->reschedule_until( f);
-}
-
-template<
- typename Pool,
- typename R
->
-void reschedule_until( shared_future< R > const& f)
+template< typename Pred >
+void reschedule_until( Pred const& pred)
{
- typename Pool::worker * w( Pool::tss_worker_.get() );
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
BOOST_ASSERT( w);
- w->reschedule_until( f);
+ w->reschedule_until( pred);
}
template< typename Pool >
Pool & get_thread_pool()
{
- typename Pool::worker * w( Pool::tss_worker_.get() );
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
BOOST_ASSERT( w);
- return w->get_thread_pool();
+ return w->get_thread_pool< Pool >();
}
+
+inline
+bool is_valid()
+{ return tp::detail::worker::tss_get() != 0; }
}
namespace tp
@@ -88,21 +71,14 @@
class pool : private noncopyable
{
private:
- template<
- typename Pool,
- typename R
- >
- friend void this_task::reschedule_until( unique_future< R > const&);
-
- template<
- typename Pool,
- typename R
- >
- friend void this_task::reschedule_until( shared_future< R > const&);
+ template< typename Pred >
+ friend void this_task::reschedule_until( Pred const&);
template< typename Pool >
friend Pool & this_task::get_thread_pool();
+ friend class detail::worker;
+
typedef Channel channel;
typedef typename channel::item channel_item;
@@ -113,200 +89,18 @@
terminated_state
};
- class worker
- {
- private:
- class wimpl : private noncopyable
- {
- private:
- typedef std::pair< detail::callable, detail::interrupter > item;
-
- pool * pool_ptr_;
- shared_ptr< thread > thrd_;
- detail::wsq< item > wsq_;
- interprocess::interprocess_semaphore shtdwn_sem_;
- interprocess::interprocess_semaphore shtdwn_now_sem_;
- bool shtdwn_;
- std::size_t scns_;
-
- public:
- wimpl(
- pool * pool_ptr,
- function< void() > const& fn)
- :
- pool_ptr_( pool_ptr),
- thrd_( new thread( fn) ),
- wsq_(),
- shtdwn_sem_( 0),
- shtdwn_now_sem_( 0),
- shtdwn_( false),
- scns_( 0)
- { BOOST_ASSERT( ! fn.empty() ); }
-
- const shared_ptr< thread > thrd() const
- { return thrd_; }
-
- const thread::id get_id() const
- { return thrd_->get_id(); }
-
- void join() const
- { thrd_->join(); }
-
- void interrupt() const
- { thrd_->interrupt(); }
-
- void put(
- detail::callable const& ca,
- detail::interrupter const& intr)
- {
- BOOST_ASSERT( ! ca.empty() );
- wsq_.put( std::make_pair( ca, intr) );
- }
-
- bool try_take(
- detail::callable & ca,
- detail::interrupter & intr)
- {
- item itm;
- bool result( wsq_.try_take( itm) );
- if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
- return result;
- }
-
- bool try_steal(
- detail::callable & ca,
- detail::interrupter & intr)
- {
- item itm;
- bool result( wsq_.try_steal( itm) );
- if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
- return result;
- }
-
- bool empty()
- { return wsq_.empty(); }
-
- void signal_shutdown()
- { shtdwn_sem_.post(); }
-
- void signal_shutdown_now()
- { shtdwn_now_sem_.post(); }
-
- bool shutdown()
- {
- if ( ! shtdwn_)
- shtdwn_ = shtdwn_sem_.try_wait();
- return shtdwn_;
- }
-
- bool shutdown_now()
- { return shtdwn_now_sem_.try_wait(); }
-
- std::size_t scanns() const
- { return scns_; }
-
- void increment_scanns()
- { ++scns_; }
-
- void reset_scanns()
- { scns_ = 0; }
-
- template< typename F >
- void reschedule_until( F const& f)
- { pool_ptr_->reschedule_until_( f); }
-
- pool & get_thread_pool()
- { return * pool_ptr_; }
- };
-
- shared_ptr< wimpl > impl_;
-
- public:
- worker(
- pool * pool_ptr,
- function< void() > const& fn)
- : impl_( new wimpl( pool_ptr, fn) )
- {}
-
- const shared_ptr< thread > thrd() const
- { return impl_->thrd(); }
-
- const thread::id get_id() const
- { return impl_->get_id(); }
-
- void join() const
- { impl_->join(); }
-
- void interrupt() const
- { impl_->interrupt(); }
-
- void put(
- detail::callable const& ca,
- detail::interrupter const& intr)
- { impl_->put( ca, intr); }
-
- bool try_take(
- detail::callable & ca,
- detail::interrupter & intr)
- { return impl_->try_take( ca, intr); }
-
- bool try_steal(
- detail::callable & ca,
- detail::interrupter & intr)
- { return impl_->try_steal( ca, intr); }
-
- bool empty() const
- { return impl_->empty(); }
-
- void signal_shutdown()
- { impl_->signal_shutdown(); }
-
- void signal_shutdown_now()
- { impl_->signal_shutdown_now(); }
-
- bool shutdown()
- { return impl_->shutdown(); }
-
- bool shutdown_now()
- { return impl_->shutdown_now(); }
-
- std::size_t scanns() const
- { return impl_->scanns(); }
-
- void increment_scanns()
- { impl_->increment_scanns(); }
-
- void reset_scanns()
- { impl_->reset_scanns(); }
-
- template< typename F >
- void reschedule_until( F const& f)
- { return impl_->reschedule_until( f); }
-
- pool & get_thread_pool()
- { return impl_->get_thread_pool(); }
- };
-
struct id_idx_tag {};
struct rnd_idx_tag {};
typedef multi_index::multi_index_container<
- worker,
+ detail::worker,
multi_index::indexed_by<
multi_index::ordered_unique<
multi_index::tag< id_idx_tag >,
multi_index::const_mem_fun<
- worker,
+ detail::worker,
const thread::id,
- & worker::get_id
+ & detail::worker::get_id
>
>,
multi_index::random_access< multi_index::tag< rnd_idx_tag > >
@@ -326,10 +120,10 @@
variate_generator< rand48 &, uniform_int<> > die_;
public:
- random_idx( worker_list & lst)
+ random_idx( std::size_t size)
:
rng_(),
- six_( 0, lst.size() - 1),
+ six_( 0, size - 1),
die_( rng_, six_)
{}
@@ -337,19 +131,18 @@
{ return die_(); }
};
- static thread_specific_ptr< worker > tss_worker_;
static thread_specific_ptr< random_idx > tss_rnd_idx_;
- worker_list worker_;
- shared_mutex mtx_worker_;
- state state_;
- shared_mutex mtx_state_;
- channel channel_;
- posix_time::time_duration asleep_;
- scanns scns_;
- volatile uint32_t active_worker_;
- volatile uint32_t idle_worker_;
- volatile uint32_t running_worker_;
+ worker_list worker_;
+ shared_mutex mtx_worker_;
+ state state_;
+ shared_mutex mtx_state_;
+ channel channel_;
+ posix_time::time_duration asleep_;
+ scanns scns_;
+ volatile uint32_t active_worker_;
+ volatile uint32_t idle_worker_;
+ volatile uint32_t running_worker_;
void execute_(
detail::callable & ca,
@@ -369,7 +162,7 @@
BOOST_ASSERT( ca.empty() );
}
- void next_callable_( worker & w, detail::callable & ca, detail::interrupter & intr)
+ void next_callable_( detail::worker & w, detail::callable & ca, detail::interrupter & intr)
{
rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
if ( ! w.try_take( ca, intr) )
@@ -379,7 +172,7 @@
std::size_t idx( ( * tss_rnd_idx_)() );
for ( std::size_t j( 0); j < worker_.size(); ++j)
{
- worker other( ridx[idx]);
+ detail::worker other( ridx[idx]);
if ( this_thread::get_id() == other.get_id() ) continue;
if ( ++idx >= worker_.size() ) idx = 0;
if ( other.try_steal( ca, intr) ) break;
@@ -405,16 +198,16 @@
}
}
- template< typename F >
- void reschedule_until_( F const& f)
+ void reschedule_until_( function< bool() > const& pred)
{
- worker * w( tss_worker_.get() );
+ detail::worker * w( detail::worker::tss_get() );
BOOST_ASSERT( w);
+ BOOST_ASSERT( w->get_id() == this_thread::get_id() );
shared_ptr< thread > thrd( w->thrd() );
BOOST_ASSERT( thrd);
detail::interrupter intr;
detail::callable ca;
- while ( ! f.is_ready() )
+ while ( ! pred() )
{
next_callable_( * w, ca, intr);
if( ! ca.empty() )
@@ -432,26 +225,27 @@
typename id_idx::iterator i( iidx.find( this_thread::get_id() ) );
lk.unlock();
BOOST_ASSERT( i != iidx.end() );
-
- worker w( * i);
- BOOST_ASSERT( w.get_id() == this_thread::get_id() );
- tss_worker_.reset( new worker( w) );
- shared_ptr< thread > thrd( w.thrd() );
+ detail::worker::tss_reset( new detail::worker( * i) );
+
+ detail::worker * w( detail::worker::tss_get() );
+ BOOST_ASSERT( w);
+ BOOST_ASSERT( w->get_id() == this_thread::get_id() );
+ shared_ptr< thread > thrd( w->thrd() );
BOOST_ASSERT( thrd);
detail::callable ca;
detail::interrupter intr;
- pool::tss_rnd_idx_.reset( new random_idx( worker_) );
+ tss_rnd_idx_.reset( new random_idx( worker_.size() ) );
detail::guard grd( running_worker_);
- while ( ! shutdown_( w) )
+ while ( ! shutdown_( * w) )
{
- next_callable_( w, ca, intr);
+ next_callable_( * w, ca, intr);
if( ! ca.empty() )
{
execute_( ca, intr, thrd);
- w.reset_scanns();
+ w->reset_scanns();
}
}
}
@@ -460,8 +254,8 @@
{
BOOST_ASSERT( ! terminateing_() && ! terminated_() );
worker_.insert(
- worker(
- this,
+ detail::worker(
+ * this,
boost::bind(
& pool::entry_,
this) ) );
@@ -477,14 +271,13 @@
void create_worker_( std::size_t n)
{
BOOST_ASSERT( ! terminateing_() && ! terminated_() );
- worker w(
- this,
+ worker_.insert(
+ detail::worker(
+ * this,
boost::bind(
& pool::entry_,
this,
- n) );
- worker_.insert(
- w );
+ n) ) );
}
#endif
@@ -503,7 +296,7 @@
bool terminateing_() const
{ return state_ == terminateing_state; }
- bool shutdown_( worker & w)
+ bool shutdown_( detail::worker & w)
{
if ( w.shutdown() && channel_.empty() )
return true;
@@ -649,9 +442,9 @@
channel_.deactivate();
shared_lock< shared_mutex > lk2( mtx_worker_);
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
{ w.signal_shutdown(); }
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
{ w.join(); }
lk2.unlock();
@@ -668,12 +461,12 @@
channel_.deactivate_now();
shared_lock< shared_mutex > lk2( mtx_worker_);
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
{
w.signal_shutdown_now();
w.interrupt();
}
- BOOST_FOREACH( worker w, worker_)
+ BOOST_FOREACH( detail::worker w, worker_)
{ w.join(); }
lk2.unlock();
std::vector< detail::callable > drain( channel_.drain() );
@@ -731,14 +524,18 @@
detail::interrupter intr;
packaged_task< R > tsk( act);
shared_future< R > f( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker * w( detail::worker::tss_get() );
if ( w)
{
+ function< bool() > wcb(
+ bind(
+ & shared_future< R >::is_ready,
+ f) );
tsk.set_wait_callback(
bind(
- ( void ( pool::*)( shared_future< R > const&) ) & pool::reschedule_until_,
+ ( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
this,
- f) );
+ wcb) );
w->put( detail::callable( move( tsk) ), intr);
return task< R >( f, intr);
}
@@ -768,14 +565,18 @@
detail::interrupter intr;
packaged_task< R > tsk( act);
shared_future< R > f( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker * w( detail::worker::tss_get() );
if ( w)
{
+ function< bool() > wcb(
+ bind(
+ & shared_future< R >::is_ready,
+ f) );
tsk.set_wait_callback(
bind(
- ( void ( pool::*)( shared_future< R > const&) ) & pool::reschedule_until_,
+ ( void ( pool::*)( function< bool() > const&) ) & pool::reschedule_until_,
this,
- f) );
+ wcb) );
w->put( detail::callable( move( tsk) ), intr);
return task< R >( f, intr);
}
@@ -795,10 +596,6 @@
};
template< typename Channel >
-thread_specific_ptr< typename pool< Channel >::worker >
-pool< Channel >::tss_worker_;
-
-template< typename Channel >
thread_specific_ptr< typename pool< Channel >::random_idx >
pool< Channel >::tss_rnd_idx_;
Modified: sandbox/threadpool/libs/tp/build/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/build/Jamfile.v2 (original)
+++ sandbox/threadpool/libs/tp/build/Jamfile.v2 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -14,7 +14,7 @@
<link>static:<define>BOOST_THREADPOOL_STATIC_LINK=1
;
-SOURCES = callable default_pool guard interrupter poolsize scanns watermark ;
+SOURCES = callable default_pool guard interrupter poolsize scanns watermark worker ;
lib boost_threadpool
: $(SOURCES).cpp
Modified: sandbox/threadpool/libs/tp/examples/reschedule_until.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/reschedule_until.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/reschedule_until.cpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -21,9 +21,23 @@
class fibo
{
private:
- jss::shared_future< void > f_;
+ boost::shared_future< void > f_;
int offset_;
+ class holder
+ {
+ private:
+ boost::shared_future< void > f_;
+
+ public:
+ holder( boost::shared_future< void > const& f)
+ : f_( f)
+ {}
+
+ bool operator()()
+ { return f_.is_ready(); }
+ };
+
int seq_( int n)
{
if ( n <= 1) return n;
@@ -36,7 +50,10 @@
else
{
if ( n == 7)
- boost::this_task::reschedule_until< pool_type >( f_);
+ {
+ holder hldr( f_);
+ boost::this_task::reschedule_until( hldr);
+ }
boost::function< int() > fn1 = boost::bind(
& fibo::par_,
@@ -53,13 +70,13 @@
boost::this_task::get_thread_pool< pool_type >().submit(
fn2) );
- return t1.get() + t2.get();
+ return t1.result().get() + t2.result().get();
}
}
public:
fibo(
- jss::shared_future< void > f,
+ boost::shared_future< void > f,
int offset)
: f_( f), offset_( offset)
{}
@@ -78,8 +95,8 @@
try
{
pool_type pool( tp::poolsize( 1) );
- jss::packaged_task< void > tsk( boost::bind( f) );
- jss::shared_future< void > f( tsk.get_future() );
+ boost::packaged_task< void > tsk( boost::bind( f) );
+ boost::shared_future< void > f( tsk.get_future() );
fibo fib( f, 3);
std::vector< tp::task< int > > results;
results.reserve( 40);
@@ -103,7 +120,7 @@
std::vector< tp::task< int > >::iterator i( results.begin() );
i != e;
++i)
- std::cout << "fibonacci " << k++ << " == " << i->get() << std::endl;
+ std::cout << "fibonacci " << k++ << " == " << i->result().get() << std::endl;
pt::ptime stop( pt::microsec_clock::universal_time() );
std::cout << ( stop - start).total_milliseconds() << " milli seconds" << std::endl;
Added: sandbox/threadpool/libs/tp/src/worker.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/worker.cpp 2009-03-04 17:52:43 EST (Wed, 04 Mar 2009)
@@ -0,0 +1,189 @@
+#include "boost/tp/detail/worker.hpp"
+
+namespace boost {
+namespace tp {
+namespace detail
+{
+thread_specific_ptr< worker > worker::tss_;
+
+worker::impl::impl( function< void() > const& fn)
+:
+thrd_( new thread( fn) ),
+wsq_(),
+shtdwn_sem_( 0),
+shtdwn_now_sem_( 0),
+shtdwn_( false),
+scns_( 0)
+{ BOOST_ASSERT( ! fn.empty() ); }
+
+const shared_ptr< thread >
+worker::impl::thrd() const
+{ return thrd_; }
+
+const thread::id
+worker::impl::get_id() const
+{ return thrd_->get_id(); }
+
+void
+worker::impl::join() const
+{ thrd_->join(); }
+
+void
+worker::impl::interrupt() const
+{ thrd_->interrupt(); }
+
+void
+worker::impl::put(
+ callable const& ca,
+ interrupter const& intr)
+{
+ BOOST_ASSERT( ! ca.empty() );
+ wsq_.put( std::make_pair( ca, intr) );
+}
+
+bool
+worker::impl::try_take(
+ callable & ca,
+ interrupter & intr)
+{
+ item itm;
+ bool result( wsq_.try_take( itm) );
+ if ( result)
+ {
+ ca = itm.first;
+ intr = itm.second;
+ }
+ return result;
+}
+
+bool
+worker::impl::try_steal(
+ callable & ca,
+ interrupter & intr)
+{
+ item itm;
+ bool result( wsq_.try_steal( itm) );
+ if ( result)
+ {
+ ca = itm.first;
+ intr = itm.second;
+ }
+ return result;
+}
+
+bool
+worker::impl::empty()
+{ return wsq_.empty(); }
+
+void
+worker::impl::signal_shutdown()
+{ shtdwn_sem_.post(); }
+
+void
+worker::impl::signal_shutdown_now()
+{ shtdwn_now_sem_.post(); }
+
+bool
+worker::impl::shutdown()
+{
+ if ( ! shtdwn_)
+ shtdwn_ = shtdwn_sem_.try_wait();
+ return shtdwn_;
+}
+
+bool
+worker::impl::shutdown_now()
+{ return shtdwn_now_sem_.try_wait(); }
+
+std::size_t
+worker::impl::scanns() const
+{ return scns_; }
+
+void
+worker::impl::increment_scanns()
+{ ++scns_; }
+
+void
+worker::impl::reset_scanns()
+{ scns_ = 0; }
+
+const shared_ptr< thread >
+worker::thrd() const
+{ return impl_->thrd(); }
+
+const thread::id
+worker::get_id() const
+{ return impl_->get_id(); }
+
+void
+worker::join() const
+{ impl_->join(); }
+
+void
+worker::interrupt() const
+{ impl_->interrupt(); }
+
+void
+worker::put(
+ callable const& ca,
+ interrupter const& intr)
+{ impl_->put( ca, intr); }
+
+bool
+worker::try_take(
+ callable & ca,
+ interrupter & intr)
+{ return impl_->try_take( ca, intr); }
+
+bool
+worker::try_steal(
+ callable & ca,
+ interrupter & intr)
+{ return impl_->try_steal( ca, intr); }
+
+bool
+worker::empty() const
+{ return impl_->empty(); }
+
+void
+worker::signal_shutdown()
+{ impl_->signal_shutdown(); }
+
+void
+worker::signal_shutdown_now()
+{ impl_->signal_shutdown_now(); }
+
+bool
+worker::shutdown()
+{ return impl_->shutdown(); }
+
+bool
+worker::shutdown_now()
+{ return impl_->shutdown_now(); }
+
+std::size_t
+worker::scanns() const
+{ return impl_->scanns(); }
+
+void
+worker::increment_scanns()
+{ impl_->increment_scanns(); }
+
+void
+worker::reset_scanns()
+{ impl_->reset_scanns(); }
+
+void
+worker::reschedule_until( function< bool() > const& pred)
+{ return impl_->reschedule_until( pred); }
+
+worker *
+worker::tss_get()
+{ return worker::tss_.get(); }
+
+void
+worker::tss_reset( worker * w)
+{ worker::tss_.reset( w); }
+
+} } }
+
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk