Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55238 - sandbox/task/boost/task/detail
From: oliver.kowalke_at_[hidden]
Date: 2009-07-30 14:44:17


Author: olli
Date: 2009-07-28 14:51:27 EDT (Tue, 28 Jul 2009)
New Revision: 55238
URL: http://svn.boost.org/trac/boost/changeset/55238

Log:
* each worker starts an initial fiber

Text files modified:
   sandbox/task/boost/task/detail/fiber.hpp | 2
   sandbox/task/boost/task/detail/fiber_posix.hpp | 36 +++++-----
   sandbox/task/boost/task/detail/fiber_windows.hpp | 140 +++++++++------------------------------
   sandbox/task/boost/task/detail/worker.hpp | 85 +++++++++++++++---------
   4 files changed, 105 insertions(+), 158 deletions(-)

Modified: sandbox/task/boost/task/detail/fiber.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber.hpp (original)
+++ sandbox/task/boost/task/detail/fiber.hpp 2009-07-28 14:51:27 EDT (Tue, 28 Jul 2009)
@@ -18,7 +18,7 @@
 #include <boost/task/detail/fiber_windows.hpp>
 
 # elif defined(_POSIX_VERSION)
-#include <boost/fibers/detail/fiber_posix.hpp>
+#include <boost/task/detail/fiber_posix.hpp>
 
 # endif
 

Modified: sandbox/task/boost/task/detail/fiber_posix.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_posix.hpp (original)
+++ sandbox/task/boost/task/detail/fiber_posix.hpp 2009-07-28 14:51:27 EDT (Tue, 28 Jul 2009)
@@ -20,8 +20,6 @@
 #include <boost/shared_array.hpp>
 #include <boost/system/system_error.hpp>
 
-#include <boost/fibers/detail/context.hpp>
-
 namespace boost { namespace task {
 namespace detail
 {
@@ -29,13 +27,12 @@
 void trampoline( Fiber * fib)
 {
         BOOST_ASSERT( fib);
- context< Fiber > ctxt( * fib);
         BOOST_ASSERT( ! fib->fn_.empty() );
- fib->fn_( ctxt);
+ fib->fn_();
         fib->exit();
 }
 
-class fiber_base
+class fiber
 {
 private:
         enum st_state
@@ -50,12 +47,12 @@
         friend
         void trampoline( Fiber *);
 
- function< void( context< fiber_base > &) > fn_;
- std::size_t stack_size_;
- ::ucontext_t caller_;
- ::ucontext_t callee_;
- shared_array< char > stack_;
- st_state state_;
+ function< void() > fn_;
+ std::size_t stack_size_;
+ ::ucontext_t caller_;
+ ::ucontext_t callee_;
+ shared_array< char > stack_;
+ st_state state_;
 
         bool uninitialized_() const
         { return state_ == st_uninitialized; }
@@ -78,10 +75,11 @@
                                         system::system_category) );
         }
 
- void yield_to_( fiber_base & to)
+ void yield_to_( fiber & to)
         {
                 std::swap( caller_, to.caller_);
                 std::swap( state_, to.state_);
+
                 if ( ::swapcontext( & callee_, & to.callee_) == -1)
                         throw system::system_error(
                                 system::error_code(
@@ -116,9 +114,9 @@
                 callee_.uc_stack.ss_sp = stack_.get();
                 callee_.uc_stack.ss_size = stack_size_;
                 callee_.uc_link = 0;
- typedef void fn_type( fiber_base *);
+ typedef void fn_type( fiber *);
                 typedef void ( * st_fn)();
- fn_type * fn_ptr( trampoline< fiber_base >);
+ fn_type * fn_ptr( trampoline< fiber >);
 
                 ::makecontext(
                         & callee_,
@@ -130,8 +128,10 @@
         }
 
 public:
- fiber_base(
- function< void( context< fiber_base > &) > fn,
+ static void convert_thread_to_fiber() {}
+
+ fiber(
+ function< void() > fn,
                 std::size_t stack_size)
         :
         fn_( fn),
@@ -145,7 +145,7 @@
                 BOOST_ASSERT( stack_size_ > 0);
         }
 
- ~fiber_base()
+ ~fiber()
         { BOOST_ASSERT( ! running_() ); }
 
         bool ready() const
@@ -165,7 +165,7 @@
                 BOOST_ASSERT( running_() );
         }
 
- void yield_to( fiber_base & to)
+ void yield_to( fiber & to)
         {
                 BOOST_ASSERT( running_() );
                 if ( to.uninitialized_() ) to.init_();

Modified: sandbox/task/boost/task/detail/fiber_windows.hpp
==============================================================================
--- sandbox/task/boost/task/detail/fiber_windows.hpp (original)
+++ sandbox/task/boost/task/detail/fiber_windows.hpp 2009-07-28 14:51:27 EDT (Tue, 28 Jul 2009)
@@ -19,7 +19,6 @@
 #include <boost/function.hpp>
 #include <boost/system/system_error.hpp>
 
-
 namespace boost { namespace task {
 namespace detail
 {
@@ -28,13 +27,12 @@
 {
         Fiber * fib( static_cast< Fiber * >( vp) );
         BOOST_ASSERT( fib);
- context< Fiber > ctxt( * fib);
         BOOST_ASSERT( ! fib->fn_.empty() );
- fib->fn_( ctxt);
+ fib->fn_();
         fib->exit();
 }
 
-class fiber_base
+class fiber
 {
 private:
         enum st_state
@@ -49,11 +47,11 @@
         friend
         VOID CALLBACK trampoline( LPVOID);
 
- function< void( context< fiber_base > &) > fn_;
- std::size_t stack_size_;
- LPVOID caller_;
- LPVOID callee_;
- st_state state_;
+ function< void() > fn_;
+ std::size_t stack_size_;
+ LPVOID caller_;
+ LPVOID callee_;
+ st_state state_;
 
         bool uninitialized_() const
         { return state_ == st_uninitialized; }
@@ -67,115 +65,33 @@
         bool exited_() const
         { return state_ == st_exited; }
 
- bool is_fiber_()
- {
-#if (_WIN32_WINNT >= 0x0600)
- return ::IsThreadAFiber() == TRUE;
-#else
- LPVOID current( ::GetCurrentFiber() );
- return current != 0 && current != reinterpret_cast< LPVOID >( 0x1E00);
-#endif
- }
-
         void yield_()
- {
- if( ! is_fiber_() )
- {
- BOOST_ASSERT( ! callee_);
- callee_ = ::ConvertThreadToFiber( 0);
- if ( ! callee_)
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
- ::SwitchToFiber( caller_);
- BOOL result = ::ConvertFiberToThread();
- if ( ! result)
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
- caller_ = 0;
- }
- else
- {
- if ( ! callee_)
- callee_ = ::GetCurrentFiber();
- ::SwitchToFiber( caller_);
- if ( ! callee_)
- callee_ = 0;
- }
- }
+ { ::SwitchToFiber( caller_); }
 
- void yield_to_( fiber_base & to)
+ void yield_to_( fiber & to)
         {
                 std::swap( caller_, to.caller_);
                 std::swap( state_, to.state_);
- if( ! is_fiber_() )
- {
- BOOST_ASSERT( ! callee_);
- callee_ = ::ConvertThreadToFiber( 0);
- if ( ! callee_)
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
- ::SwitchToFiber( to.callee_);
- BOOL result = ::ConvertFiberToThread();
- if ( ! result)
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
- caller_ = 0;
- }
- else
- {
- if ( ! callee_)
- callee_ = ::GetCurrentFiber();
- ::SwitchToFiber( to.callee_);
- if ( ! callee_)
- callee_ = 0;
- }
+
+ ::SwitchToFiber( to.callee_);
         }
 
         void run_()
- {
- if( ! is_fiber_() )
- {
- BOOST_ASSERT( ! caller_);
- caller_ = ::ConvertThreadToFiber( 0);
- if ( ! caller_)
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
- ::SwitchToFiber( callee_);
- BOOL result = ::ConvertFiberToThread();
- if ( ! result)
- throw system::system_error(
- system::error_code(
- ::GetLastError(),
- system::system_category) );
- caller_ = 0;
- }
- else
- {
- if ( ! caller_)
- caller_ = ::GetCurrentFiber();
- ::SwitchToFiber( callee_);
- if ( ! caller_)
- caller_ = 0;
- }
- }
+ { ::SwitchToFiber( callee_); }
 
         void init_()
         {
                 BOOST_ASSERT( state_ == st_uninitialized);
 
+ caller_ = ::GetCurrentFiber();
+ if ( ! caller_)
+ throw system::system_error(
+ system::error_code(
+ ::GetLastError(),
+ system::system_category) );
                 callee_ = ::CreateFiber(
                         stack_size_,
- static_cast< LPFIBER_START_ROUTINE >( & trampoline< fiber_base >),
+ static_cast< LPFIBER_START_ROUTINE >( & trampoline< fiber >),
                         static_cast< LPVOID >( this) );
                 if ( ! callee_)
                         throw system::system_error(
@@ -187,8 +103,18 @@
         }
 
 public:
- fiber_base(
- function< void( context< fiber_base > &) > fn,
+ static void convert_thread_to_fiber()
+ {
+ if ( ! ::ConvertThreadToFiber( 0) )
+ throw system::system_error(
+ system::error_code(
+ ::GetLastError(),
+ system::system_category) );
+
+ }
+
+ fiber(
+ function< void() > fn,
                 std::size_t stack_size)
         :
         fn_( fn),
@@ -201,7 +127,7 @@
                 BOOST_ASSERT( stack_size_ > 0);
         }
 
- ~fiber_base()
+ ~fiber()
         {
                 BOOST_ASSERT( ! running_() );
                 ::DeleteFiber( callee_);
@@ -224,7 +150,7 @@
                 BOOST_ASSERT( running_() );
         }
 
- void yield_to( fiber_base & to)
+ void yield_to( fiber & to)
         {
                 BOOST_ASSERT( running_() );
                 if ( to.uninitialized_() ) to.init_();

Modified: sandbox/task/boost/task/detail/worker.hpp
==============================================================================
--- sandbox/task/boost/task/detail/worker.hpp (original)
+++ sandbox/task/boost/task/detail/worker.hpp 2009-07-28 14:51:27 EDT (Tue, 28 Jul 2009)
@@ -19,6 +19,7 @@
 
 #include <boost/task/callable.hpp>
 #include <boost/task/detail/config.hpp>
+#include <boost/task/detail/fiber.hpp>
 #include <boost/task/detail/guard.hpp>
 #include <boost/task/detail/interrupter.hpp>
 #include <boost/task/detail/wsq.hpp>
@@ -89,6 +90,7 @@
 
         Pool & pool_;
         shared_ptr< thread > thrd_;
+ shared_ptr< fiber > fib_;
         wsq wsq_;
         semaphore shtdwn_sem_;
         semaphore shtdwn_now_sem_;
@@ -135,6 +137,45 @@
                 return false;
         }
 
+ void run_()
+ {
+ callable ca;
+ while ( ! shutdown_() )
+ {
+ if ( try_take_local_callable_( ca) ||
+ try_take_global_callable_( ca) ||
+ try_steal_other_callable_( ca) )
+ {
+ execute_( ca);
+ scns_ = 0;
+ }
+ else
+ {
+ guard grd( pool_.idle_worker_);
+ if ( shutdown_() ) return;
+ ++scns_;
+ if ( scns_ >= max_scns_)
+ {
+ // should the comparation be atomic or
+ // at least the read of idle_worker_ be atomic ?
+ if ( pool_.size_() == pool_.idle_worker_)
+ {
+ if ( take_global_callable_( ca, asleep_) )
+ execute_( ca);
+ }
+ else
+ try
+ { this_thread::sleep( asleep_); }
+ catch ( thread_interrupted const&)
+ { return; }
+ scns_ = 0;
+ }
+ else
+ this_thread::yield();
+ }
+ }
+ }
+
         bool shutdown_()
         {
                 if ( shutdown__() && get_pool().channel_.empty() )
@@ -164,6 +205,7 @@
         :
         pool_( pool),
         thrd_( new thread( fn) ),
+ fib_(),
         wsq_(),
         shtdwn_sem_( 0),
         shtdwn_now_sem_( 0),
@@ -206,38 +248,17 @@
         {
                 BOOST_ASSERT( get_id() == this_thread::get_id() );
 
- callable ca;
- while ( ! shutdown_() )
- {
- if ( try_take_local_callable_( ca) ||
- try_take_global_callable_( ca) ||
- try_steal_other_callable_( ca) )
- {
- execute_( ca);
- scns_ = 0;
- }
- else
- {
- guard grd( pool_.idle_worker_);
- if ( shutdown_() ) return;
- ++scns_;
- if ( scns_ >= max_scns_)
- {
- // should the comparation be atomic or
- // at least the read of idle_worker_ be atomic ?
- if ( pool_.size_() == pool_.idle_worker_)
- {
- if ( take_global_callable_( ca, asleep_) )
- execute_( ca);
- }
- else
- this_thread::sleep( asleep_);
- scns_ = 0;
- }
- else
- this_thread::yield();
- }
- }
+ fiber::convert_thread_to_fiber();
+ shared_ptr< fiber > fib(
+ new fiber(
+ bind(
+ & worker_object::run_,
+ this),
+ 64000) );
+ fib_.swap( fib);
+ fib_->run();
+ BOOST_ASSERT( fib_->exited() );
+ fib_.reset();
         }
 
         void reschedule_until( function< bool() > const& pred)


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk