Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55609 - in sandbox/task: boost boost/task boost/task/detail libs/task/examples libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-08-16 04:41:57


Author: olli
Date: 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
New Revision: 55609
URL: http://svn.boost.org/trac/boost/changeset/55609

Log:
 - fiber support enabled

Text files modified:
   sandbox/task/boost/task.hpp | 1
   sandbox/task/boost/task/detail/fiber_posix.hpp | 60 +++++++-------
   sandbox/task/boost/task/detail/fiber_windows.hpp | 73 ++++++++---------
   sandbox/task/boost/task/detail/worker.hpp | 161 ++++++++++++++++-----------------------
   sandbox/task/boost/task/task.hpp | 4
   sandbox/task/boost/task/utility.hpp | 15 --
   sandbox/task/libs/task/examples/Jamfile.v2 | 13 ++
   sandbox/task/libs/task/src/semaphore_posix.cpp | 5
   sandbox/task/libs/task/src/semaphore_windows.cpp | 5
   sandbox/task/libs/task/src/worker.cpp | 4
   10 files changed, 155 insertions(+), 186 deletions(-)

Modified: sandbox/task/boost/task.hpp
==============================================================================
--- sandbox/task/boost/task.hpp (original)
+++ sandbox/task/boost/task.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -27,6 +27,7 @@
 #include <boost/task/static_pool.hpp>
 #include <boost/task/smart.hpp>
 #include <boost/task/task.hpp>
+#include <boost/task/unbounded_buffer.hpp>
 #include <boost/task/unbounded_channel.hpp>
 #include <boost/task/utility.hpp>
 #include <boost/task/watermark.hpp>

Modified: sandbox/task/boost/task/detail/fiber_posix.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_posix.hpp (original)
+++ sandbox/task/boost/task/detail/fiber_posix.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -18,6 +18,7 @@
 #include <boost/assert.hpp>
 #include <boost/function.hpp>
 #include <boost/shared_array.hpp>
+#include <boost/shared_ptr.hpp>
 #include <boost/system/system_error.hpp>
 
 namespace boost { namespace task {
@@ -54,6 +55,18 @@
         shared_array< char > stack_;
         st_state state_;
 
+ fiber(
+ function< void() > fn,
+ std::size_t stack_size)
+ :
+ fn_( fn),
+ stack_size_( stack_size),
+ caller_(),
+ callee_(),
+ stack_( new char[stack_size]),
+ state_( st_uninitialized)
+ { BOOST_ASSERT( stack_size_ > 0); }
+
         bool uninitialized_() const
         { return state_ == st_uninitialized; }
 
@@ -66,7 +79,7 @@
         bool exited_() const
         { return state_ == st_exited; }
 
- void yield_()
+ void exit_()
         {
                 if ( ::swapcontext( & callee_, & caller_) == -1)
                         throw system::system_error(
@@ -75,11 +88,10 @@
                                         system::system_category) );
         }
 
- void yield_to_( fiber & to)
+ void switch_to_( fiber & to)
         {
                 std::swap( caller_, to.caller_);
                 std::swap( state_, to.state_);
-
                 if ( ::swapcontext( & callee_, & to.callee_) == -1)
                         throw system::system_error(
                                 system::error_code(
@@ -128,22 +140,15 @@
         }
 
 public:
- static void convert_thread_to_fiber() {}
+ typedef shared_ptr< fiber > sptr_t;
 
- fiber(
+ static void convert_thread_to_fiber()
+ {}
+
+ static sptr_t create(
                 function< void() > fn,
                 std::size_t stack_size)
- :
- fn_( fn),
- stack_size_( stack_size),
- caller_(),
- callee_(),
- stack_( new char[stack_size]),
- state_( st_uninitialized)
- {
- BOOST_ASSERT( ! fn_.empty() );
- BOOST_ASSERT( stack_size_ > 0);
- }
+ { return sptr_t( new fiber( fn, stack_size) ); }
 
         ~fiber()
         { BOOST_ASSERT( ! running_() ); }
@@ -151,25 +156,18 @@
         bool ready() const
         { return uninitialized_() || ready_(); }
 
- bool running() const
+ bool running() const
         { return running_(); }
 
- bool exited() const
+ bool exited() const
         { return exited_(); }
 
- void yield()
- {
- BOOST_ASSERT( running_() );
- state_ = st_ready;
- yield_();
- BOOST_ASSERT( running_() );
- }
-
- void yield_to( fiber & to)
+ void switch_to( sptr_t & to)
         {
+ BOOST_ASSERT( to);
                 BOOST_ASSERT( running_() );
- if ( to.uninitialized_() ) to.init_();
- yield_to_( to);
+ if ( to->uninitialized_() ) to->init_();
+ switch_to_( * to);
                 BOOST_ASSERT( running_() );
         }
 
@@ -187,11 +185,11 @@
         {
                 BOOST_ASSERT( running_() ) ;
                 state_ = st_exited;
- yield_();
+ exit_();
                 BOOST_ASSERT(!"should never be reached");
         }
 };
-}}}
+} } }
 
 #endif // BOOST_TASK_DETAIL_FIBER_POSIX_H
 

Modified: sandbox/task/boost/task/detail/fiber_windows.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_windows.hpp (original)
+++ sandbox/task/boost/task/detail/fiber_windows.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -17,9 +17,10 @@
 
 #include <boost/assert.hpp>
 #include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
 #include <boost/system/system_error.hpp>
 
-namespace boost { namespace task {
+namespace boost { namespace fibers {
 namespace detail
 {
 template< typename Fiber >
@@ -47,11 +48,22 @@
         friend
         VOID CALLBACK trampoline( LPVOID);
 
- function< void() > fn_;
- std::size_t stack_size_;
- LPVOID caller_;
- LPVOID callee_;
- st_state state_;
+ function< void( context< fiber > &) > fn_;
+ std::size_t stack_size_;
+ LPVOID caller_;
+ LPVOID callee_;
+ st_state state_;
+
+ fiber(
+ function< void() > fn,
+ std::size_t stack_size)
+ :
+ fn_( fn),
+ stack_size_( stack_size),
+ caller_( 0),
+ callee_( 0),
+ state_( st_uninitialized)
+ { BOOST_ASSERT( stack_size_ > 0); }
 
         bool uninitialized_() const
         { return state_ == st_uninitialized; }
@@ -65,14 +77,13 @@
         bool exited_() const
         { return state_ == st_exited; }
 
- void yield_()
+ void exit_()
         { ::SwitchToFiber( caller_); }
 
- void yield_to_( fiber & to)
+ void switch_to_( fiber & to)
         {
                 std::swap( caller_, to.caller_);
                 std::swap( state_, to.state_);
-
                 ::SwitchToFiber( to.callee_);
         }
 
@@ -103,29 +114,21 @@
         }
 
 public:
+ typedef shared_ptr< fiber > sptr_t;
+
         static void convert_thread_to_fiber()
         {
- if ( ! ::ConvertThreadToFiber( 0) )
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
-
+ if ( ! ::ConvertThreadToFiber( 0) )
+ throw system::system_error(
+ system::error_code(
+ ::GetLastError(),
+ system::system_category) );
         }
 
- fiber(
+ static sptr_t create(
                 function< void() > fn,
                 std::size_t stack_size)
- :
- fn_( fn),
- stack_size_( stack_size),
- caller_( 0),
- callee_( 0),
- state_( st_uninitialized)
- {
- BOOST_ASSERT( ! fn_.empty() );
- BOOST_ASSERT( stack_size_ > 0);
- }
+ { return sptr_t( new fiber( fn, stack_size) ); }
 
         ~fiber()
         {
@@ -142,19 +145,11 @@
     bool exited() const
         { return exited_(); }
 
- void yield()
- {
- BOOST_ASSERT( running_() );
- state_ = st_ready;
- yield_();
- BOOST_ASSERT( running_() );
- }
-
- void yield_to( fiber & to)
+ void switch_to( sptr_t & to)
         {
                 BOOST_ASSERT( running_() );
- if ( to.uninitialized_() ) to.init_();
- yield_to_( to);
+ if ( to->uninitialized_() ) to->init_();
+ switch_to_( * to);
                 BOOST_ASSERT( running_() );
         }
 
@@ -172,11 +167,11 @@
         {
                 BOOST_ASSERT( running_() || ready_() ) ;
                 state_ = st_exited;
- yield_();
+ exit_();
                 BOOST_ASSERT(!"should never be reached");
         }
 };
-}}}
+} } }
 
 #endif // BOOST_TASK_DETAIL_FIBER_WINDOWS_H
 

Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp (original)
+++ sandbox/task/boost/task/detail/worker.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -9,6 +9,7 @@
 
 #include <cstddef>
 #include <list>
+#include <set>
 #include <utility>
 
 #include <boost/assert.hpp>
@@ -62,7 +63,7 @@
 
         virtual void reschedule_until( function< bool() > const&) = 0;
 
- virtual bool block() = 0;
+ virtual void block() = 0;
 };
 
 template<
@@ -92,15 +93,14 @@
                 { return die_(); }
         };
 
- typedef shared_ptr< fiber > fiber_t;
         typedef shared_ptr< thread > thread_t;
 
         Pool & pool_;
         thread_t thrd_;
- fiber_t fib_;
+ fiber::sptr_t fib_;
         wsq wsq_;
- std::list< fiber_t > blocked_fibers_;
- std::list< fiber_t > runnable_fiber_lst;
+ std::list< fiber::sptr_t > blocked_fibers_;
+ std::list< fiber::sptr_t > runnable_fibers_;
         semaphore shtdwn_sem_;
         semaphore shtdwn_now_sem_;
         bool shtdwn_;
@@ -124,7 +124,16 @@
 
         void try_blocked_fibers_()
         {
- td::list< fiber_t >
+ if ( ! blocked_fibers_.empty() )
+ {
+ fiber::sptr_t this_fib = fib_;
+ runnable_fibers_.push_back( this_fib);
+ fiber::sptr_t blocked_fib = blocked_fibers_.front();
+ blocked_fibers_.pop_front();
+ fib_ = blocked_fib;
+ this_fib->switch_to( blocked_fib);
+ fib_ = this_fib;
+ }
         }
 
         bool take_global_callable_(
@@ -152,49 +161,52 @@
                 return false;
         }
 
- void run_()
+ void process_( bool all)
         {
                 callable ca;
- while ( ! shutdown_() )
+ if ( all ? try_take_local_callable_( ca) ||
+ try_steal_other_callable_( ca) ||
+ try_take_global_callable_( ca)
+ : try_take_local_callable_( ca) )
                 {
- try_runnable_fibers_();
- if ( try_take_local_callable_( ca) ||
- try_take_global_callable_( ca) ||
- try_steal_other_callable_( ca) )
- {
- execute_( ca);
- scns_ = 0;
- }
- else
+ execute_( ca);
+ scns_ = 0;
+ }
+ else
+ {
+ guard grd( pool_.idle_worker_);
+ ++scns_;
+ if ( scns_ >= max_scns_)
                         {
- guard grd( pool_.idle_worker_);
- if ( shutdown_() ) return;
- ++scns_;
- if ( scns_ >= max_scns_)
+ if ( pool_.size_() > pool_.idle_worker_)
+ {
+ if ( take_global_callable_( ca, asleep_) )
+ execute_( ca);
+ }
+ else if ( blocked_fibers_.empty() )
                                 {
- // should the comparation be atomic or
- // at least the read of idle_worker_ be atomic ?
- if ( pool_.size_() == pool_.idle_worker_)
- {
- if ( take_global_callable_( ca, asleep_) )
- execute_( ca);
- }
- else
- try
- { this_thread::sleep( asleep_); }
- catch ( thread_interrupted const&)
- { return; }
- scns_ = 0;
+ try
+ { this_thread::sleep( asleep_); }
+ catch ( thread_interrupted const&)
+ { return; }
                                 }
- else
- this_thread::yield();
+ scns_ = 0;
                         }
+ else
+ this_thread::yield();
                 }
+ try_blocked_fibers_();
+ }
+
+ void run_()
+ {
+ while ( ! shutdown_() )
+ process_( true);
         }
 
         bool shutdown_()
         {
- if ( shutdown__() && get_pool().channel_.empty() )
+ if ( shutdown__() && pool_.channel_.empty() && blocked_fibers_.empty() )
                         return true;
                 else if ( shutdown_now__() )
                         return true;
@@ -225,7 +237,7 @@
         fib_(),
         wsq_(),
         blocked_fibers_(),
- runnable_fiber_lst(),
+ runnable_fibers_(),
         shtdwn_sem_( 0),
         shtdwn_now_sem_( 0),
         shtdwn_( false),
@@ -261,76 +273,47 @@
         bool try_steal( callable & ca)
         { return wsq_.try_steal( ca); }
 
- Pool & get_pool() const
- { return pool_; }
-
         void run()
         {
                 BOOST_ASSERT( get_id() == this_thread::get_id() );
 
                 fiber::convert_thread_to_fiber();
 
- fiber_t fib(
- new fiber(
- bind(
- & worker_object::run_,
- this),
+ fiber::sptr_t fib(
+ fiber::create(
+ bind( & worker_object::run_, this),
                                 stack_size_) );
                 fib_.swap( fib);
                 fib_->run();
- BOOST_ASSERT( fib_->exited() );
                 fib_.reset();
         }
 
         void reschedule_until( function< bool() > const& pred)
         {
- callable ca;
- while ( ! pred() /* && ! shutdown_() */)
- {
- if ( try_take_local_callable_( ca) )
- {
- execute_( ca);
- scns_ = 0;
- }
- else
- {
- guard grd( get_pool().idle_worker_);
- if ( shutdown_() ) return;
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- this_thread::sleep( asleep_);
- scns_ = 0;
- }
- else
- this_thread::yield();
- }
- }
+ while ( ! pred() )
+ process_( false);
         }
 
- bool block()
+ void block()
         {
- blocked_fibers_.push_back( fib_);
- fiber_t fib;
+ fiber::sptr_t this_fib = fib_;
+ blocked_fibers_.push_back( this_fib);
+ fiber::sptr_t runnable_fib;
                 if ( runnable_fibers_.empty() )
                 {
- fib.reset(
- new fiber(
- bind(
- & worker_object::run_,
- this),
- stack_size_) );
+ runnable_fib = fiber::create(
+ bind( & worker_object::run_, this),
+ stack_size_);
                 }
                 else
                 {
- fib = runnable_fibers_.front();
+ runnable_fib = runnable_fibers_.front();
                         runnable_fibers_.pop_front();
                 }
- fib_.swap( fib);
- fib_->run();
- runnable_fibers_.push_back( fib);
-
- return ! shutdown_();
+ BOOST_ASSERT( runnable_fib);
+ fib_ = runnable_fib;
+ this_fib->switch_to( runnable_fib);
+ fib_ = this_fib;
         }
 };
 
@@ -371,17 +354,9 @@
         void put( callable const&);
         bool try_steal( callable &);
 
- template< typename Pool >
- Pool & get_pool() const
- {
- worker_object< Pool, worker > * p( dynamic_cast< worker_object< Pool, worker > * >( impl_.get() ) );
- BOOST_ASSERT( p);
- return p->get_pool();
- }
-
         void run();
         void reschedule_until( function< bool() > const&);
- bool block();
+ void block();
 
         static worker * tss_get();
 };

Modified: sandbox/task/boost/task/task.hpp
==============================================================================
--- sandbox/task/boost/task/task.hpp (original)
+++ sandbox/task/boost/task/task.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -70,6 +70,8 @@
                 { throw task_already_executed(); }
                 catch ( thread_interrupted const&)
                 { this->prom_.set_exception( copy_exception( task_interrupted() ) ); }
+ catch ( task_interrupted const& e)
+ { this->prom_.set_exception( copy_exception( e) ); }
                 catch ( boost::exception const& e)
                 { this->prom_.set_exception( copy_exception( e) ); }
                 catch ( std::ios_base::failure const& e)
@@ -127,6 +129,8 @@
                 { throw task_already_executed(); }
                 catch ( thread_interrupted const&)
                 { this->prom_.set_exception( copy_exception( task_interrupted() ) ); }
+ catch ( task_interrupted const& e)
+ { this->prom_.set_exception( copy_exception( e) ); }
                 catch ( boost::exception const& e)
                 { this->prom_.set_exception( copy_exception( e) ); }
                 catch ( std::ios_base::failure const& e)

Modified: sandbox/task/boost/task/utility.hpp
==============================================================================
--- sandbox/task/boost/task/utility.hpp (original)
+++ sandbox/task/boost/task/utility.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -9,7 +9,6 @@
 
 #include <boost/assert.hpp>
 #include <boost/thread.hpp>
-#include <boost/thread/thread_time.hpp>
 
 #include <boost/task/detail/worker.hpp>
 
@@ -17,12 +16,12 @@
 
 namespace boost { namespace this_task
 {
-template< typename Pool >
-Pool & get_pool()
+inline
+void block()
 {
         task::detail::worker * w( task::detail::worker::tss_get() );
         BOOST_ASSERT( w);
- return w->get_pool< Pool >();
+ w->block();
 }
 
 inline
@@ -30,14 +29,6 @@
 { return task::detail::worker::tss_get() != 0; }
 
 inline
-bool block()
-{
- task::detail::worker * w( task::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- return w->block();
-}
-
-inline
 thread::id worker_id()
 {
         task::detail::worker * w( task::detail::worker::tss_get() );

Modified: sandbox/task/libs/task/examples/Jamfile.v2
==============================================================================
--- sandbox/task/libs/task/examples/Jamfile.v2 (original)
+++ sandbox/task/libs/task/examples/Jamfile.v2 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -25,10 +25,21 @@
     ;
 
 exe bind_to_processors : bind_to_processors.cpp ;
+exe buffer_multi : buffer_multi.cpp ;
+exe buffer_multi2 : buffer_multi2.cpp ;
+exe buffer_pool : buffer_pool.cpp ;
+exe buffer_pool_thread : buffer_pool_thread.cpp ;
+exe buffer_thread : buffer_thread.cpp ;
 exe fork_join : fork_join.cpp ;
 exe interrupt : interrupt.cpp ;
+exe no_deadlock_pool : no_deadlock_pool.cpp ;
+exe no_deadlock_pool2 : no_deadlock_pool2.cpp ;
+exe no_deadlock_pool3 : no_deadlock_pool3.cpp ;
 exe pending : pending.cpp ;
 exe priority : priority.cpp ;
-exe shutdonw_now : shutdown_now.cpp ;
+exe semaphore_thread : semaphore_thread.cpp ;
+exe semaphore_pool : semaphore_pool.cpp ;
+exe semaphore_pool_thread : semaphore_pool_thread.cpp ;
+exe shutdown_now : shutdown_now.cpp ;
 exe smart : smart.cpp ;
 exe submit : submit.cpp ;

Modified: sandbox/task/libs/task/src/semaphore_posix.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_posix.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_posix.cpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -41,10 +41,7 @@
         if ( this_task::runs_in_pool() )
         {
                 while ( ! try_wait() )
- {
- if ( ! this_task::block() )
- throw task_interrupted();
- }
+ this_task::block();
         }
         else
         {

Modified: sandbox/task/libs/task/src/semaphore_windows.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_windows.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_windows.cpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -41,10 +41,7 @@
         if ( this_task::runs_in_pool() )
         {
                 while ( ! try_wait() )
- {
- if ( ! this_task::block() )
- throw task_interrupted();
- }
+ this_task::block();
         }
         else
         {

Modified: sandbox/task/libs/task/src/worker.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker.cpp (original)
+++ sandbox/task/libs/task/src/worker.cpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -52,9 +52,9 @@
 worker::reschedule_until( function< bool() > const& pred)
 { impl_->reschedule_until( pred); }
 
-bool
+void
 worker::block()
-{ return impl_->block(); }
+{ impl_->block(); }
 
 worker *
 worker::tss_get()


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk