Boost-Commit :
Subject: [Boost-commit] svn:boost r55609 - in sandbox/task: boost boost/task boost/task/detail libs/task/examples libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-08-16 04:41:57
Author: olli
Date: 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
New Revision: 55609
URL: http://svn.boost.org/trac/boost/changeset/55609
Log:
- fiber support enabled
Text files modified:
sandbox/task/boost/task.hpp | 1
sandbox/task/boost/task/detail/fiber_posix.hpp | 60 +++++++-------
sandbox/task/boost/task/detail/fiber_windows.hpp | 73 ++++++++---------
sandbox/task/boost/task/detail/worker.hpp | 161 ++++++++++++++++-----------------------
sandbox/task/boost/task/task.hpp | 4
sandbox/task/boost/task/utility.hpp | 15 --
sandbox/task/libs/task/examples/Jamfile.v2 | 13 ++
sandbox/task/libs/task/src/semaphore_posix.cpp | 5
sandbox/task/libs/task/src/semaphore_windows.cpp | 5
sandbox/task/libs/task/src/worker.cpp | 4
10 files changed, 155 insertions(+), 186 deletions(-)
Modified: sandbox/task/boost/task.hpp
==============================================================================
--- sandbox/task/boost/task.hpp (original)
+++ sandbox/task/boost/task.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -27,6 +27,7 @@
#include <boost/task/static_pool.hpp>
#include <boost/task/smart.hpp>
#include <boost/task/task.hpp>
+#include <boost/task/unbounded_buffer.hpp>
#include <boost/task/unbounded_channel.hpp>
#include <boost/task/utility.hpp>
#include <boost/task/watermark.hpp>
Modified: sandbox/task/boost/task/detail/fiber_posix.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_posix.hpp (original)
+++ sandbox/task/boost/task/detail/fiber_posix.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -18,6 +18,7 @@
#include <boost/assert.hpp>
#include <boost/function.hpp>
#include <boost/shared_array.hpp>
+#include <boost/shared_ptr.hpp>
#include <boost/system/system_error.hpp>
namespace boost { namespace task {
@@ -54,6 +55,18 @@
shared_array< char > stack_;
st_state state_;
+ fiber(
+ function< void() > fn,
+ std::size_t stack_size)
+ :
+ fn_( fn),
+ stack_size_( stack_size),
+ caller_(),
+ callee_(),
+ stack_( new char[stack_size]),
+ state_( st_uninitialized)
+ { BOOST_ASSERT( stack_size_ > 0); }
+
bool uninitialized_() const
{ return state_ == st_uninitialized; }
@@ -66,7 +79,7 @@
bool exited_() const
{ return state_ == st_exited; }
- void yield_()
+ void exit_()
{
if ( ::swapcontext( & callee_, & caller_) == -1)
throw system::system_error(
@@ -75,11 +88,10 @@
system::system_category) );
}
- void yield_to_( fiber & to)
+ void switch_to_( fiber & to)
{
std::swap( caller_, to.caller_);
std::swap( state_, to.state_);
-
if ( ::swapcontext( & callee_, & to.callee_) == -1)
throw system::system_error(
system::error_code(
@@ -128,22 +140,15 @@
}
public:
- static void convert_thread_to_fiber() {}
+ typedef shared_ptr< fiber > sptr_t;
- fiber(
+ static void convert_thread_to_fiber()
+ {}
+
+ static sptr_t create(
function< void() > fn,
std::size_t stack_size)
- :
- fn_( fn),
- stack_size_( stack_size),
- caller_(),
- callee_(),
- stack_( new char[stack_size]),
- state_( st_uninitialized)
- {
- BOOST_ASSERT( ! fn_.empty() );
- BOOST_ASSERT( stack_size_ > 0);
- }
+ { return sptr_t( new fiber( fn, stack_size) ); }
~fiber()
{ BOOST_ASSERT( ! running_() ); }
@@ -151,25 +156,18 @@
bool ready() const
{ return uninitialized_() || ready_(); }
- bool running() const
+ bool running() const
{ return running_(); }
- bool exited() const
+ bool exited() const
{ return exited_(); }
- void yield()
- {
- BOOST_ASSERT( running_() );
- state_ = st_ready;
- yield_();
- BOOST_ASSERT( running_() );
- }
-
- void yield_to( fiber & to)
+ void switch_to( sptr_t & to)
{
+ BOOST_ASSERT( to);
BOOST_ASSERT( running_() );
- if ( to.uninitialized_() ) to.init_();
- yield_to_( to);
+ if ( to->uninitialized_() ) to->init_();
+ switch_to_( * to);
BOOST_ASSERT( running_() );
}
@@ -187,11 +185,11 @@
{
BOOST_ASSERT( running_() ) ;
state_ = st_exited;
- yield_();
+ exit_();
BOOST_ASSERT(!"should never be reached");
}
};
-}}}
+} } }
#endif // BOOST_TASK_DETAIL_FIBER_POSIX_H
Modified: sandbox/task/boost/task/detail/fiber_windows.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_windows.hpp (original)
+++ sandbox/task/boost/task/detail/fiber_windows.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -17,9 +17,10 @@
#include <boost/assert.hpp>
#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
#include <boost/system/system_error.hpp>
-namespace boost { namespace task {
+namespace boost { namespace fibers {
namespace detail
{
template< typename Fiber >
@@ -47,11 +48,22 @@
friend
VOID CALLBACK trampoline( LPVOID);
- function< void() > fn_;
- std::size_t stack_size_;
- LPVOID caller_;
- LPVOID callee_;
- st_state state_;
+ function< void( context< fiber > &) > fn_;
+ std::size_t stack_size_;
+ LPVOID caller_;
+ LPVOID callee_;
+ st_state state_;
+
+ fiber(
+ function< void() > fn,
+ std::size_t stack_size)
+ :
+ fn_( fn),
+ stack_size_( stack_size),
+ caller_( 0),
+ callee_( 0),
+ state_( st_uninitialized)
+ { BOOST_ASSERT( stack_size_ > 0); }
bool uninitialized_() const
{ return state_ == st_uninitialized; }
@@ -65,14 +77,13 @@
bool exited_() const
{ return state_ == st_exited; }
- void yield_()
+ void exit_()
{ ::SwitchToFiber( caller_); }
- void yield_to_( fiber & to)
+ void switch_to_( fiber & to)
{
std::swap( caller_, to.caller_);
std::swap( state_, to.state_);
-
::SwitchToFiber( to.callee_);
}
@@ -103,29 +114,21 @@
}
public:
+ typedef shared_ptr< fiber > sptr_t;
+
static void convert_thread_to_fiber()
{
- if ( ! ::ConvertThreadToFiber( 0) )
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
-
+ if ( ! ::ConvertThreadToFiber( 0)_)
+ throw system::system_error(
+ system::error_code(
+ ::GetLastError(),
+ system::system_category) );
}
- fiber(
+ static sptr-t create(
function< void() > fn,
std::size_t stack_size)
- :
- fn_( fn),
- stack_size_( stack_size),
- caller_( 0),
- callee_( 0),
- state_( st_uninitialized)
- {
- BOOST_ASSERT( ! fn_.empty() );
- BOOST_ASSERT( stack_size_ > 0);
- }
+ { return sptr_t( new fiber( fn, stack_size) ); }
~fiber()
{
@@ -142,19 +145,11 @@
bool exited() const
{ return exited_(); }
- void yield()
- {
- BOOST_ASSERT( running_() );
- state_ = st_ready;
- yield_();
- BOOST_ASSERT( running_() );
- }
-
- void yield_to( fiber & to)
+ void switch_to( sptr_t & to)
{
BOOST_ASSERT( running_() );
- if ( to.uninitialized_() ) to.init_();
- yield_to_( to);
+ if ( to->uninitialized_() ) to->init_();
+ switch_to_( * to);
BOOST_ASSERT( running_() );
}
@@ -172,11 +167,11 @@
{
BOOST_ASSERT( running_() || ready_() ) ;
state_ = st_exited;
- yield_();
+ exit_();
BOOST_ASSERT(!"should never be reached");
}
};
-}}}
+} } }
#endif // BOOST_TASK_DETAIL_FIBER_WINDOWS_H
Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp (original)
+++ sandbox/task/boost/task/detail/worker.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -9,6 +9,7 @@
#include <cstddef>
#include <list>
+#include <set>
#include <utility>
#include <boost/assert.hpp>
@@ -62,7 +63,7 @@
virtual void reschedule_until( function< bool() > const&) = 0;
- virtual bool block() = 0;
+ virtual void block() = 0;
};
template<
@@ -92,15 +93,14 @@
{ return die_(); }
};
- typedef shared_ptr< fiber > fiber_t;
typedef shared_ptr< thread > thread_t;
Pool & pool_;
thread_t thrd_;
- fiber_t fib_;
+ fiber::sptr_t fib_;
wsq wsq_;
- std::list< fiber_t > blocked_fibers_;
- std::list< fiber_t > runnable_fiber_lst;
+ std::list< fiber::sptr_t > blocked_fibers_;
+ std::list< fiber::sptr_t > runnable_fibers_;
semaphore shtdwn_sem_;
semaphore shtdwn_now_sem_;
bool shtdwn_;
@@ -124,7 +124,16 @@
void try_blocked_fibers_()
{
- td::list< fiber_t >
+ if ( ! blocked_fibers_.empty() )
+ {
+ fiber::sptr_t this_fib = fib_;
+ runnable_fibers_.push_back( this_fib);
+ fiber::sptr_t blocked_fib = blocked_fibers_.front();
+ blocked_fibers_.pop_front();
+ fib_ = blocked_fib;
+ this_fib->switch_to( blocked_fib);
+ fib_ = this_fib;
+ }
}
bool take_global_callable_(
@@ -152,49 +161,52 @@
return false;
}
- void run_()
+ void process_( bool all)
{
callable ca;
- while ( ! shutdown_() )
+ if ( all ? try_take_local_callable_( ca) ||
+ try_steal_other_callable_( ca) ||
+ try_take_global_callable_( ca)
+ : try_take_local_callable_( ca) )
{
- try_runnable_fibers_();
- if ( try_take_local_callable_( ca) ||
- try_take_global_callable_( ca) ||
- try_steal_other_callable_( ca) )
- {
- execute_( ca);
- scns_ = 0;
- }
- else
+ execute_( ca);
+ scns_ = 0;
+ }
+ else
+ {
+ guard grd( pool_.idle_worker_);
+ ++scns_;
+ if ( scns_ >= max_scns_)
{
- guard grd( pool_.idle_worker_);
- if ( shutdown_() ) return;
- ++scns_;
- if ( scns_ >= max_scns_)
+ if ( pool_.size_() > pool_.idle_worker_)
+ {
+ if ( take_global_callable_( ca, asleep_) )
+ execute_( ca);
+ }
+ else if ( blocked_fibers_.empty() )
{
- // should the comparation be atomic or
- // at least the read of idle_worker_ be atomic ?
- if ( pool_.size_() == pool_.idle_worker_)
- {
- if ( take_global_callable_( ca, asleep_) )
- execute_( ca);
- }
- else
- try
- { this_thread::sleep( asleep_); }
- catch ( thread_interrupted const&)
- { return; }
- scns_ = 0;
+ try
+ { this_thread::sleep( asleep_); }
+ catch ( thread_interrupted const&)
+ { return; }
}
- else
- this_thread::yield();
+ scns_ = 0;
}
+ else
+ this_thread::yield();
}
+ try_blocked_fibers_();
+ }
+
+ void run_()
+ {
+ while ( ! shutdown_() )
+ process_( true);
}
bool shutdown_()
{
- if ( shutdown__() && get_pool().channel_.empty() )
+ if ( shutdown__() && pool_.channel_.empty() && blocked_fibers_.empty() )
return true;
else if ( shutdown_now__() )
return true;
@@ -225,7 +237,7 @@
fib_(),
wsq_(),
blocked_fibers_(),
- runnable_fiber_lst(),
+ runnable_fibers_(),
shtdwn_sem_( 0),
shtdwn_now_sem_( 0),
shtdwn_( false),
@@ -261,76 +273,47 @@
bool try_steal( callable & ca)
{ return wsq_.try_steal( ca); }
- Pool & get_pool() const
- { return pool_; }
-
void run()
{
BOOST_ASSERT( get_id() == this_thread::get_id() );
fiber::convert_thread_to_fiber();
- fiber_t fib(
- new fiber(
- bind(
- & worker_object::run_,
- this),
+ fiber::sptr_t fib(
+ fiber::create(
+ bind( & worker_object::run_, this),
stack_size_) );
fib_.swap( fib);
fib_->run();
- BOOST_ASSERT( fib_->exited() );
fib_.reset();
}
void reschedule_until( function< bool() > const& pred)
{
- callable ca;
- while ( ! pred() /* && ! shutdown_() */)
- {
- if ( try_take_local_callable_( ca) )
- {
- execute_( ca);
- scns_ = 0;
- }
- else
- {
- guard grd( get_pool().idle_worker_);
- if ( shutdown_() ) return;
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- this_thread::sleep( asleep_);
- scns_ = 0;
- }
- else
- this_thread::yield();
- }
- }
+ while ( ! pred() )
+ process_( false);
}
- bool block()
+ void block()
{
- blocked_fibers_.push_back( fib_);
- fiber_t fib;
+ fiber::sptr_t this_fib = fib_;
+ blocked_fibers_.push_back( this_fib);
+ fiber::sptr_t runnable_fib;
if ( runnable_fibers_.empty() )
{
- fib.reset(
- new fiber(
- bind(
- & worker_object::run_,
- this),
- stack_size_) );
+ runnable_fib = fiber::create(
+ bind( & worker_object::run_, this),
+ stack_size_);
}
else
{
- fib = runnable_fibers_.front();
+ runnable_fib = runnable_fibers_.front();
runnable_fibers_.pop_front();
}
- fib_.swap( fib);
- fib_->run();
- runnable_fibers_.push_back( fib);
-
- return ! shutdown_();
+ BOOST_ASSERT( runnable_fib);
+ fib_ = runnable_fib;
+ this_fib->switch_to( runnable_fib);
+ fib_ = this_fib;
}
};
@@ -371,17 +354,9 @@
void put( callable const&);
bool try_steal( callable &);
- template< typename Pool >
- Pool & get_pool() const
- {
- worker_object< Pool, worker > * p( dynamic_cast< worker_object< Pool, worker > * >( impl_.get() ) );
- BOOST_ASSERT( p);
- return p->get_pool();
- }
-
void run();
void reschedule_until( function< bool() > const&);
- bool block();
+ void block();
static worker * tss_get();
};
Modified: sandbox/task/boost/task/task.hpp
==============================================================================
--- sandbox/task/boost/task/task.hpp (original)
+++ sandbox/task/boost/task/task.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -70,6 +70,8 @@
{ throw task_already_executed(); }
catch ( thread_interrupted const&)
{ this->prom_.set_exception( copy_exception( task_interrupted() ) ); }
+ catch ( task_interrupted const& e)
+ { this->prom_.set_exception( copy_exception( e) ); }
catch ( boost::exception const& e)
{ this->prom_.set_exception( copy_exception( e) ); }
catch ( std::ios_base::failure const& e)
@@ -127,6 +129,8 @@
{ throw task_already_executed(); }
catch ( thread_interrupted const&)
{ this->prom_.set_exception( copy_exception( task_interrupted() ) ); }
+ catch ( task_interrupted const& e)
+ { this->prom_.set_exception( copy_exception( e) ); }
catch ( boost::exception const& e)
{ this->prom_.set_exception( copy_exception( e) ); }
catch ( std::ios_base::failure const& e)
Modified: sandbox/task/boost/task/utility.hpp
==============================================================================
--- sandbox/task/boost/task/utility.hpp (original)
+++ sandbox/task/boost/task/utility.hpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -9,7 +9,6 @@
#include <boost/assert.hpp>
#include <boost/thread.hpp>
-#include <boost/thread/thread_time.hpp>
#include <boost/task/detail/worker.hpp>
@@ -17,12 +16,12 @@
namespace boost { namespace this_task
{
-template< typename Pool >
-Pool & get_pool()
+inline
+void block()
{
task::detail::worker * w( task::detail::worker::tss_get() );
BOOST_ASSERT( w);
- return w->get_pool< Pool >();
+ w->block();
}
inline
@@ -30,14 +29,6 @@
{ return task::detail::worker::tss_get() != 0; }
inline
-bool block()
-{
- task::detail::worker * w( task::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- return w->block();
-}
-
-inline
thread::id worker_id()
{
task::detail::worker * w( task::detail::worker::tss_get() );
Modified: sandbox/task/libs/task/examples/Jamfile.v2
==============================================================================
--- sandbox/task/libs/task/examples/Jamfile.v2 (original)
+++ sandbox/task/libs/task/examples/Jamfile.v2 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -25,10 +25,21 @@
;
exe bind_to_processors : bind_to_processors.cpp ;
+exe buffer_multi : buffer_multi.cpp ;
+exe buffer_multi2 : buffer_multi2.cpp ;
+exe buffer_pool : buffer_pool.cpp ;
+exe buffer_pool_thread : buffer_pool_thread.cpp ;
+exe buffer_thread : buffer_thread.cpp ;
exe fork_join : fork_join.cpp ;
exe interrupt : interrupt.cpp ;
+exe no_deadlock_pool : no_deadlock_pool.cpp ;
+exe no_deadlock_pool2 : no_deadlock_pool2.cpp ;
+exe no_deadlock_pool3 : no_deadlock_pool3.cpp ;
exe pending : pending.cpp ;
exe priority : priority.cpp ;
-exe shutdonw_now : shutdown_now.cpp ;
+exe semaphore_thread : semaphore_thread.cpp ;
+exe semaphore_pool : semaphore_pool.cpp ;
+exe semaphore_pool_thread : semaphore_pool_thread.cpp ;
+exe shutdown_now : shutdown_now.cpp ;
exe smart : smart.cpp ;
exe submit : submit.cpp ;
Modified: sandbox/task/libs/task/src/semaphore_posix.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_posix.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_posix.cpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -41,10 +41,7 @@
if ( this_task::runs_in_pool() )
{
while ( ! try_wait() )
- {
- if ( ! this_task::block() )
- throw task_interrupted();
- }
+ this_task::block();
}
else
{
Modified: sandbox/task/libs/task/src/semaphore_windows.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_windows.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_windows.cpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -41,10 +41,7 @@
if ( this_task::runs_in_pool() )
{
while ( ! try_wait() )
- {
- if ( ! this_task::block() )
- throw task_interrupted();
- }
+ this_task::block();
}
else
{
Modified: sandbox/task/libs/task/src/worker.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker.cpp (original)
+++ sandbox/task/libs/task/src/worker.cpp 2009-08-16 04:41:56 EDT (Sun, 16 Aug 2009)
@@ -52,9 +52,9 @@
worker::reschedule_until( function< bool() > const& pred)
{ impl_->reschedule_until( pred); }
-bool
+void
worker::block()
-{ return impl_->block(); }
+{ impl_->block(); }
worker *
worker::tss_get()
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk