|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r54901 - in sandbox/task: boost/task boost/task/detail libs/task/doc libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-07-11 15:57:41
Author: olli
Date: 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
New Revision: 54901
URL: http://svn.boost.org/trac/boost/changeset/54901
Log:
* refactoring
Added:
sandbox/task/boost/task/detail/pool_base.hpp (contents, props changed)
Text files modified:
sandbox/task/boost/task/detail/worker.hpp | 399 +++++++++++++++++++--------------------
sandbox/task/boost/task/static_pool.hpp | 322 +-------------------------------
sandbox/task/libs/task/doc/overview.qbk | 2
sandbox/task/libs/task/doc/todo.qbk | 9
sandbox/task/libs/task/src/guard.cpp | 6
sandbox/task/libs/task/src/interrupter.cpp | 3
sandbox/task/libs/task/src/poolsize.cpp | 2
sandbox/task/libs/task/src/scanns.cpp | 3
sandbox/task/libs/task/src/semaphore_posix.cpp | 2
sandbox/task/libs/task/src/semaphore_windows.cpp | 2
sandbox/task/libs/task/src/watermark.cpp | 4
sandbox/task/libs/task/src/worker.cpp | 7
sandbox/task/libs/task/src/worker_group.cpp | 6
sandbox/task/libs/task/src/wsq.cpp | 7
14 files changed, 239 insertions(+), 535 deletions(-)
Added: sandbox/task/boost/task/detail/pool_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/task/boost/task/detail/pool_base.hpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -0,0 +1,331 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TASK_DETAIL_POOL_BASE_H
+#define BOOST_TASK_DETAIL_POOL_BASE_H
+
+#include <cstddef>
+
+#include <boost/assert.hpp>
+#include <boost/bind.hpp>
+#include <boost/config.hpp>
+#include <boost/cstdint.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/detail/move.hpp>
+
+#include <boost/task/detail/atomic.hpp>
+#include <boost/task/detail/bind_processor.hpp>
+#include <boost/task/detail/worker.hpp>
+#include <boost/task/detail/worker_group.hpp>
+
+#include <boost/task/callable.hpp>
+#include <boost/task/context.hpp>
+#include <boost/task/exceptions.hpp>
+#include <boost/task/future.hpp>
+#include <boost/task/handle.hpp>
+#include <boost/task/poolsize.hpp>
+#include <boost/task/scanns.hpp>
+#include <boost/task/task.hpp>
+#include <boost/task/watermark.hpp>
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost { namespace task {
+namespace detail
+{
+
+template< typename Channel >
+class pool_base
+{
+private:
+ friend class worker;
+
+ template< typename T, typename X >
+ friend class worker_object;
+
+ typedef Channel channel;
+ typedef typename channel::item channel_item;
+
+ worker_group wg_;
+ shared_mutex mtx_wg_;
+ volatile uint32_t state_;
+ channel channel_;
+ volatile uint32_t active_worker_;
+ volatile uint32_t idle_worker_;
+
+ void worker_entry_()
+ {
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ typename detail::worker_group::iterator i( wg_.find( this_thread::get_id() ) );
+ lk.unlock();
+ BOOST_ASSERT( i != wg_.end() );
+
+ worker w( * i);
+ w.run();
+ }
+
+ void create_worker_(
+ poolsize const& psize,
+ posix_time::time_duration const& asleep,
+ scanns const& max_scns)
+ {
+ wg_.insert(
+ worker(
+ * this,
+ psize,
+ asleep,
+ max_scns,
+ boost::bind(
+ & pool_base::worker_entry_,
+ this) ) );
+ }
+
+# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
+ void worker_entry_( std::size_t n)
+ {
+ this_thread::bind_to_processor( n);
+ worker_entry_();
+ }
+
+ void create_worker_(
+ poolsize const& psize,
+ posix_time::time_duration const& asleep,
+ scanns const& max_scns,
+ std::size_t n)
+ {
+ wg_.insert(
+ worker(
+ * this,
+ psize,
+ asleep,
+ max_scns,
+ boost::bind(
+ & pool_base::worker_entry_,
+ this,
+ n) ) );
+ }
+# endif
+
+ std::size_t active_() const
+ { return active_worker_; }
+
+ std::size_t idle_() const
+ { return size_() - active_(); }
+
+ std::size_t size_() const
+ { return wg_.size(); }
+
+ bool closed_() const
+ { return state_ > 0; }
+
+ bool close_()
+ { return atomic_fetch_add( & state_, 1) > 1; }
+
+public:
+ explicit pool_base(
+ poolsize const& psize,
+ posix_time::time_duration const& asleep = posix_time::microseconds( 10),
+ scanns const& max_scns = scanns( 20) )
+ :
+ wg_(),
+ mtx_wg_(),
+ state_( 0),
+ channel_(),
+ active_worker_( 0),
+ idle_worker_( 0)
+ {
+ if ( asleep.is_special() || asleep.is_negative() )
+ throw invalid_timeduration();
+ channel_.activate();
+ lock_guard< shared_mutex > lk( mtx_wg_);
+ for ( std::size_t i( 0); i < psize; ++i)
+ create_worker_( psize, asleep, max_scns);
+ }
+
+ explicit pool_base(
+ poolsize const& psize,
+ high_watermark const& hwm,
+ low_watermark const& lwm,
+ posix_time::time_duration const& asleep = posix_time::microseconds( 100),
+ scanns const& max_scns = scanns( 20) )
+ :
+ wg_(),
+ mtx_wg_(),
+ state_( 0),
+ channel_(
+ hwm,
+ lwm),
+ active_worker_( 0),
+ idle_worker_( 0)
+ {
+ if ( asleep.is_special() || asleep.is_negative() )
+ throw invalid_timeduration();
+ channel_.activate();
+ lock_guard< shared_mutex > lk( mtx_wg_);
+ for ( std::size_t i( 0); i < psize; ++i)
+ create_worker_( psize, asleep, max_scns);
+ }
+
+# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
+ explicit pool_base(
+ posix_time::time_duration const& asleep = posix_time::microseconds( 10),
+ scanns const& max_scns = scanns( 20) )
+ :
+ wg_(),
+ mtx_wg_(),
+ state_( 0),
+ channel_(),
+ active_worker_( 0),
+ idle_worker_( 0)
+ {
+ if ( asleep.is_special() || asleep.is_negative() )
+ throw invalid_timeduration();
+ poolsize psize( thread::hardware_concurrency() );
+ BOOST_ASSERT( psize > 0);
+ channel_.activate();
+ lock_guard< shared_mutex > lk( mtx_wg_);
+ for ( std::size_t i( 0); i < psize; ++i)
+ create_worker_( psize, asleep, max_scns, i);
+ }
+
+ explicit pool_base(
+ high_watermark const& hwm,
+ low_watermark const& lwm,
+ posix_time::time_duration const& asleep = posix_time::microseconds( 100),
+ scanns const& max_scns = scanns( 20) )
+ :
+ wg_(),
+ mtx_wg_(),
+ state_( 0),
+ channel_(
+ hwm,
+ lwm),
+ active_worker_( 0),
+ idle_worker_( 0)
+ {
+ if ( asleep.is_special() || asleep.is_negative() )
+ throw invalid_timeduration();
+ poolsize psize( thread::hardware_concurrency() );
+ BOOST_ASSERT( psize > 0);
+ channel_.activate();
+ lock_guard< shared_mutex > lk( mtx_wg_);
+ for ( std::size_t i( 0); i < psize; ++i)
+ create_worker_( psize, asleep, max_scns, i);
+ }
+# endif
+
+ ~pool_base()
+ { shutdown(); }
+
+ std::size_t active()
+ {
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ return active_();
+ }
+
+ std::size_t idle()
+ {
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ return idle_();
+ }
+
+ void interrupt_all_worker()
+ {
+ if ( closed_() ) return;
+
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ wg_.interrupt_all();
+ }
+
+ void shutdown()
+ {
+ if ( closed_() || close_() ) return;
+
+ channel_.deactivate();
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ wg_.signal_shutdown_all();
+ wg_.join_all();
+ }
+
+ const void shutdown_now()
+ {
+ if ( closed_() || close_() ) return;
+
+ channel_.deactivate_now();
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ wg_.signal_shutdown_now_all();
+ wg_.interrupt_all();
+ wg_.join_all();
+ }
+
+ std::size_t size()
+ {
+ shared_lock< shared_mutex > lk( mtx_wg_);
+ return size_();
+ }
+
+ bool closed()
+ { return closed_(); }
+
+ void clear()
+ { channel_.clear(); }
+
+ bool empty()
+ { return channel_.empty(); }
+
+ std::size_t pending()
+ { return channel_.size(); }
+
+ std::size_t upper_bound()
+ { return channel_.upper_bound(); }
+
+ void upper_bound( high_watermark const& hwm)
+ { channel_.upper_bound( hwm); }
+
+ std::size_t lower_bound()
+ { return channel_.lower_bound(); }
+
+ void lower_bound( low_watermark const lwm)
+ { channel_.lower_bound( lwm); }
+
+ template< typename R >
+ handle< R > submit( task< R > t)
+ {
+ if ( closed_() )
+ throw task_rejected("pool is closed");
+
+ shared_future< R > f( t.get_future() );
+ context ctx;
+ handle< R > h( ctx.get_handle( f) );
+ channel_.put(
+ ctx.get_callable( boost::move( t) ) );
+ return h;
+ }
+
+ template< typename R, typename Attr >
+ handle< R > submit( task< R > t, Attr const& attr)
+ {
+ if ( closed_() )
+ throw task_rejected("pool is closed");
+
+ shared_future< R > f( t.get_future() );
+ context ctx;
+ handle< R > h( ctx.get_handle( f) );
+ channel_.put(
+ channel_item(
+ ctx.get_callable( boost::move( t) ),
+ attr) );
+ return h;
+ }
+};
+
+}}}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif // BOOST_TASK_DETAIL_POOL_BASE_H
+
Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp (original)
+++ sandbox/task/boost/task/detail/worker.hpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -33,259 +33,254 @@
# pragma warning(disable:4251 4275)
# endif
-namespace boost { namespace task
-{
-
-template< typename Channel >
-class static_pool;
-
+namespace boost { namespace task {
namespace detail
{
-class BOOST_TASK_DECL worker
+struct worker_base
{
-private:
- template< typename Channel >
- friend class static_pool;
+ virtual ~worker_base() {}
- static thread_specific_ptr< worker > tss_;
+ virtual const thread::id get_id() const = 0;
- struct worker_base
- {
- virtual ~worker_base() {}
+ virtual void join() const = 0;
- virtual const thread::id get_id() const = 0;
+ virtual void interrupt() const = 0;
- virtual void join() const = 0;
+ virtual void put( callable const&) = 0;
- virtual void interrupt() const = 0;
+ virtual bool try_take( callable &) = 0;
- virtual void put( callable const&) = 0;
+ virtual bool try_steal( callable &) = 0;
- virtual bool try_take( callable &) = 0;
+ virtual void signal_shutdown() = 0;
- virtual bool try_steal( callable &) = 0;
+ virtual void signal_shutdown_now() = 0;
- virtual void signal_shutdown() = 0;
+ virtual void reschedule_until( function< bool() > const&) = 0;
- virtual void signal_shutdown_now() = 0;
+ virtual void run() = 0;
+};
+
+template<
+ typename Pool,
+ typename Worker
+>
+class worker_object : public worker_base,
+ private noncopyable
+{
+private:
+ class random_idx
+ {
+ private:
+ rand48 rng_;
+ uniform_int<> six_;
+ variate_generator< rand48 &, uniform_int<> > die_;
- virtual void reschedule_until( function< bool() > const&) = 0;
+ public:
+ random_idx( std::size_t size)
+ :
+ rng_(),
+ six_( 0, size - 1),
+ die_( rng_, six_)
+ {}
- virtual void run() = 0;
+ std::size_t operator()()
+ { return die_(); }
};
- template< typename Pool >
- class worker_object : public worker_base,
- private noncopyable
- {
- private:
- class random_idx
- {
- private:
- rand48 rng_;
- uniform_int<> six_;
- variate_generator< rand48 &, uniform_int<> > die_;
-
- public:
- random_idx( std::size_t size)
- :
- rng_(),
- six_( 0, size - 1),
- die_( rng_, six_)
- {}
-
- std::size_t operator()()
- { return die_(); }
- };
-
- Pool & pool_;
- shared_ptr< thread > thrd_;
- wsq wsq_;
- semaphore shtdwn_sem_;
- semaphore shtdwn_now_sem_;
- bool shtdwn_;
- posix_time::time_duration asleep_;
- scanns max_scns_;
- std::size_t scns_;
- random_idx rnd_idx_;
+ Pool & pool_;
+ shared_ptr< thread > thrd_;
+ wsq wsq_;
+ semaphore shtdwn_sem_;
+ semaphore shtdwn_now_sem_;
+ bool shtdwn_;
+ posix_time::time_duration asleep_;
+ scanns max_scns_;
+ std::size_t scns_;
+ random_idx rnd_idx_;
- void execute_( callable & ca)
+ void execute_( callable & ca)
+ {
+ BOOST_ASSERT( ! ca.empty() );
+ guard grd( get_pool().active_worker_);
{
- BOOST_ASSERT( ! ca.empty() );
- guard grd( get_pool().active_worker_);
- {
- context_guard lk( ca, thrd_);
- ca();
- }
- ca.clear();
- BOOST_ASSERT( ca.empty() );
+ context_guard lk( ca, thrd_);
+ ca();
}
+ ca.clear();
+ BOOST_ASSERT( ca.empty() );
+ }
- void next_callable_( callable & ca)
+ void next_callable_( callable & ca)
+ {
+ if ( ! try_take( ca) )
{
- if ( ! try_take( ca) )
+ if ( ! get_pool().channel_.try_take( ca) )
{
- if ( ! get_pool().channel_.try_take( ca) )
+ std::size_t idx( rnd_idx_() );
+ for ( std::size_t j( 0); j < get_pool().wg_.size(); ++j)
{
- std::size_t idx( rnd_idx_() );
- for ( std::size_t j( 0); j < get_pool().wg_.size(); ++j)
- {
- worker other( get_pool().wg_[idx]);
- if ( this_thread::get_id() == other.get_id() ) continue;
- if ( ++idx >= get_pool().wg_.size() ) idx = 0;
- if ( other.try_steal( ca) ) break;
- }
+ Worker other( get_pool().wg_[idx]);
+ if ( this_thread::get_id() == other.get_id() ) continue;
+ if ( ++idx >= get_pool().wg_.size() ) idx = 0;
+ if ( other.try_steal( ca) ) break;
+ }
- if ( ca.empty() )
+ if ( ca.empty() )
+ {
+ guard grd( get_pool().idle_worker_);
+ if ( shutdown_() ) return;
+ ++scns_;
+ if ( scns_ >= max_scns_)
{
- guard grd( get_pool().idle_worker_);
- if ( shutdown_() ) return;
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- if ( get_pool().size_() == get_pool().idle_worker_)
- get_pool().channel_.take( ca, asleep_);
- else
- this_thread::sleep( asleep_);
- scns_ = 0;
- }
+ if ( get_pool().size_() == get_pool().idle_worker_)
+ get_pool().channel_.take( ca, asleep_);
else
- this_thread::yield();
+ this_thread::sleep( asleep_);
+ scns_ = 0;
}
+ else
+ this_thread::yield();
}
}
}
+ }
- void next_local_callable_( callable & ca)
+ void next_local_callable_( callable & ca)
+ {
+ if ( ! try_take( ca) )
{
- if ( ! try_take( ca) )
+ guard grd( get_pool().idle_worker_);
+ if ( shutdown_() ) return;
+ ++scns_;
+ if ( scns_ >= max_scns_)
{
- guard grd( get_pool().idle_worker_);
- if ( shutdown_() ) return;
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- this_thread::sleep( asleep_);
- scns_ = 0;
- }
- else
- this_thread::yield();
+ this_thread::sleep( asleep_);
+ scns_ = 0;
}
+ else
+ this_thread::yield();
}
+ }
- bool shutdown_()
- {
- if ( shutdown__() && get_pool().channel_.empty() )
- return true;
- else if ( shutdown_now__() )
- return true;
- return false;
- }
+ bool shutdown_()
+ {
+ if ( shutdown__() && get_pool().channel_.empty() )
+ return true;
+ else if ( shutdown_now__() )
+ return true;
+ return false;
+ }
- bool shutdown__()
- {
- if ( ! shtdwn_)
- shtdwn_ = shtdwn_sem_.try_wait();
- return shtdwn_;
- }
-
- bool shutdown_now__()
- { return shtdwn_now_sem_.try_wait(); }
+ bool shutdown__()
+ {
+ if ( ! shtdwn_)
+ shtdwn_ = shtdwn_sem_.try_wait();
+ return shtdwn_;
+ }
+
+ bool shutdown_now__()
+ { return shtdwn_now_sem_.try_wait(); }
- public:
- worker_object(
- Pool & pool,
- poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
- function< void() > const& fn)
- :
- pool_( pool),
- thrd_( new thread( fn) ),
- wsq_(),
- shtdwn_sem_( 0),
- shtdwn_now_sem_( 0),
- shtdwn_( false),
- asleep_( asleep),
- max_scns_( max_scns),
- scns_( 0),
- rnd_idx_( psize)
- { BOOST_ASSERT( ! fn.empty() ); }
-
- const thread::id get_id() const
- { return thrd_->get_id(); }
-
- void join() const
- { thrd_->join(); }
-
- void
- interrupt() const
- { thrd_->interrupt(); }
-
- void signal_shutdown()
- { shtdwn_sem_.post(); }
-
- void signal_shutdown_now()
- { shtdwn_now_sem_.post(); }
+public:
+ worker_object(
+ Pool & pool,
+ poolsize const& psize,
+ posix_time::time_duration const& asleep,
+ scanns const& max_scns,
+ function< void() > const& fn)
+ :
+ pool_( pool),
+ thrd_( new thread( fn) ),
+ wsq_(),
+ shtdwn_sem_( 0),
+ shtdwn_now_sem_( 0),
+ shtdwn_( false),
+ asleep_( asleep),
+ max_scns_( max_scns),
+ scns_( 0),
+ rnd_idx_( psize)
+ { BOOST_ASSERT( ! fn.empty() ); }
+
+ const thread::id get_id() const
+ { return thrd_->get_id(); }
+
+ void join() const
+ { thrd_->join(); }
+
+ void
+ interrupt() const
+ { thrd_->interrupt(); }
+
+ void signal_shutdown()
+ { shtdwn_sem_.post(); }
+
+ void signal_shutdown_now()
+ { shtdwn_now_sem_.post(); }
- void put( callable const& ca)
- {
- BOOST_ASSERT( ! ca.empty() );
- wsq_.put( ca);
- }
+ void put( callable const& ca)
+ {
+ BOOST_ASSERT( ! ca.empty() );
+ wsq_.put( ca);
+ }
- bool try_take( callable & ca)
- {
- callable tmp;
- bool result( wsq_.try_take( tmp) );
- if ( result)
- ca = tmp;
- return result;
- }
-
- bool try_steal( callable & ca)
- {
- callable tmp;
- bool result( wsq_.try_steal( tmp) );
- if ( result)
- ca = tmp;
- return result;
- }
+ bool try_take( callable & ca)
+ {
+ callable tmp;
+ bool result( wsq_.try_take( tmp) );
+ if ( result)
+ ca = tmp;
+ return result;
+ }
+
+ bool try_steal( callable & ca)
+ {
+ callable tmp;
+ bool result( wsq_.try_steal( tmp) );
+ if ( result)
+ ca = tmp;
+ return result;
+ }
- Pool & get_pool() const
- { return pool_; }
+ Pool & get_pool() const
+ { return pool_; }
- void run()
- {
- BOOST_ASSERT( get_id() == this_thread::get_id() );
+ void run()
+ {
+ BOOST_ASSERT( get_id() == this_thread::get_id() );
- callable ca;
- while ( ! shutdown_() )
+ callable ca;
+ while ( ! shutdown_() )
+ {
+ next_callable_( ca);
+ if( ! ca.empty() )
{
- next_callable_( ca);
- if( ! ca.empty() )
- {
- execute_( ca);
- scns_ = 0;
- }
+ execute_( ca);
+ scns_ = 0;
}
}
+ }
- void reschedule_until( function< bool() > const& pred)
+ void reschedule_until( function< bool() > const& pred)
+ {
+ callable ca;
+ while ( ! pred() )
{
- callable ca;
- while ( ! pred() )
+ next_local_callable_( ca);
+ if( ! ca.empty() )
{
- next_local_callable_( ca);
- if( ! ca.empty() )
- {
- execute_( ca);
- scns_ = 0;
- }
+ execute_( ca);
+ scns_ = 0;
}
}
- };
+ }
+};
+
+class BOOST_TASK_DECL worker
+{
+private:
+ static thread_specific_ptr< worker > tss_;
shared_ptr< worker_base > impl_;
@@ -299,7 +294,7 @@
function< void() > const& fn)
:
impl_(
- new worker_object< Pool >(
+ new worker_object< Pool, worker >(
pool,
psize,
asleep,
@@ -323,7 +318,7 @@
template< typename Pool >
Pool & get_pool() const
{
- worker_object< Pool > * p( dynamic_cast< worker_object< Pool > * >( impl_.get() ) );
+ worker_object< Pool, worker > * p( dynamic_cast< worker_object< Pool, worker > * >( impl_.get() ) );
BOOST_ASSERT( p);
return p->get_pool();
}
Modified: sandbox/task/boost/task/static_pool.hpp
==============================================================================
--- sandbox/task/boost/task/static_pool.hpp (original)
+++ sandbox/task/boost/task/static_pool.hpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -8,26 +8,16 @@
#define BOOST_TASK_STATIC_POOL_H
#include <cstddef>
-#include <utility>
-#include <vector>
-#include <boost/assert.hpp>
-#include <boost/bind.hpp>
#include <boost/config.hpp>
-#include <boost/cstdint.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/function.hpp>
-#include <boost/thread.hpp>
#include <boost/thread/detail/move.hpp>
-#include <boost/task/callable.hpp>
-#include <boost/task/context.hpp>
-#include <boost/task/detail/atomic.hpp>
#include <boost/task/detail/bind_processor.hpp>
-#include <boost/task/detail/worker.hpp>
+#include <boost/task/detail/pool_base.hpp>
+
#include <boost/task/detail/worker_group.hpp>
#include <boost/task/exceptions.hpp>
-#include <boost/task/future.hpp>
#include <boost/task/handle.hpp>
#include <boost/task/poolsize.hpp>
#include <boost/task/scanns.hpp>
@@ -45,305 +35,14 @@
typedef Channel channel;
private:
- template< typename Pool >
- friend struct has_attribute;
-
- template< typename Pool >
- friend struct attribute_type;
-
- friend class detail::worker;
+ template< typename T, typename X >
+ friend class detail::worker_object;
- template< typename T >
- friend class detail::worker::worker_object;
-
- typedef typename channel::item channel_item;
-
# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
struct tag_bind_to_processors {};
# endif
- class pool_base
- {
- private:
- friend class detail::worker;
-
- template< typename T >
- friend class detail::worker::worker_object;
-
- detail::worker_group wg_;
- shared_mutex mtx_wg_;
- volatile uint32_t state_;
- channel channel_;
- volatile uint32_t active_worker_;
- volatile uint32_t idle_worker_;
-
- void worker_entry_()
- {
- shared_lock< shared_mutex > lk( mtx_wg_);
- typename detail::worker_group::iterator i( wg_.find( this_thread::get_id() ) );
- lk.unlock();
- BOOST_ASSERT( i != wg_.end() );
-
- detail::worker w( * i);
- w.run();
- }
-
- void create_worker_(
- poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns)
- {
- wg_.insert(
- detail::worker(
- * this,
- psize,
- asleep,
- max_scns,
- boost::bind(
- & pool_base::worker_entry_,
- this) ) );
- }
-
-# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
- void worker_entry_( std::size_t n)
- {
- this_thread::bind_to_processor( n);
- worker_entry_();
- }
-
- void create_worker_(
- poolsize const& psize,
- posix_time::time_duration const& asleep,
- scanns const& max_scns,
- std::size_t n)
- {
- wg_.insert(
- detail::worker(
- * this,
- psize,
- asleep,
- max_scns,
- boost::bind(
- & pool_base::worker_entry_,
- this,
- n) ) );
- }
-# endif
-
- std::size_t active_() const
- { return active_worker_; }
-
- std::size_t idle_() const
- { return size_() - active_(); }
-
- std::size_t size_() const
- { return wg_.size(); }
-
- bool closed_() const
- { return state_ > 0; }
-
- bool close_()
- { return detail::atomic_fetch_add( & state_, 1) > 1; }
-
- public:
- explicit pool_base(
- poolsize const& psize,
- posix_time::time_duration const& asleep = posix_time::microseconds( 10),
- scanns const& max_scns = scanns( 20) )
- :
- wg_(),
- mtx_wg_(),
- state_( 0),
- channel_(),
- active_worker_( 0),
- idle_worker_( 0)
- {
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
- channel_.activate();
- lock_guard< shared_mutex > lk( mtx_wg_);
- for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns);
- }
-
- explicit pool_base(
- poolsize const& psize,
- high_watermark const& hwm,
- low_watermark const& lwm,
- posix_time::time_duration const& asleep = posix_time::microseconds( 100),
- scanns const& max_scns = scanns( 20) )
- :
- wg_(),
- mtx_wg_(),
- state_( 0),
- channel_(
- hwm,
- lwm),
- active_worker_( 0),
- idle_worker_( 0)
- {
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
- channel_.activate();
- lock_guard< shared_mutex > lk( mtx_wg_);
- for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns);
- }
-
-# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
- explicit pool_base(
- posix_time::time_duration const& asleep = posix_time::microseconds( 10),
- scanns const& max_scns = scanns( 20) )
- :
- wg_(),
- mtx_wg_(),
- state_( 0),
- channel_(),
- active_worker_( 0),
- idle_worker_( 0)
- {
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
- poolsize psize( thread::hardware_concurrency() );
- BOOST_ASSERT( psize > 0);
- channel_.activate();
- lock_guard< shared_mutex > lk( mtx_wg_);
- for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns, i);
- }
-
- explicit pool_base(
- high_watermark const& hwm,
- low_watermark const& lwm,
- posix_time::time_duration const& asleep = posix_time::microseconds( 100),
- scanns const& max_scns = scanns( 20) )
- :
- wg_(),
- mtx_wg_(),
- state_( 0),
- channel_(
- hwm,
- lwm),
- active_worker_( 0),
- idle_worker_( 0)
- {
- if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration();
- poolsize psize( thread::hardware_concurrency() );
- BOOST_ASSERT( psize > 0);
- channel_.activate();
- lock_guard< shared_mutex > lk( mtx_wg_);
- for ( std::size_t i( 0); i < psize; ++i)
- create_worker_( psize, asleep, max_scns, i);
- }
-# endif
-
- ~pool_base()
- { shutdown(); }
-
- std::size_t active()
- {
- shared_lock< shared_mutex > lk( mtx_wg_);
- return active_();
- }
-
- std::size_t idle()
- {
- shared_lock< shared_mutex > lk( mtx_wg_);
- return idle_();
- }
-
- void interrupt_all_worker()
- {
- if ( closed_() ) return;
-
- shared_lock< shared_mutex > lk( mtx_wg_);
- wg_.interrupt_all();
- }
-
- void shutdown()
- {
- if ( closed_() || close_() ) return;
-
- channel_.deactivate();
- shared_lock< shared_mutex > lk( mtx_wg_);
- wg_.signal_shutdown_all();
- wg_.join_all();
- }
-
- const void shutdown_now()
- {
- if ( closed_() || close_() ) return;
-
- channel_.deactivate_now();
- shared_lock< shared_mutex > lk( mtx_wg_);
- wg_.signal_shutdown_now_all();
- wg_.interrupt_all();
- wg_.join_all();
- }
-
- std::size_t size()
- {
- shared_lock< shared_mutex > lk( mtx_wg_);
- return size_();
- }
-
- bool closed()
- { return closed_(); }
-
- void clear()
- { channel_.clear(); }
-
- bool empty()
- { return channel_.empty(); }
-
- std::size_t pending()
- { return channel_.size(); }
-
- std::size_t upper_bound()
- { return channel_.upper_bound(); }
-
- void upper_bound( high_watermark const& hwm)
- { channel_.upper_bound( hwm); }
-
- std::size_t lower_bound()
- { return channel_.lower_bound(); }
-
- void lower_bound( low_watermark const lwm)
- { channel_.lower_bound( lwm); }
-
- template< typename R >
- handle< R > submit( task< R > t)
- {
- if ( closed_() )
- throw task_rejected("pool is closed");
-
- shared_future< R > f( t.get_future() );
- context ctx;
- handle< R > h( ctx.get_handle( f) );
- channel_.put(
- ctx.get_callable( boost::move( t) ) );
- return h;
- }
-
- template< typename R, typename Attr >
- handle< R > submit( task< R > t, Attr const& attr)
- {
- if ( closed_() )
- throw task_rejected("pool is closed");
-
- shared_future< R > f( t.get_future() );
- context ctx;
- handle< R > h( ctx.get_handle( f) );
- channel_.put(
- channel_item(
- ctx.get_callable( boost::move( t) ),
- attr) );
- return h;
- }
- };
-
- shared_ptr< pool_base > pool_;
+ shared_ptr< detail::pool_base< Channel > > pool_;
static_pool( static_pool &);
static_pool & operator=( static_pool &);
@@ -357,7 +56,7 @@
poolsize const& psize,
posix_time::time_duration const& asleep = posix_time::microseconds( 10),
scanns const& max_scns = scanns( 20) )
- : pool_( new pool_base( psize, asleep, max_scns) )
+ : pool_( new detail::pool_base< Channel >( psize, asleep, max_scns) )
{}
explicit static_pool(
@@ -366,7 +65,7 @@
low_watermark const& lwm,
posix_time::time_duration const& asleep = posix_time::microseconds( 100),
scanns const& max_scns = scanns( 20) )
- : pool_( new pool_base( psize, hwm, lwm, asleep, max_scns) )
+ : pool_( new detail::pool_base< Channel >( psize, hwm, lwm, asleep, max_scns) )
{}
# if defined(BOOST_HAS_PROCESSOR_BINDINGS)
@@ -374,7 +73,7 @@
tag_bind_to_processors,
posix_time::time_duration const& asleep = posix_time::microseconds( 10),
scanns const& max_scns = scanns( 20) )
- : pool_( new pool_base( asleep, max_scns) )
+ : pool_( new detail::pool_base< Channel >( asleep, max_scns) )
{}
explicit static_pool(
@@ -383,7 +82,7 @@
low_watermark const& lwm,
posix_time::time_duration const& asleep = posix_time::microseconds( 100),
scanns const& max_scns = scanns( 20) )
- : pool_( new pool_base( hwm, lwm, asleep, max_scns) )
+ : pool_( new detail::pool_base< Channel >( hwm, lwm, asleep, max_scns) )
{}
static tag_bind_to_processors bind_to_processors()
@@ -540,7 +239,7 @@
return pool_->submit( boost::move( t), attr);
}
- typedef typename shared_ptr< pool_base >::unspecified_bool_type unspecified_bool_type;
+ typedef typename shared_ptr< detail::pool_base< Channel > >::unspecified_bool_type unspecified_bool_type;
operator unspecified_bool_type() const // throw()
{ return pool_; }
@@ -566,6 +265,7 @@
task::static_pool< Channel > move( boost::detail::thread_move_t< task::static_pool< Channel > > t)
{ return task::static_pool< Channel >( t); }
# endif
+
}
#include <boost/config/abi_suffix.hpp>
Modified: sandbox/task/libs/task/doc/overview.qbk
==============================================================================
--- sandbox/task/libs/task/doc/overview.qbk (original)
+++ sandbox/task/libs/task/doc/overview.qbk 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -133,6 +133,8 @@
[note __boost_task__ uses __boost_future__ from Anthony Williams (will be integrated in some of the next releases of __boost_thread__).]
+[warning This library is NOT an official Boost library]
+
[note Please note that __boost_task__ is not optimized yet.]
Modified: sandbox/task/libs/task/doc/todo.qbk
==============================================================================
--- sandbox/task/libs/task/doc/todo.qbk (original)
+++ sandbox/task/libs/task/doc/todo.qbk 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -39,16 +39,15 @@
[heading Interdepended task]
-* With special support of __boost_fiber__ interdepended tasks (using communication and synchronisation abstractions above) work in
-__thread_pools__ without deadlocking the pool.
+* integration of __boost_fiber__ - interdepended tasks (using communication and synchronisation abstractions above) work in
+__thread_pools__ without deadlocking the pool
[heading Optimizations]
-* two-lock-queue as global queue in __thread_pool__
-
-* maybe lock-free-queue as global queue too (how to provide the scheduling policies fifo, priority, smart?)
+* finer-grainer bounded_channel and unbounded_channel using two-lock-queue
+* lock-free-queue with fifo ordering as channel
[endsect]
Modified: sandbox/task/libs/task/src/guard.cpp
==============================================================================
--- sandbox/task/libs/task/src/guard.cpp (original)
+++ sandbox/task/libs/task/src/guard.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,10 +10,10 @@
#include <boost/task/detail/atomic.hpp>
-namespace boost { namespace task
-{
+namespace boost { namespace task {
namespace detail
{
+
guard::guard( volatile uint32_t & active_worker)
: active_worker_( active_worker)
{
@@ -26,5 +26,5 @@
atomic_fetch_sub( & active_worker_, 1);
BOOST_ASSERT( active_worker_ >= 0);
}
-} } }
+}}}
Modified: sandbox/task/libs/task/src/interrupter.cpp
==============================================================================
--- sandbox/task/libs/task/src/interrupter.cpp (original)
+++ sandbox/task/libs/task/src/interrupter.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -8,8 +8,7 @@
#include <boost/assert.hpp>
-namespace boost { namespace task
-{
+namespace boost { namespace task {
namespace detail
{
Modified: sandbox/task/libs/task/src/poolsize.cpp
==============================================================================
--- sandbox/task/libs/task/src/poolsize.cpp (original)
+++ sandbox/task/libs/task/src/poolsize.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,11 +10,13 @@
namespace boost { namespace task
{
+
poolsize::poolsize( std::size_t value)
: value_( value)
{ if ( value <= 0) throw invalid_poolsize(); }
poolsize::operator std::size_t () const
{ return value_; }
+
} }
Modified: sandbox/task/libs/task/src/scanns.cpp
==============================================================================
--- sandbox/task/libs/task/src/scanns.cpp (original)
+++ sandbox/task/libs/task/src/scanns.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,11 +10,12 @@
namespace boost { namespace task
{
+
scanns::scanns( std::size_t value)
: value_( value)
{ if ( value < 0) throw invalid_scanns(); }
scanns::operator std::size_t () const
{ return value_; }
-} }
+}}
Modified: sandbox/task/libs/task/src/semaphore_posix.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_posix.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_posix.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -15,6 +15,7 @@
namespace boost { namespace task
{
+
semaphore::semaphore( int value)
: handle_()
{
@@ -81,4 +82,5 @@
throw system::system_error( errno, system::system_category);
return value;
}
+
}}
Modified: sandbox/task/libs/task/src/semaphore_windows.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_windows.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_windows.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -16,6 +16,7 @@
namespace boost { namespace task
{
+
semaphore::semaphore( int value)
: handle_()
{
@@ -77,4 +78,5 @@
}
return value;
}
+
}}
Modified: sandbox/task/libs/task/src/watermark.cpp
==============================================================================
--- sandbox/task/libs/task/src/watermark.cpp (original)
+++ sandbox/task/libs/task/src/watermark.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,6 +10,7 @@
namespace boost { namespace task
{
+
high_watermark::high_watermark( std::size_t value)
: value_( value)
{
@@ -29,4 +30,5 @@
low_watermark::operator std::size_t () const
{ return value_; }
-} }
+
+}}
Modified: sandbox/task/libs/task/src/worker.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker.cpp (original)
+++ sandbox/task/libs/task/src/worker.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -6,10 +6,10 @@
#include "boost/task/detail/worker.hpp"
-namespace boost { namespace task
-{
+namespace boost { namespace task {
namespace detail
{
+
thread_specific_ptr< worker > worker::tss_;
const thread::id
@@ -51,6 +51,7 @@
void
worker::run()
{
+ // FIXME: ugly
worker::tss_.reset( new worker( * this) );
impl_->run();
}
@@ -58,5 +59,5 @@
worker *
worker::tss_get()
{ return worker::tss_.get(); }
-} } }
+}}}
Modified: sandbox/task/libs/task/src/worker_group.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker_group.cpp (original)
+++ sandbox/task/libs/task/src/worker_group.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -9,10 +9,10 @@
#include <boost/foreach.hpp>
#include <boost/utility.hpp>
-namespace boost { namespace task
-{
+namespace boost { namespace task {
namespace detail
{
+
worker_group::worker_group()
:
cont_(),
@@ -99,5 +99,5 @@
BOOST_FOREACH( worker w, cont_)
{ w.signal_shutdown_now(); }
}
-} } }
+}}}
Modified: sandbox/task/libs/task/src/wsq.cpp
==============================================================================
--- sandbox/task/libs/task/src/wsq.cpp (original)
+++ sandbox/task/libs/task/src/wsq.cpp 2009-07-11 15:57:39 EDT (Sat, 11 Jul 2009)
@@ -10,10 +10,10 @@
#include <boost/task/detail/atomic.hpp>
-namespace boost { namespace task
-{
+namespace boost { namespace task {
namespace detail
{
+
wsq::wsq()
:
initial_size_( 32),
@@ -114,6 +114,5 @@
}
return false;
}
-} } }
-
+}}}
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk