|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r51476 - in sandbox/threadpool/boost/tp: . detail
From: oliver.kowalke_at_[hidden]
Date: 2009-02-27 16:04:09
Author: olli
Date: 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
New Revision: 51476
URL: http://svn.boost.org/trac/boost/changeset/51476
Log:
* rescheduling of current task until condition evaluates to true
* boost::this_task namespace introduced
* boost::this_task::reschedule_until() in order to synchronize with other sources
Added:
sandbox/threadpool/boost/tp/detail/pool_base.hpp (contents, props changed)
sandbox/threadpool/boost/tp/detail/worker_base.hpp (contents, props changed)
Text files modified:
sandbox/threadpool/boost/tp/pool.hpp | 117 +++++++++++++++++++++++++--------------
1 files changed, 74 insertions(+), 43 deletions(-)
Added: sandbox/threadpool/boost/tp/detail/pool_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/pool_base.hpp 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -0,0 +1,27 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_POOL_BASE_H
+#define BOOST_TP_DETAIL_POOL_BASE_H
+
+#include <boost/thread/tss.hpp>
+
+#include <boost/tp/detail/worker_base.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+struct pool_base
+{
+ static thread_specific_ptr< worker_base > tss_worker_;
+};
+
+thread_specific_ptr< worker_base >
+pool_base::tss_worker_;
+
+} } }
+
+#endif // BOOST_TP_DETAIL_POOL_BASE_H
+
Added: sandbox/threadpool/boost/tp/detail/worker_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker_base.hpp 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -0,0 +1,67 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_BASE_H
+#define BOOST_TP_DETAIL_WORKER_BASE_H
+
+#include <cstddef>
+
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+#include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+struct worker_base
+{
+ virtual ~worker_base() {}
+
+ virtual const shared_ptr< thread > thrd() const = 0;
+
+ virtual const thread::id get_id() const = 0;
+
+ virtual void join() const = 0;
+
+ virtual void interrupt() const = 0;
+
+ virtual void put(
+ callable const& ca,
+ interrupter const& intr) = 0;
+
+ virtual bool try_take(
+ callable & ca,
+ interrupter & intr) = 0;
+
+ virtual bool try_steal(
+ callable & ca,
+ interrupter & intr) = 0;
+
+ virtual bool empty() const = 0;
+
+ virtual void signal_shutdown() = 0;
+
+ virtual void signal_shutdown_now() = 0;
+
+ virtual bool shutdown() = 0;
+
+ virtual bool shutdown_now() = 0;
+
+ virtual std::size_t scanns() const = 0;
+
+ virtual void increment_scanns() = 0;
+
+ virtual void reset_scanns() = 0;
+
+ virtual void reschedule_until( function< bool() > const&) = 0;
+};
+
+}}}
+
+#endif // BOOST_TP_DETAIL_WORKER_BASE_H
+
Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -40,6 +40,8 @@
#ifdef BOOST_BIND_WORKER_TO_PROCESSORS
#include <boost/tp/detail/bind_processor.hpp>
#endif
+#include <boost/tp/detail/pool_base.hpp>
+#include <boost/tp/detail/worker_base.hpp>
#include <boost/tp/detail/wsq.hpp>
#include <boost/tp/exceptions.hpp>
#include <boost/tp/poolsize.hpp>
@@ -61,8 +63,8 @@
namespace tp
{
template< typename Channel >
-class pool
-: private noncopyable
+class pool : private noncopyable,
+ private detail::pool_base
{
private:
typedef Channel channel;
@@ -75,7 +77,7 @@
terminated_state
};
- class worker
+ class worker : public detail::worker_base
{
private:
class wimpl : private noncopyable
@@ -83,6 +85,7 @@
private:
typedef std::pair< detail::callable, detail::interrupter > item;
+ pool * pool_ptr_;
shared_ptr< thread > thrd_;
detail::wsq< item > wsq_;
interprocess::interprocess_semaphore shtdwn_sem_;
@@ -91,8 +94,11 @@
std::size_t scns_;
public:
- wimpl( function< void() > const& fn)
+ wimpl(
+ pool * pool_ptr,
+ function< void() > const& fn)
:
+ pool_ptr_( pool_ptr),
thrd_( new thread( fn) ),
wsq_(),
shtdwn_sem_( 0),
@@ -176,13 +182,18 @@
void reset_scanns()
{ scns_ = 0; }
+
+ void reschedule_until( function< bool() > const& cond)
+ { pool_ptr_->reschedule_until_( cond); }
};
shared_ptr< wimpl > impl_;
public:
- worker( function< void() > const& fn)
- : impl_( new wimpl( fn) )
+ worker(
+ pool * pool_ptr,
+ function< void() > const& fn)
+ : impl_( new wimpl( pool_ptr, fn) )
{}
const shared_ptr< thread > thrd() const
@@ -235,6 +246,9 @@
void reset_scanns()
{ impl_->reset_scanns(); }
+
+ void reschedule_until( function< bool() > const& cond)
+ { impl_->reschedule_until( cond); }
};
struct id_idx_tag {};
@@ -279,10 +293,10 @@
{ return die_(); }
};
+ static thread_specific_ptr< random_idx > tss_rnd_idx_;
+
worker_list worker_;
shared_mutex mtx_worker_;
- thread_specific_ptr< worker > tss_worker_;
- thread_specific_ptr< random_idx > tss_rnd_idx_;
state state_;
shared_mutex mtx_state_;
channel channel_;
@@ -293,7 +307,6 @@
volatile uint32_t running_worker_;
void execute_(
- worker & w,
detail::callable & ca,
detail::interrupter & intr,
shared_ptr< thread > const& thrd)
@@ -311,10 +324,10 @@
BOOST_ASSERT( ca.empty() );
}
- void next_callable_( worker & w, detail::callable & ca, detail::interrupter & intr)
+ void next_callable_( detail::worker_base * w, detail::callable & ca, detail::interrupter & intr)
{
rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
- if ( ! w.try_take( ca, intr) )
+ if ( ! w->try_take( ca, intr) )
{
if ( ! channel_.try_take( ca, intr) )
{
@@ -331,14 +344,14 @@
{
detail::guard grd( idle_worker_);
if ( shutdown_( w) ) return;
- w.increment_scanns();
- if ( w.scanns() >= scns_)
+ w->increment_scanns();
+ if ( w->scanns() >= scns_)
{
if ( size_() == idle_worker_)
channel_.take( ca, intr, asleep_);
else
this_thread::sleep( asleep_);
- w.reset_scanns();
+ w->reset_scanns();
}
else
this_thread::yield();
@@ -347,21 +360,20 @@
}
}
- template< typename R >
- void re_schedule_until_( jss::shared_future< R > const& f)
+ void reschedule_until_( function< bool() > const& cond)
{
- worker * w( tss_worker_.get() );
+ detail::worker_base * w( pool_base::tss_worker_.get() );
BOOST_ASSERT( w);
shared_ptr< thread > thrd( w->thrd() );
BOOST_ASSERT( thrd);
detail::interrupter intr;
detail::callable ca;
- while ( ! f.is_ready() )
+ while ( ! cond() )
{
- next_callable_( * w, ca, intr);
+ next_callable_( w, ca, intr);
if( ! ca.empty() )
{
- execute_( *w, ca, intr, thrd);
+ execute_( ca, intr, thrd);
w->reset_scanns();
}
}
@@ -377,22 +389,22 @@
worker w( * i);
BOOST_ASSERT( w.get_id() == this_thread::get_id() );
- tss_worker_.reset( new worker( w) );
+ pool_base::tss_worker_.reset( new worker( w) );
shared_ptr< thread > thrd( w.thrd() );
BOOST_ASSERT( thrd);
detail::callable ca;
detail::interrupter intr;
- tss_rnd_idx_.reset( new random_idx( worker_) );
+ pool::tss_rnd_idx_.reset( new random_idx( worker_) );
detail::guard grd( running_worker_);
- while ( ! shutdown_( w) )
+ while ( ! shutdown_( & w) )
{
- next_callable_( w, ca, intr);
+ next_callable_( & w, ca, intr);
if( ! ca.empty() )
{
- execute_( w, ca, intr, thrd);
+ execute_( ca, intr, thrd);
w.reset_scanns();
}
}
@@ -403,6 +415,7 @@
BOOST_ASSERT( ! terminateing_() && ! terminated_() );
worker_.insert(
worker(
+ this,
boost::bind(
& pool::entry_,
this) ) );
@@ -419,6 +432,7 @@
{
BOOST_ASSERT( ! terminateing_() && ! terminated_() );
worker w(
+ this,
boost::bind(
& pool::entry_,
this,
@@ -443,11 +457,11 @@
bool terminateing_() const
{ return state_ == terminateing_state; }
- bool shutdown_( worker & w)
+ bool shutdown_( detail::worker_base * w)
{
- if ( w.shutdown() && channel_.empty() )
+ if ( w->shutdown() && channel_.empty() )
return true;
- else if ( w.shutdown_now() )
+ else if ( w->shutdown_now() )
return true;
return false;
}
@@ -460,8 +474,6 @@
:
worker_(),
mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
state_( active_state),
mtx_state_(),
channel_(),
@@ -489,8 +501,6 @@
:
worker_(),
mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
state_( active_state),
mtx_state_(),
channel_(
@@ -518,8 +528,6 @@
:
worker_(),
mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
state_( active_state),
mtx_state_(),
channel_(),
@@ -548,8 +556,6 @@
:
worker_(),
mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
state_( active_state),
mtx_state_(),
channel_(
@@ -679,14 +685,18 @@
detail::interrupter intr;
jss::packaged_task< R > tsk( act);
jss::shared_future< R > fut( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker_base * w( pool_base::tss_worker_.get() );
if ( w)
{
+ function< bool() > cond(
+ bind(
+ & jss::shared_future< R >::is_ready,
+ fut) );
tsk.set_wait_callback(
bind(
- ( void ( pool< Channel >::*)( jss::shared_future< R > const&) ) & pool< Channel >::re_schedule_until_,
+ ( void ( pool< Channel >::*)( function< bool() > const&) ) & pool< Channel >::reschedule_until_,
this,
- fut) );
+ cond) );
w->put( detail::callable( move( tsk) ), intr);
return task< R >( fut, intr);
}
@@ -716,14 +726,18 @@
detail::interrupter intr;
jss::packaged_task< R > tsk( act);
jss::shared_future< R > fut( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker_base * w( pool_base::tss_worker_.get() );
if ( w)
{
+ function< bool() > cond(
+ bind(
+ & jss::shared_future< R >::is_ready,
+ fut) );
tsk.set_wait_callback(
bind(
- ( void ( pool< Channel >::*)( jss::shared_future< R > const&) ) & pool< Channel >::re_schedule_until_,
+ ( void ( pool< Channel >::*)( function< bool() > const&) ) & pool< Channel >::reschedule_until_,
this,
- fut) );
+ cond) );
w->put( detail::callable( move( tsk) ), intr);
return task< R >( fut, intr);
}
@@ -742,7 +756,24 @@
}
};
-} }
+template< typename Channel >
+thread_specific_ptr< typename pool< Channel >::random_idx >
+pool< Channel >::tss_rnd_idx_;
+
+}
+
+namespace this_task
+{
+ inline
+ void reschedule_until( function< bool() > const& cond)
+ {
+ tp::detail::worker_base * w( tp::detail::pool_base::tss_worker_.get() );
+ BOOST_ASSERT ( w);
+ w->reschedule_until( cond);
+ }
+}
+
+}
#endif // BOOST_TP_POOL_H
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk