Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r58242 - in sandbox/fiber: . boost/fiber libs/fiber/examples libs/fiber/src libs/fiber/test
From: oliver.kowalke_at_[hidden]
Date: 2009-12-08 15:25:33


Author: olli
Date: 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
New Revision: 58242
URL: http://svn.boost.org/trac/boost/changeset/58242

Log:
- fiber migration supported

Added:
   sandbox/fiber/libs/fiber/examples/migrate_mt.cpp (contents, props changed)
Removed:
   sandbox/fiber/libs/fiber/examples/simple_mt.cpp
Text files modified:
   sandbox/fiber/boost/fiber/bounded_fifo.hpp | 247 ++++++++++++++++++++++++++++++++++++++-
   sandbox/fiber/boost/fiber/round_robin.hpp | 4
   sandbox/fiber/boost/fiber/scheduler.hpp | 4
   sandbox/fiber/boost/fiber/strategy.hpp | 6
   sandbox/fiber/boost/fiber/unbounded_fifo.hpp | 156 ++++++++++++++++++++++++-
   sandbox/fiber/change.log | 11 +
   sandbox/fiber/libs/fiber/examples/Jamfile.v2 | 2
   sandbox/fiber/libs/fiber/examples/ping_pong.cpp | 10 +
   sandbox/fiber/libs/fiber/src/round_robin.cpp | 44 +++++++
   sandbox/fiber/libs/fiber/test/test_scheduler.cpp | 82 +++++++++++++
   10 files changed, 545 insertions(+), 21 deletions(-)

Modified: sandbox/fiber/boost/fiber/bounded_fifo.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/bounded_fifo.hpp (original)
+++ sandbox/fiber/boost/fiber/bounded_fifo.hpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -20,16 +20,20 @@
 #include <boost/fiber/detail/atomic.hpp>
 #include <boost/fiber/exceptions.hpp>
 #include <boost/fiber/mutex.hpp>
+#include <boost/fiber/spin_condition.hpp>
+#include <boost/fiber/spin_mutex.hpp>
 
 #include <boost/config/abi_prefix.hpp>
 
 namespace boost {
 namespace fibers {
 
+template< typename T, typename M >
+class bounded_fifo;
+
 template< typename T >
-class bounded_fifo : private noncopyable
+class bounded_fifo< T, spin_mutex > : private noncopyable
 {
-
 public:
         typedef optional< T > value_type;
 
@@ -58,11 +62,11 @@
         volatile uint32_t state_;
         volatile uint32_t count_;
         typename node::ptr_t head_;
- mutex head_mtx_;
+ spin_mutex head_mtx_;
         typename node::ptr_t tail_;
- mutex tail_mtx_;
- condition not_empty_cond_;
- condition not_full_cond_;
+ spin_mutex tail_mtx_;
+ spin_condition not_empty_cond_;
+ spin_condition not_full_cond_;
         std::size_t hwm_;
         std::size_t lwm_;
         volatile uint32_t use_count_;
@@ -84,7 +88,7 @@
 
         typename node::ptr_t get_tail_()
         {
- mutex::scoped_lock lk( tail_mtx_);
+ spin_mutex::scoped_lock lk( tail_mtx_);
                 typename node::ptr_t tmp = tail_;
                 return tmp;
         }
@@ -160,6 +164,231 @@
 
         bool empty()
         {
+ spin_mutex::scoped_lock lk( head_mtx_);  // head_mtx_ is held while empty_() is evaluated
+ return empty_();  // the answer comes entirely from empty_()
+ }
+
+ void put( T const& t)  // Appends a copy of t; waits on not_full_cond_ while the queue is active and full; throws std::runtime_error if the queue is not active.
+ {
+ typename node::ptr_t new_node( new node);  // allocated before tail_mtx_ is taken; becomes the new tail node
+ {
+ spin_mutex::scoped_lock lk( tail_mtx_);
+
+ if ( full_() )
+ {
+ while ( active_() && full_() )  // condition is re-tested after every return from wait()
+ not_full_cond_.wait( lk);
+ }
+ if ( ! active_() )
+ throw std::runtime_error("queue is not active");
+
+ tail_->va = t;  // the value goes into the current tail node
+ tail_->next = new_node;
+ tail_ = new_node;
+ detail::atomic_fetch_add( & count_, 1);
+ }  // lk goes out of scope here, before the notification below
+ not_empty_cond_.notify_one();
+ }
+
+ bool take( value_type & va)  // Blocking removal: swaps the head value into va; returns false when the queue is inactive and empty, or when fiber_interrupted is caught during the wait.
+ {
+ spin_mutex::scoped_lock lk( head_mtx_);
+ bool empty = empty_();
+ if ( ! active_() && empty)  // not active and nothing left to hand out
+ return false;
+ if ( empty)
+ {
+ try
+ {
+ while ( active_() && empty_() )  // condition is re-tested after every return from wait()
+ not_empty_cond_.wait( lk);
+ }
+ catch ( fiber_interrupted const&)  // fiber_interrupted from the wait is swallowed and reported as false
+ { return false; }
+ }
+ if ( ! active_() && empty_() )  // checked again after the wait loop has ended
+ return false;
+ swap( va, head_->va);  // exchanges the caller's va with the value stored in the head node
+ pop_head_();
+ if ( size_() <= lwm_)  // producers are only notified once the size is at or below the low watermark
+ {
+ if ( lwm_ == hwm_)
+ not_full_cond_.notify_one();
+ else
+ // more than one producer could be waiting
+ // for submitting an action object
+ not_full_cond_.notify_all();
+ }
+ return va;  // value_type is optional< T >; the result is va converted to bool
+ }
+
+ bool try_take( value_type & va)  // Non-blocking removal: returns false at once if empty_(); otherwise swaps the head value into va and returns va converted to bool.
+ {
+ spin_mutex::scoped_lock lk( head_mtx_);
+ if ( empty_() )
+ return false;
+ swap( va, head_->va);  // exchanges the caller's va with the value stored in the head node
+ pop_head_();
+ bool valid = va;  // value_type is optional< T >; valid is va converted to bool
+ if ( valid && size_() <= lwm_)  // producers are only notified once the size is at or below the low watermark
+ {
+ if ( lwm_ == hwm_)
+ not_full_cond_.notify_one();
+ else
+ // more than one producer could be waiting
+ // in order to submit a task
+ not_full_cond_.notify_all();
+ }
+ return valid;
+ }
+
+ template< typename R >
+ friend void intrusive_ptr_add_ref( bounded_fifo< R, spin_mutex > * p)  // reference-count hook named after the boost::intrusive_ptr convention
+ { detail::atomic_fetch_add( & p->use_count_, 1); }  // adds 1 to p->use_count_ through detail::atomic_fetch_add
+
+ template< typename R >
+ friend void intrusive_ptr_release( bounded_fifo< R, spin_mutex > * p)  // reference-count hook named after the boost::intrusive_ptr convention
+ { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }  // deletes p when atomic_fetch_sub returns 1 (presumably the count before the decrement - confirm in detail/atomic.hpp)
+};
+
+template< typename T >
+class bounded_fifo< T, mutex > : private noncopyable
+{
+public:
+ typedef optional< T > value_type;
+
+private:
+ struct node  // Singly linked, reference-counted list node that carries one queue slot.
+ {
+ typedef intrusive_ptr< node > ptr_t;
+
+ uint32_t use_count;  // maintained by the two intrusive_ptr_* friends below
+ value_type va;  // payload; value_type is optional< T >
+ ptr_t next;  // following node
+
+ node() :  // starts with a zero count, a default-constructed va and a default-constructed next
+ use_count( 0),
+ va(),
+ next()
+ {}
+
+ inline friend void intrusive_ptr_add_ref( node * p)
+ { ++p->use_count; }  // plain increment, no atomic helper is used for nodes
+
+ inline friend void intrusive_ptr_release( node * p)
+ { if ( --p->use_count == 0) delete p; }  // the node is deleted when the count drops to 0
+ };
+
+ volatile uint32_t state_;
+ volatile uint32_t count_;
+ typename node::ptr_t head_;
+ mutex head_mtx_;
+ typename node::ptr_t tail_;
+ mutex tail_mtx_;
+ condition not_empty_cond_;
+ condition not_full_cond_;
+ std::size_t hwm_;
+ std::size_t lwm_;
+ volatile uint32_t use_count_;
+
+ bool active_() const  // the queue counts as active while state_ is 0
+ { return 0 == state_; }
+
+ void deactivate_()  // adds 1 to state_ via detail::atomic_fetch_add; active_() tests state_ against 0
+ { detail::atomic_fetch_add( & state_, 1); }
+
+ uint32_t size_()  // current value of count_
+ { return count_; }
+
+ bool empty_()  // empty when head_ equals the tail returned by get_tail_() (which takes tail_mtx_)
+ { return head_ == get_tail_(); }
+
+ bool full_()  // full once size_() has reached the high watermark hwm_
+ { return size_() >= hwm_; }
+
+ typename node::ptr_t get_tail_()  // returns a copy of tail_ taken while tail_mtx_ is held
+ {
+ mutex::scoped_lock lk( tail_mtx_);
+ typename node::ptr_t tmp = tail_;
+ return tmp;
+ }
+
+ typename node::ptr_t pop_head_()  // unlinks the head node, subtracts 1 from count_ and returns the old head; takes no lock itself
+ {
+ typename node::ptr_t old_head = head_;
+ head_ = old_head->next;  // the next node becomes the new head
+ detail::atomic_fetch_sub( & count_, 1);
+ return old_head;
+ }
+
+public:
+ template< typename Strategy >
+ bounded_fifo(  // Builds an empty queue with separate high (hwm) and low (lwm) watermarks; throws invalid_watermark if hwm < lwm.
+ scheduler< Strategy > & sched,  // passed on to both mutexes and both conditions
+ std::size_t const& hwm,
+ std::size_t const& lwm) :
+ state_( 0),  // 0 is the value active_() tests for
+ count_( 0),
+ head_( new node),
+ head_mtx_( sched),
+ tail_( head_),  // head_ and tail_ start on the same node, which is what empty_() checks
+ tail_mtx_( sched),
+ not_empty_cond_( sched),
+ not_full_cond_( sched),
+ hwm_( hwm),
+ lwm_( lwm),
+ use_count_( 0)
+ {
+ if ( hwm_ < lwm_)
+ throw invalid_watermark();
+ }
+
+ template< typename Strategy >
+ bounded_fifo(  // Builds an empty queue whose high and low watermark are both wm.
+ scheduler< Strategy > & sched,  // passed on to both mutexes and both conditions
+ std::size_t const& wm) :
+ state_( 0),  // 0 is the value active_() tests for
+ count_( 0),
+ head_( new node),
+ head_mtx_( sched),
+ tail_( head_),  // head_ and tail_ start on the same node, which is what empty_() checks
+ tail_mtx_( sched),
+ not_empty_cond_( sched),
+ not_full_cond_( sched),
+ hwm_( wm),
+ lwm_( wm),
+ use_count_( 0)
+ {}
+
+ void upper_bound_( std::size_t hwm)  // Sets the high watermark; throws invalid_watermark if hwm is below lwm_. NOTE(review): no lock is taken in this body - confirm callers serialize access.
+ {
+ if ( hwm < lwm_)
+ throw invalid_watermark();
+ std::size_t tmp( hwm_);  // previous high watermark, kept for the comparison below
+ hwm_ = hwm;
+ if ( hwm_ > tmp) not_full_cond_.notify_one();  // one waiter on not_full_cond_ is notified when the bound was raised
+ }
+
+ std::size_t upper_bound()  // current high watermark hwm_
+ { return hwm_; }
+
+ void lower_bound_( std::size_t lwm)  // Sets the low watermark; throws invalid_watermark if lwm is above hwm_. NOTE(review): no lock is taken in this body - confirm callers serialize access.
+ {
+ if ( lwm > hwm_ )
+ throw invalid_watermark();
+ std::size_t tmp( lwm_);  // previous low watermark, kept for the comparison below
+ lwm_ = lwm;
+ if ( lwm_ > tmp) not_full_cond_.notify_one();  // one waiter on not_full_cond_ is notified when the bound was raised
+ }
+
+ std::size_t lower_bound()  // current low watermark lwm_
+ { return lwm_; }
+
+ void deactivate()  // public entry point that forwards to deactivate_()
+ { deactivate_(); }
+
+ bool empty()  // returns empty_() evaluated while head_mtx_ is held
+ {
                 mutex::scoped_lock lk( head_mtx_);
                 return empty_();
         }
@@ -239,11 +468,11 @@
         }
 
         template< typename R >
- friend void intrusive_ptr_add_ref( bounded_fifo< R > * p)
+ friend void intrusive_ptr_add_ref( bounded_fifo< R, mutex > * p)
     { detail::atomic_fetch_add( & p->use_count_, 1); }
 
         template< typename R >
- friend void intrusive_ptr_release( bounded_fifo< R > * p)
+ friend void intrusive_ptr_release( bounded_fifo< R, mutex > * p)
     { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }
 };
 

Modified: sandbox/fiber/boost/fiber/round_robin.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/round_robin.hpp (original)
+++ sandbox/fiber/boost/fiber/round_robin.hpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -91,6 +91,10 @@
 
         void object_notify_all( object::id const&);
 
+ fiber release( fiber::id const&);
+
+ void migrate( fiber);
+
         bool run();
 
         bool empty();

Modified: sandbox/fiber/boost/fiber/scheduler.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/scheduler.hpp (original)
+++ sandbox/fiber/boost/fiber/scheduler.hpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -77,6 +77,10 @@
         void make_fiber( std::size_t stack_size, Fn fn)
         { strategy_->add( fiber( stack_size, fn) ); }
 
+ template< typename OtherStrategy >
+ void migrate_fiber( fiber::id const& id, scheduler< OtherStrategy > & other)  // Moves fiber id from other into this scheduler.
+ { strategy_->migrate( other.strategy_->release( id) ); }  // other's strategy releases the fiber, this scheduler's strategy receives it via migrate()
+
 #ifndef BOOST_FIBER_MAX_ARITY
 #define BOOST_FIBER_MAX_ARITY 10
 #endif

Modified: sandbox/fiber/boost/fiber/strategy.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/strategy.hpp (original)
+++ sandbox/fiber/boost/fiber/strategy.hpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -160,7 +160,7 @@
         virtual void interrupt( fiber::id const&) = 0;
 
         virtual void reschedule( fiber::id const&) = 0;
-
+
         virtual void cancel( fiber::id const&) = 0;
         
         virtual void yield() = 0;
@@ -175,6 +175,10 @@
 
         virtual void object_notify_all( object::id const&) = 0;
 
+ virtual fiber release( fiber::id const&) = 0;
+
+ virtual void migrate( fiber) = 0;
+
         virtual bool run() = 0;
 
         virtual bool empty() = 0;

Modified: sandbox/fiber/boost/fiber/unbounded_fifo.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/unbounded_fifo.hpp (original)
+++ sandbox/fiber/boost/fiber/unbounded_fifo.hpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -20,14 +20,20 @@
 #include <boost/fiber/detail/atomic.hpp>
 #include <boost/fiber/exceptions.hpp>
 #include <boost/fiber/mutex.hpp>
+#include <boost/fiber/scheduler.hpp>
+#include <boost/fiber/spin_condition.hpp>
+#include <boost/fiber/spin_mutex.hpp>
 
 #include <boost/config/abi_prefix.hpp>
 
 namespace boost {
 namespace fibers {
 
+template< typename T, typename M >
+class unbounded_fifo;
+
 template< typename T >
-class unbounded_fifo : private noncopyable
+class unbounded_fifo< T, spin_mutex > : private noncopyable
 {
 
 public:
@@ -57,10 +63,10 @@
 
         volatile uint32_t state_;
         typename node::ptr_t head_;
- mutex head_mtx_;
+ spin_mutex head_mtx_;
         typename node::ptr_t tail_;
- mutex tail_mtx_;
- condition not_empty_cond_;
+ spin_mutex tail_mtx_;
+ spin_condition not_empty_cond_;
         uint32_t use_count_;
 
         bool active_() const
@@ -74,7 +80,7 @@
 
         typename node::ptr_t get_tail_()
         {
- mutex::scoped_lock lk( tail_mtx_);
+ spin_mutex::scoped_lock lk( tail_mtx_);
                 typename node::ptr_t tmp = tail_;
                 return tmp;
         }
@@ -102,6 +108,142 @@
 
         bool empty()
         {
+ spin_mutex::scoped_lock lk( head_mtx_);  // head_mtx_ is held while empty_() is evaluated
+ return empty_();  // the answer comes entirely from empty_()
+ }
+
+ void put( T const& t)  // Appends a copy of t and notifies one waiter on not_empty_cond_; this body neither waits nor checks active_().
+ {
+ typename node::ptr_t new_node( new node);  // allocated before tail_mtx_ is taken; becomes the new tail node
+ {
+ spin_mutex::scoped_lock lk( tail_mtx_);
+
+ tail_->va = t;  // the value goes into the current tail node
+ tail_->next = new_node;
+ tail_ = new_node;
+ }  // lk goes out of scope here, before the notification below
+ not_empty_cond_.notify_one();
+ }
+
+ bool take( value_type & va)  // Blocking removal: swaps the head value into va; returns false when the queue is inactive and empty, or when fiber_interrupted is caught during the wait.
+ {
+ spin_mutex::scoped_lock lk( head_mtx_);
+ bool empty = empty_();
+ if ( ! active_() && empty)  // not active and nothing left to hand out
+ return false;
+ if ( empty)
+ {
+ try
+ {
+ while ( active_() && empty_() )  // condition is re-tested after every return from wait()
+ not_empty_cond_.wait( lk);
+ }
+ catch ( fiber_interrupted const&)  // fiber_interrupted from the wait is swallowed and reported as false
+ { return false; }
+ }
+ if ( ! active_() && empty_() )  // checked again after the wait loop has ended
+ return false;
+ swap( va, head_->va);  // exchanges the caller's va with the value stored in the head node
+ pop_head_();
+ return va;  // the result is va converted to bool
+ }
+
+ bool try_take( value_type & va)  // Non-blocking removal: returns false at once if empty_(); otherwise swaps the head value into va.
+ {
+ spin_mutex::scoped_lock lk( head_mtx_);
+ if ( empty_() )
+ return false;
+ swap( va, head_->va);  // exchanges the caller's va with the value stored in the head node
+ pop_head_();
+ return va;  // the result is va converted to bool
+ }
+
+ template< typename R >
+ friend void intrusive_ptr_add_ref( unbounded_fifo< R, spin_mutex > * p)  // reference-count hook named after the boost::intrusive_ptr convention
+ { detail::atomic_fetch_add( & p->use_count_, 1); }  // adds 1 to p->use_count_ through detail::atomic_fetch_add
+
+ template< typename R >
+ friend void intrusive_ptr_release( unbounded_fifo< R, spin_mutex > * p)  // reference-count hook named after the boost::intrusive_ptr convention
+ { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }  // deletes p when atomic_fetch_sub returns 1 (presumably the count before the decrement - confirm in detail/atomic.hpp)
+};
+
+template< typename T >
+class unbounded_fifo< T, mutex > : private noncopyable
+{
+
+public:
+ typedef optional< T > value_type;
+
+private:
+ struct node  // Singly linked, reference-counted list node that carries one queue slot.
+ {
+ typedef intrusive_ptr< node > ptr_t;
+
+ uint32_t use_count;  // maintained by the two intrusive_ptr_* friends below
+ value_type va;  // payload; value_type is optional< T >
+ ptr_t next;  // following node
+
+ node() :  // starts with a zero count, a default-constructed va and a default-constructed next
+ use_count( 0),
+ va(),
+ next()
+ {}
+
+ inline friend void intrusive_ptr_add_ref( node * p)
+ { ++p->use_count; }  // plain increment, no atomic helper is used for nodes
+
+ inline friend void intrusive_ptr_release( node * p)
+ { if ( --p->use_count == 0) delete p; }  // the node is deleted when the count drops to 0
+ };
+
+ volatile uint32_t state_;
+ typename node::ptr_t head_;
+ mutex head_mtx_;
+ typename node::ptr_t tail_;
+ mutex tail_mtx_;
+ condition not_empty_cond_;
+ uint32_t use_count_;
+
+ bool active_() const  // the queue counts as active while state_ is 0
+ { return 0 == state_; }
+
+ void deactivate_()  // adds 1 to state_ via detail::atomic_fetch_add; active_() tests state_ against 0
+ { detail::atomic_fetch_add( & state_, 1); }
+
+ bool empty_()  // empty when head_ equals the tail returned by get_tail_() (which takes tail_mtx_)
+ { return head_ == get_tail_(); }
+
+ typename node::ptr_t get_tail_()  // returns a copy of tail_ taken while tail_mtx_ is held
+ {
+ mutex::scoped_lock lk( tail_mtx_);
+ typename node::ptr_t tmp = tail_;
+ return tmp;
+ }
+
+ typename node::ptr_t pop_head_()  // unlinks the head node and returns it; takes no lock itself
+ {
+ typename node::ptr_t old_head = head_;
+ head_ = old_head->next;  // the next node becomes the new head
+ return old_head;
+ }
+
+public:
+ template< typename Strategy >
+ unbounded_fifo( scheduler< Strategy > & sched) :  // Builds an empty queue; sched is passed on to both mutexes and to the condition.
+ state_( 0),  // 0 is the value active_() tests for
+ head_( new node),
+ head_mtx_( sched),
+ tail_( head_),  // head_ and tail_ start on the same node, which is what empty_() checks
+ tail_mtx_( sched),
+ not_empty_cond_( sched),
+ use_count_( 0)
+ {}
+
+ void deactivate()  // public entry point that forwards to deactivate_()
+ { deactivate_(); }
+
+ bool empty()  // returns empty_() evaluated while head_mtx_ is held
+ {
                 mutex::scoped_lock lk( head_mtx_);
                 return empty_();
         }
@@ -153,11 +295,11 @@
         }
 
         template< typename R >
- friend void intrusive_ptr_add_ref( unbounded_fifo< R > * p)
+ friend void intrusive_ptr_add_ref( unbounded_fifo< R, mutex > * p)
     { detail::atomic_fetch_add( & p->use_count_, 1); }
 
         template< typename R >
- friend void intrusive_ptr_release( unbounded_fifo< R > * p)
+ friend void intrusive_ptr_release( unbounded_fifo< R, mutex > * p)
     { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }
 };
 

Modified: sandbox/fiber/change.log
==============================================================================
--- sandbox/fiber/change.log (original)
+++ sandbox/fiber/change.log 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -1,3 +1,14 @@
+0.3.0:
+------
+- explicit migration of fibers between two schedulers
+ (even if schedulers are in different threads - see example migrate_mt)
+- locks replaced by the lock implementation from boost.thread
+- introduction of spin_mutex, spin_condition, spin_auto_reset_event, ...
+ (can be used to sync fibers between threads)
+- introduction of mutex, condition, auto_reset_event, ...
+ (objects bound to one scheduler, but more efficient than their spin counterparts)
+- refactoring of internal classes
+
 0.2.0:
 ------
 - classes disable_interruption and restore_interruption moved into namespace boost::this_fiber

Modified: sandbox/fiber/libs/fiber/examples/Jamfile.v2
==============================================================================
--- sandbox/fiber/libs/fiber/examples/Jamfile.v2 (original)
+++ sandbox/fiber/libs/fiber/examples/Jamfile.v2 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -29,5 +29,5 @@
 exe interrupt : interrupt.cpp ;
 exe join : join.cpp ;
 exe ping_pong : ping_pong.cpp ;
-exe simple_mt : simple_mt.cpp ;
+exe migrate_mt : migrate_mt.cpp ;
 exe simple : simple.cpp ;

Added: sandbox/fiber/libs/fiber/examples/migrate_mt.cpp
==============================================================================
--- (empty file)
+++ sandbox/fiber/libs/fiber/examples/migrate_mt.cpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -0,0 +1,124 @@
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+
+#include <boost/bind.hpp>
+#include <boost/ref.hpp>
+#include <boost/system/system_error.hpp>
+#include <boost/thread.hpp>
+
+#include <boost/fiber.hpp>
+
+void g( std::string const& str, int n)  // Fiber body: prints n lines (thread id, fiber id, iteration index, str) to stderr and yields after each one.
+{
+ for ( int i = 0; i < n; ++i)
+ {
+ std::ostringstream os1;
+ std::ostringstream os2;
+ os1 << boost::this_thread::get_id();  // ids are streamed into strings so they can be printed with %s
+ os2 << boost::this_fiber::get_id();
+ fprintf( stderr, "(thread: %s, fiber: %s) %d: %s\n", os1.str().c_str(), os2.str().c_str(), i, str.c_str() );
+ boost::this_fiber::yield();  // one yield per printed line
+ }
+}
+
+void fn1(  // Thread entry: runs its own scheduler sched1 and, between two barrier waits, moves fiber id from sched2 into sched1.
+ boost::fiber::id const& id,  // id of the fiber to take over from sched2
+ boost::barrier & b,  // barrier also waited on by fn2
+ boost::fibers::scheduler<> & sched2,  // scheduler the fiber is taken from
+ std::string const& msg, int n)  // arguments for the local fiber running g
+{
+ std::ostringstream os;
+ os << boost::this_thread::get_id();
+ fprintf( stderr, "start (thread1: %s)\n", os.str().c_str() );
+
+ boost::fibers::scheduler<> sched1;
+ sched1.make_fiber( & g, msg, n);  // local fiber running g( msg, n)
+
+ for ( int i = 0; i < 2; ++i)  // two run() calls before the first barrier wait
+ sched1.run();
+
+ b.wait();
+
+ sched1.migrate_fiber( id, sched2);  // sched1 takes fiber id over from sched2
+
+ b.wait();
+
+ std::ostringstream id_os;
+ id_os << id;
+
+ fprintf( stderr, "thread1: fiber %s migrated\n", id_os.str().c_str() );
+ fprintf( stderr, "thread1: scheduler runs %d fibers\n", static_cast< int >( sched1.size() ) );
+
+ for (;;)  // keep calling run() until sched1 reports empty()
+ {
+ while ( sched1.run() );
+ if ( sched1.empty() ) break;
+ }
+
+ fprintf( stderr, "finish (thread1: %s)\n", os.str().c_str() );
+}
+
+void fn2(  // Thread entry: submits f to sched, calls run() twice, then waits on the barrier twice while fn1 performs the migration.
+ boost::fiber & f,  // fiber handed to sched
+ boost::barrier & b,  // barrier also waited on by fn1
+ boost::fibers::scheduler<> & sched)  // scheduler that initially owns f
+{
+ std::ostringstream os;
+ os << boost::this_thread::get_id();
+ fprintf( stderr, "start (thread2: %s)\n", os.str().c_str() );
+
+ sched.submit_fiber( f);
+
+ sched.run();
+ sched.run();
+
+ b.wait();  // fn1 calls migrate_fiber between its own two waits on b
+ b.wait();
+
+ fprintf( stderr, "thread2: scheduler runs %d fibers\n", static_cast< int >( sched.size() ) );
+
+ fprintf( stderr, "finish (thread2: %s)\n", os.str().c_str() );
+}
+
+int main()  // Starts fn1 and fn2 on two threads sharing one scheduler and one barrier; returns EXIT_SUCCESS after both joined, EXIT_FAILURE if an exception is caught.
+{
+ try
+ {
+ boost::fibers::scheduler<> sched;  // passed to fn1 as sched2 and to fn2 as sched
+ boost::barrier b( 2);  // two participants: fn1 and fn2
+
+ boost::fiber f( & g, "xyz", 4);  // the fiber that fn2 submits and fn1 migrates
+
+ std::cout << "start" << std::endl;
+
+ boost::thread th1(
+ fn1,
+ f.get_id(),
+ boost::ref( b),
+ boost::ref( sched),
+ "abc", 5);
+ boost::thread th2(
+ fn2,
+ f,
+ boost::ref( b),
+ boost::ref( sched) );
+
+ th1.join();
+ th2.join();
+
+ std::cout << "finish" << std::endl;
+
+ return EXIT_SUCCESS;
+ }
+ catch ( boost::system::system_error const& e)  // each handler only reports to std::cerr; control then reaches the EXIT_FAILURE return
+ { std::cerr << "system_error: " << e.code().value() << std::endl; }
+ catch ( boost::fibers::scheduler_error const& e)
+ { std::cerr << "scheduler_error: " << e.what() << std::endl; }
+ catch ( std::exception const& e)
+ { std::cerr << "exception: " << e.what() << std::endl; }
+ catch (...)
+ { std::cerr << "unhandled exception" << std::endl; }
+ return EXIT_FAILURE;
+}

Modified: sandbox/fiber/libs/fiber/examples/ping_pong.cpp
==============================================================================
--- sandbox/fiber/libs/fiber/examples/ping_pong.cpp (original)
+++ sandbox/fiber/libs/fiber/examples/ping_pong.cpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -10,7 +10,8 @@
 
 #include <boost/fiber.hpp>
 
-typedef boost::fibers::unbounded_fifo< std::string > fifo_t;
+typedef boost::fibers::unbounded_fifo< std::string, boost::fibers::mutex > fifo_t;
+//typedef boost::fibers::unbounded_fifo< std::string, boost::fibers::spin_mutex > fifo_t;
 typedef boost::intrusive_ptr< fifo_t > fifo_ptr_t;
 inline
 void ping(
@@ -64,8 +65,11 @@
 
 void f( boost::fibers::scheduler<> & sched)
 {
- fifo_ptr_t buf1( new fifo_t() );
- fifo_ptr_t buf2( new fifo_t() );
+ fifo_ptr_t buf1( new fifo_t( sched) );
+ fifo_ptr_t buf2( new fifo_t( sched) );
+
+// fifo_ptr_t buf1( new fifo_t() );
+// fifo_ptr_t buf2( new fifo_t() );
         
         sched.make_fiber( & ping, buf1, buf2);
         sched.make_fiber( & pong, buf2, buf1);

Deleted: sandbox/fiber/libs/fiber/examples/simple_mt.cpp
==============================================================================
--- sandbox/fiber/libs/fiber/examples/simple_mt.cpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
+++ (empty file)
@@ -1,80 +0,0 @@
-#include <cstdlib>
-#include <iostream>
-#include <sstream>
-#include <string>
-
-#include <boost/bind.hpp>
-#include <boost/ref.hpp>
-#include <boost/system/system_error.hpp>
-#include <boost/thread.hpp>
-
-#include <boost/fiber.hpp>
-
-void f( std::string const& str, int n)
-{
- for ( int i = 0; i < n; ++i)
- {
- std::ostringstream os1;
- std::ostringstream os2;
- os1 << boost::this_thread::get_id();
- os2 << boost::this_fiber::get_id();
- fprintf( stderr, "(thread: %s, fiber: %s) %d: %s\n", os1.str().c_str(), os2.str().c_str(), i, str.c_str() );
- boost::this_fiber::yield();
- }
-}
-
-void run_thread(
- boost::barrier & b,
- boost::fibers::scheduler<> & sched,
- std::string const& msg, int n)
-{
- std::ostringstream os;
- os << boost::this_thread::get_id();
- fprintf( stderr, "start (thread: %s)\n", os.str().c_str() );
- sched.make_fiber( & f, msg, n);
-
- b.wait();
- for (;;)
- {
- while ( sched.run() );
- if ( sched.empty() ) break;
- }
-
- fprintf( stderr, "finish (thread: %s)\n", os.str().c_str() );
-}
-
-int main()
-{
- try
- {
- boost::fibers::scheduler<> sched;
-
- std::cout << "start" << std::endl;
-
- boost::barrier b( 2);
- boost::thread th1(
- run_thread,
- boost::ref( b),
- boost::ref( sched), "abc", 3);
- boost::thread th2(
- run_thread,
- boost::ref( b),
- boost::ref( sched), "xyz", 4);
-
- th1.join();
- th2.join();
-
- std::cout << "finish" << std::endl;
-
- return EXIT_SUCCESS;
- }
- catch ( boost::system::system_error const& e)
- { std::cerr << "system_error: " << e.code().value() << std::endl; }
- catch ( boost::fibers::scheduler_error const& e)
- { std::cerr << "scheduler_error: " << e.what() << std::endl; }
- catch ( std::exception const& e)
- { std::cerr << "exception: " << e.what() << std::endl; }
- catch (...)
- { std::cerr << "unhandled exception" << std::endl; }
- return EXIT_FAILURE;
-}

Modified: sandbox/fiber/libs/fiber/src/round_robin.cpp
==============================================================================
--- sandbox/fiber/libs/fiber/src/round_robin.cpp (original)
+++ sandbox/fiber/libs/fiber/src/round_robin.cpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -353,6 +353,50 @@
         oi->second.clear();
 }
 
+fiber
+round_robin::release( fiber::id const& id)  // Removes fiber id from this scheduler's bookkeeping and returns it; throws scheduler_error if the id is unknown or the fiber may not be released.
+{
+ fiber_map::iterator fi = fibers_.find( id);
+ if ( fi == fibers_.end() )
+ throw scheduler_error("fiber not managed by scheduler");
+ schedulable s( fi->second);  // copy of the bookkeeping entry for this fiber
+ fiber f( s.f);
+ BOOST_ASSERT( f);
+ BOOST_ASSERT( ! is_master( f) );
+ if ( ! in_state_ready( f) || ! s.joining_fibers.empty() )  // only a fiber in ready state with no joining fibers is released
+ throw scheduler_error("fiber can not be released");
+ BOOST_ASSERT( ! s.waiting_on_fiber);
+ BOOST_ASSERT( ! s.waiting_on_object);
+
+ runnable_fibers_.remove( id);  // dropped from the runnable list and from the fiber map
+ fibers_.erase( id);
+
+ return f;
+}
+
+void
+round_robin::migrate( fiber f)  // Takes over fiber f: attaches it, registers it in fibers_ and appends its id to runnable_fibers_; throws scheduler_error if the insert into fibers_ fails.
+{
+ BOOST_ASSERT( f);
+ BOOST_ASSERT( in_state_ready( f) );
+
+ // attach to this scheduler
+ attach( f);
+
+ // insert fiber to fiber-list
+ std::pair< std::map< fiber::id, schedulable >::iterator, bool > result(
+ fibers_.insert(
+ std::make_pair(
+ f.get_id(),
+ schedulable( f) ) ) );
+
+ // check result
+ if ( ! result.second) throw scheduler_error("inserting fiber failed");  // NOTE(review): attach( f) has already run when this throws - confirm whether it needs undoing
+
+ // put fiber to runnable-queue
+ runnable_fibers_.push_back( result.first->first);
+}
+
 bool
 round_robin::run()
 {

Modified: sandbox/fiber/libs/fiber/test/test_scheduler.cpp
==============================================================================
--- sandbox/fiber/libs/fiber/test/test_scheduler.cpp (original)
+++ sandbox/fiber/libs/fiber/test/test_scheduler.cpp 2009-12-08 15:25:31 EST (Tue, 08 Dec 2009)
@@ -102,6 +102,24 @@
         BOOST_CHECK_EQUAL( 1, value2);
 }
 
+void yield1_fn()  // Fiber body: increments value1 five times, yielding after each increment.
+{
+ for ( int i = 0; i < 5; ++i)
+ {
+ ++value1;
+ boost::this_fiber::yield();
+ }
+}
+
+void yield2_fn()  // Fiber body: increments value2 five times, yielding after each increment.
+{
+ for ( int i = 0; i < 5; ++i)
+ {
+ ++value2;
+ boost::this_fiber::yield();
+ }
+}
+
 void test_case_1()
 {
         value1 = 0;
@@ -262,6 +280,69 @@
         BOOST_CHECK_EQUAL( 1, value3);
 }
 
+void test_case_6()  // Migration test: runs one fiber on each of two schedulers, moves the sched1 fiber into sched2 and checks sizes and counters after every step.
+{
+ value1 = 0;
+ value2 = 0;
+
+ boost::fibers::scheduler<> sched1, sched2;
+
+ boost::fiber f( & yield1_fn);  // this fiber increments value1
+ boost::fiber::id id = f.get_id();  // id used for the migration below
+ sched1.submit_fiber( f);
+ sched2.make_fiber( & yield2_fn);  // this fiber increments value2
+
+ BOOST_CHECK( ! sched1.empty() );
+ BOOST_CHECK( ! sched2.empty() );
+ BOOST_CHECK_EQUAL( std::size_t( 1), sched1.size() );
+ BOOST_CHECK_EQUAL( std::size_t( 1), sched2.size() );
+ BOOST_CHECK_EQUAL( 0, value1);
+ BOOST_CHECK_EQUAL( 0, value2);
+
+ BOOST_CHECK( sched1.run() );  // each run() below is expected to advance exactly one counter by one
+ BOOST_CHECK( ! sched1.empty() );
+ BOOST_CHECK_EQUAL( std::size_t( 1), sched1.size() );
+ BOOST_CHECK_EQUAL( 1, value1);
+ BOOST_CHECK_EQUAL( 0, value2);
+
+ BOOST_CHECK( sched2.run() );
+ BOOST_CHECK( ! sched2.empty() );
+ BOOST_CHECK_EQUAL( std::size_t( 1), sched2.size() );
+ BOOST_CHECK_EQUAL( 1, value1);
+ BOOST_CHECK_EQUAL( 1, value2);
+
+ BOOST_CHECK( sched1.run() );
+ BOOST_CHECK( ! sched1.empty() );
+ BOOST_CHECK_EQUAL( std::size_t( 1), sched1.size() );
+ BOOST_CHECK_EQUAL( 2, value1);
+ BOOST_CHECK_EQUAL( 1, value2);
+
+ BOOST_CHECK( sched2.run() );
+ BOOST_CHECK( ! sched2.empty() );
+ BOOST_CHECK_EQUAL( std::size_t( 1), sched2.size() );
+ BOOST_CHECK_EQUAL( 2, value1);
+ BOOST_CHECK_EQUAL( 2, value2);
+
+ sched2.migrate_fiber( id, sched1);  // the yield1_fn fiber moves from sched1 into sched2
+ BOOST_CHECK_EQUAL( std::size_t( 0), sched1.size() );
+ BOOST_CHECK_EQUAL( std::size_t( 2), sched2.size() );
+
+ BOOST_CHECK( ! sched1.run() );  // sched1 has nothing left to run
+ BOOST_CHECK( sched1.empty() );
+
+ BOOST_CHECK( sched2.run() );  // the checks below expect value2 to advance on this run
+ BOOST_CHECK( ! sched2.empty() );
+ BOOST_CHECK_EQUAL( std::size_t( 2), sched2.size() );
+ BOOST_CHECK_EQUAL( 2, value1);
+ BOOST_CHECK_EQUAL( 3, value2);
+
+ BOOST_CHECK( sched2.run() );  // the checks below expect value1 to advance on this run, now driven by sched2
+ BOOST_CHECK( ! sched2.empty() );
+ BOOST_CHECK_EQUAL( std::size_t( 2), sched2.size() );
+ BOOST_CHECK_EQUAL( 3, value1);
+ BOOST_CHECK_EQUAL( 3, value2);
+}
+
 boost::unit_test::test_suite * init_unit_test_suite( int, char* [])
 {
         boost::unit_test::test_suite * test =
@@ -272,6 +353,7 @@
         test->add( BOOST_TEST_CASE( & test_case_3) );
         test->add( BOOST_TEST_CASE( & test_case_4) );
         test->add( BOOST_TEST_CASE( & test_case_5) );
+ test->add( BOOST_TEST_CASE( & test_case_6) );
 
         return test;
 }


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk