Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r51476 - in sandbox/threadpool/boost/tp: . detail
From: oliver.kowalke_at_[hidden]
Date: 2009-02-27 16:04:09


Author: olli
Date: 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
New Revision: 51476
URL: http://svn.boost.org/trac/boost/changeset/51476

Log:
* rescheduling of current task until condition evaluates to true
* boost::this_task namespace introduced
* boost::this_task::reschedule_until() in order to synchronize with other sources

Added:
   sandbox/threadpool/boost/tp/detail/pool_base.hpp (contents, props changed)
   sandbox/threadpool/boost/tp/detail/worker_base.hpp (contents, props changed)
Text files modified:
   sandbox/threadpool/boost/tp/pool.hpp | 117 +++++++++++++++++++++++++--------------
   1 files changed, 74 insertions(+), 43 deletions(-)

Added: sandbox/threadpool/boost/tp/detail/pool_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/pool_base.hpp 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -0,0 +1,27 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_POOL_BASE_H
+#define BOOST_TP_DETAIL_POOL_BASE_H
+
+#include <boost/thread/tss.hpp>
+
+#include <boost/tp/detail/worker_base.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+struct pool_base
+{
+ static thread_specific_ptr< worker_base > tss_worker_;
+};
+
+thread_specific_ptr< worker_base >
+pool_base::tss_worker_;
+
+} } }
+
+#endif // BOOST_TP_DETAIL_POOL_BASE_H
+

Added: sandbox/threadpool/boost/tp/detail/worker_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/worker_base.hpp 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -0,0 +1,67 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_WORKER_BASE_H
+#define BOOST_TP_DETAIL_WORKER_BASE_H
+
+#include <cstddef>
+
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread.hpp>
+
+#include <boost/tp/detail/callable.hpp>
+#include <boost/tp/detail/interrupter.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+struct worker_base
+{
+ virtual ~worker_base() {}
+
+ virtual const shared_ptr< thread > thrd() const = 0;
+
+ virtual const thread::id get_id() const = 0;
+
+ virtual void join() const = 0;
+
+ virtual void interrupt() const = 0;
+
+ virtual void put(
+ callable const& ca,
+ interrupter const& intr) = 0;
+
+ virtual bool try_take(
+ callable & ca,
+ interrupter & intr) = 0;
+
+ virtual bool try_steal(
+ callable & ca,
+ interrupter & intr) = 0;
+
+ virtual bool empty() const = 0;
+
+ virtual void signal_shutdown() = 0;
+
+ virtual void signal_shutdown_now() = 0;
+
+ virtual bool shutdown() = 0;
+
+ virtual bool shutdown_now() = 0;
+
+ virtual std::size_t scanns() const = 0;
+
+ virtual void increment_scanns() = 0;
+
+ virtual void reset_scanns() = 0;
+
+ virtual void reschedule_until( function< bool() > const&) = 0;
+};
+
+}}}
+
+#endif // BOOST_TP_DETAIL_WORKER_BASE_H
+

Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-02-27 16:04:09 EST (Fri, 27 Feb 2009)
@@ -40,6 +40,8 @@
 #ifdef BOOST_BIND_WORKER_TO_PROCESSORS
 #include <boost/tp/detail/bind_processor.hpp>
 #endif
+#include <boost/tp/detail/pool_base.hpp>
+#include <boost/tp/detail/worker_base.hpp>
 #include <boost/tp/detail/wsq.hpp>
 #include <boost/tp/exceptions.hpp>
 #include <boost/tp/poolsize.hpp>
@@ -61,8 +63,8 @@
 namespace tp
 {
 template< typename Channel >
-class pool
-: private noncopyable
+class pool : private noncopyable,
+ private detail::pool_base
 {
 private:
         typedef Channel channel;
@@ -75,7 +77,7 @@
                 terminated_state
         };
 
- class worker
+ class worker : public detail::worker_base
         {
         private:
                 class wimpl : private noncopyable
@@ -83,6 +85,7 @@
                 private:
                         typedef std::pair< detail::callable, detail::interrupter > item;
 
+ pool * pool_ptr_;
                         shared_ptr< thread > thrd_;
                         detail::wsq< item > wsq_;
                         interprocess::interprocess_semaphore shtdwn_sem_;
@@ -91,8 +94,11 @@
                         std::size_t scns_;
 
                 public:
- wimpl( function< void() > const& fn)
+ wimpl(
+ pool * pool_ptr,
+ function< void() > const& fn)
                         :
+ pool_ptr_( pool_ptr),
                         thrd_( new thread( fn) ),
                         wsq_(),
                         shtdwn_sem_( 0),
@@ -176,13 +182,18 @@
         
                         void reset_scanns()
                         { scns_ = 0; }
+
+ void reschedule_until( function< bool() > const& cond)
+ { pool_ptr_->reschedule_until_( cond); }
                 };
 
                 shared_ptr< wimpl > impl_;
 
         public:
- worker( function< void() > const& fn)
- : impl_( new wimpl( fn) )
+ worker(
+ pool * pool_ptr,
+ function< void() > const& fn)
+ : impl_( new wimpl( pool_ptr, fn) )
                 {}
 
                 const shared_ptr< thread > thrd() const
@@ -235,6 +246,9 @@
 
                 void reset_scanns()
                 { impl_->reset_scanns(); }
+
+ void reschedule_until( function< bool() > const& cond)
+ { impl_->reschedule_until( cond); }
         };
 
         struct id_idx_tag {};
@@ -279,10 +293,10 @@
                 { return die_(); }
         };
 
+ static thread_specific_ptr< random_idx > tss_rnd_idx_;
+
         worker_list worker_;
         shared_mutex mtx_worker_;
- thread_specific_ptr< worker > tss_worker_;
- thread_specific_ptr< random_idx > tss_rnd_idx_;
         state state_;
         shared_mutex mtx_state_;
         channel channel_;
@@ -293,7 +307,6 @@
         volatile uint32_t running_worker_;
 
         void execute_(
- worker & w,
                 detail::callable & ca,
                 detail::interrupter & intr,
                 shared_ptr< thread > const& thrd)
@@ -311,10 +324,10 @@
                 BOOST_ASSERT( ca.empty() );
         }
 
- void next_callable_( worker & w, detail::callable & ca, detail::interrupter & intr)
+ void next_callable_( detail::worker_base * w, detail::callable & ca, detail::interrupter & intr)
         {
                 rnd_idx & ridx( worker_.get< rnd_idx_tag >() );
- if ( ! w.try_take( ca, intr) )
+ if ( ! w->try_take( ca, intr) )
                 {
                         if ( ! channel_.try_take( ca, intr) )
                         {
@@ -331,14 +344,14 @@
                                 {
                                         detail::guard grd( idle_worker_);
                                         if ( shutdown_( w) ) return;
- w.increment_scanns();
- if ( w.scanns() >= scns_)
+ w->increment_scanns();
+ if ( w->scanns() >= scns_)
                                         {
                                                 if ( size_() == idle_worker_)
                                                         channel_.take( ca, intr, asleep_);
                                                 else
                                                         this_thread::sleep( asleep_);
- w.reset_scanns();
+ w->reset_scanns();
                                         }
                                         else
                                                 this_thread::yield();
@@ -347,21 +360,20 @@
                 }
         }
 
- template< typename R >
- void re_schedule_until_( jss::shared_future< R > const& f)
+ void reschedule_until_( function< bool() > const& cond)
         {
- worker * w( tss_worker_.get() );
+ detail::worker_base * w( pool_base::tss_worker_.get() );
                 BOOST_ASSERT( w);
                 shared_ptr< thread > thrd( w->thrd() );
                 BOOST_ASSERT( thrd);
                 detail::interrupter intr;
                 detail::callable ca;
- while ( ! f.is_ready() )
+ while ( ! cond() )
                 {
- next_callable_( * w, ca, intr);
+ next_callable_( w, ca, intr);
                         if( ! ca.empty() )
                         {
- execute_( *w, ca, intr, thrd);
+ execute_( ca, intr, thrd);
                                 w->reset_scanns();
                         }
                 }
@@ -377,22 +389,22 @@
 
                 worker w( * i);
                 BOOST_ASSERT( w.get_id() == this_thread::get_id() );
- tss_worker_.reset( new worker( w) );
+ pool_base::tss_worker_.reset( new worker( w) );
                 shared_ptr< thread > thrd( w.thrd() );
                 BOOST_ASSERT( thrd);
                 detail::callable ca;
                 detail::interrupter intr;
 
- tss_rnd_idx_.reset( new random_idx( worker_) );
+ pool::tss_rnd_idx_.reset( new random_idx( worker_) );
 
                 detail::guard grd( running_worker_);
 
- while ( ! shutdown_( w) )
+ while ( ! shutdown_( & w) )
                 {
- next_callable_( w, ca, intr);
+ next_callable_( & w, ca, intr);
                         if( ! ca.empty() )
                         {
- execute_( w, ca, intr, thrd);
+ execute_( ca, intr, thrd);
                                 w.reset_scanns();
                         }
                 }
@@ -403,6 +415,7 @@
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
                 worker_.insert(
                         worker(
+ this,
                                 boost::bind(
                                         & pool::entry_,
                                         this) ) );
@@ -419,6 +432,7 @@
         {
                 BOOST_ASSERT( ! terminateing_() && ! terminated_() );
                 worker w(
+ this,
                                 boost::bind(
                                         & pool::entry_,
                                         this,
@@ -443,11 +457,11 @@
         bool terminateing_() const
         { return state_ == terminateing_state; }
 
- bool shutdown_( worker & w)
+ bool shutdown_( detail::worker_base * w)
         {
- if ( w.shutdown() && channel_.empty() )
+ if ( w->shutdown() && channel_.empty() )
                         return true;
- else if ( w.shutdown_now() )
+ else if ( w->shutdown_now() )
                         return true;
                 return false;
         }
@@ -460,8 +474,6 @@
         :
         worker_(),
         mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(),
@@ -489,8 +501,6 @@
         :
         worker_(),
         mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(
@@ -518,8 +528,6 @@
         :
         worker_(),
         mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(),
@@ -548,8 +556,6 @@
         :
         worker_(),
         mtx_worker_(),
- tss_worker_(),
- tss_rnd_idx_(),
         state_( active_state),
         mtx_state_(),
         channel_(
@@ -679,14 +685,18 @@
                 detail::interrupter intr;
                 jss::packaged_task< R > tsk( act);
                 jss::shared_future< R > fut( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker_base * w( pool_base::tss_worker_.get() );
                 if ( w)
                 {
+ function< bool() > cond(
+ bind(
+ & jss::shared_future< R >::is_ready,
+ fut) );
                         tsk.set_wait_callback(
                                 bind(
- ( void ( pool< Channel >::*)( jss::shared_future< R > const&) ) & pool< Channel >::re_schedule_until_,
+ ( void ( pool< Channel >::*)( function< bool() > const&) ) & pool< Channel >::reschedule_until_,
                                         this,
- fut) );
+ cond) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( fut, intr);
                 }
@@ -716,14 +726,18 @@
                 detail::interrupter intr;
                 jss::packaged_task< R > tsk( act);
                 jss::shared_future< R > fut( tsk.get_future() );
- worker * w( tss_worker_.get() );
+ detail::worker_base * w( pool_base::tss_worker_.get() );
                 if ( w)
                 {
+ function< bool() > cond(
+ bind(
+ & jss::shared_future< R >::is_ready,
+ fut) );
                         tsk.set_wait_callback(
                                 bind(
- ( void ( pool< Channel >::*)( jss::shared_future< R > const&) ) & pool< Channel >::re_schedule_until_,
+ ( void ( pool< Channel >::*)( function< bool() > const&) ) & pool< Channel >::reschedule_until_,
                                         this,
- fut) );
+ cond) );
                         w->put( detail::callable( move( tsk) ), intr);
                         return task< R >( fut, intr);
                 }
@@ -742,7 +756,24 @@
         }
 };
 
-} }
+template< typename Channel >
+thread_specific_ptr< typename pool< Channel >::random_idx >
+pool< Channel >::tss_rnd_idx_;
+
+}
+
+namespace this_task
+{
+ inline
+ void reschedule_until( function< bool() > const& cond)
+ {
+ tp::detail::worker_base * w( tp::detail::pool_base::tss_worker_.get() );
+ BOOST_ASSERT ( w);
+ w->reschedule_until( cond);
+ }
+}
+
+}
 
 #endif // BOOST_TP_POOL_H
 


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk