Subject: [Boost-commit] svn:boost r58318 - sandbox/fiber/boost/fiber
From: oliver.kowalke_at_[hidden]
Date: 2009-12-12 05:03:41
Author: olli
Date: 2009-12-12 05:03:41 EST (Sat, 12 Dec 2009)
New Revision: 58318
URL: http://svn.boost.org/trac/boost/changeset/58318
Log:
bounded_fifo and unbounded_fifo refactored around a reference-counted impl class (spin_mutex/spin_condition specializations removed); condition::wait() implemented inline for generic lock types
Text files modified:
sandbox/fiber/boost/fiber/bounded_fifo.hpp | 608 ++++++++++++++-------------------------
sandbox/fiber/boost/fiber/condition.hpp | 82 ++++
sandbox/fiber/boost/fiber/unbounded_fifo.hpp | 346 +++++++---------------
3 files changed, 398 insertions(+), 638 deletions(-)
Modified: sandbox/fiber/boost/fiber/bounded_fifo.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/bounded_fifo.hpp (original)
+++ sandbox/fiber/boost/fiber/bounded_fifo.hpp 2009-12-12 05:03:41 EST (Sat, 12 Dec 2009)
@@ -11,315 +11,246 @@
#include <stdexcept>
#include <boost/cstdint.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/condition.hpp>
-#include <boost/fiber/detail/atomic.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/mutex.hpp>
-#include <boost/fiber/spin_condition.hpp>
-#include <boost/fiber/spin_mutex.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost {
namespace fibers {
-template< typename T, typename M >
-class bounded_fifo;
-
template< typename T >
-class bounded_fifo< T, spin_mutex > : private noncopyable
+class bounded_fifo
{
public:
typedef optional< T > value_type;
private:
- struct node
+ class impl : private noncopyable
{
- typedef intrusive_ptr< node > ptr_t;
+ private:
+ struct node
+ {
+ typedef intrusive_ptr< node > ptr_t;
- uint32_t use_count;
- value_type va;
- ptr_t next;
-
- node() :
- use_count( 0),
- va(),
- next()
- {}
+ uint32_t use_count;
+ value_type va;
+ ptr_t next;
+
+ node() :
+ use_count( 0),
+ va(),
+ next()
+ {}
+
+ inline friend void intrusive_ptr_add_ref( node * p)
+ { ++p->use_count; }
+
+ inline friend void intrusive_ptr_release( node * p)
+ { if ( --p->use_count == 0) delete p; }
+ };
+
+ uint32_t state_;
+ uint32_t count_;
+ typename node::ptr_t head_;
+ mutex head_mtx_;
+ typename node::ptr_t tail_;
+ mutex tail_mtx_;
+ condition not_empty_cond_;
+ condition not_full_cond_;
+ std::size_t hwm_;
+ std::size_t lwm_;
+ uint32_t use_count_;
+
+ bool active_() const
+ { return 0 == state_; }
+
+ void deactivate_()
+ { ++state_; }
+
+ uint32_t size_()
+ { return count_; }
- inline friend void intrusive_ptr_add_ref( node * p)
- { ++p->use_count; }
-
- inline friend void intrusive_ptr_release( node * p)
- { if ( --p->use_count == 0) delete p; }
- };
+ bool empty_()
+ { return head_ == get_tail_(); }
- volatile uint32_t state_;
- volatile uint32_t count_;
- typename node::ptr_t head_;
- spin_mutex head_mtx_;
- typename node::ptr_t tail_;
- spin_mutex tail_mtx_;
- spin_condition not_empty_cond_;
- spin_condition not_full_cond_;
- std::size_t hwm_;
- std::size_t lwm_;
- volatile uint32_t use_count_;
-
- bool active_() const
- { return 0 == state_; }
-
- void deactivate_()
- { detail::atomic_fetch_add( & state_, 1); }
-
- uint32_t size_()
- { return count_; }
+ bool full_()
+ { return size_() >= hwm_; }
- bool empty_()
- { return head_ == get_tail_(); }
-
- bool full_()
- { return size_() >= hwm_; }
+ typename node::ptr_t get_tail_()
+ {
+ mutex::scoped_lock lk( tail_mtx_);
+ typename node::ptr_t tmp = tail_;
+ return tmp;
+ }
- typename node::ptr_t get_tail_()
- {
- spin_mutex::scoped_lock lk( tail_mtx_);
- typename node::ptr_t tmp = tail_;
- return tmp;
- }
+ typename node::ptr_t pop_head_()
+ {
+ typename node::ptr_t old_head = head_;
+ head_ = old_head->next;
+ --count_;
+ return old_head;
+ }
- typename node::ptr_t pop_head_()
- {
- typename node::ptr_t old_head = head_;
- head_ = old_head->next;
- detail::atomic_fetch_sub( & count_, 1);
- return old_head;
- }
+ public:
+ template< typename Strategy >
+ impl(
+ scheduler< Strategy > & sched,
+ std::size_t const& hwm,
+ std::size_t const& lwm) :
+ state_( 0),
+ count_( 0),
+ head_( new node),
+ head_mtx_( sched),
+ tail_( head_),
+ tail_mtx_( sched),
+ not_empty_cond_( sched),
+ not_full_cond_( sched),
+ hwm_( hwm),
+ lwm_( lwm),
+ use_count_( 0)
+ {
+ if ( hwm_ < lwm_)
+ throw invalid_watermark();
+ }
-public:
- bounded_fifo(
- std::size_t const& hwm,
- std::size_t const& lwm) :
- state_( 0),
- count_( 0),
- head_( new node),
- head_mtx_(),
- tail_( head_),
- tail_mtx_(),
- not_empty_cond_(),
- not_full_cond_(),
- hwm_( hwm),
- lwm_( lwm),
- use_count_( 0)
- {
- if ( hwm_ < lwm_)
- throw invalid_watermark();
- }
-
- bounded_fifo( std::size_t const& wm) :
- state_( 0),
- count_( 0),
- head_( new node),
- head_mtx_(),
- tail_( head_),
- tail_mtx_(),
- not_empty_cond_(),
- not_full_cond_(),
- hwm_( wm),
- lwm_( wm),
- use_count_( 0)
- {}
+ template< typename Strategy >
+ impl(
+ scheduler< Strategy > & sched,
+ std::size_t const& wm) :
+ state_( 0),
+ count_( 0),
+ head_( new node),
+ head_mtx_( sched),
+ tail_( head_),
+ tail_mtx_( sched),
+ not_empty_cond_( sched),
+ not_full_cond_( sched),
+ hwm_( wm),
+ lwm_( wm),
+ use_count_( 0)
+ {}
- void upper_bound_( std::size_t hwm)
- {
- if ( hwm < lwm_)
- throw invalid_watermark();
- std::size_t tmp( hwm_);
- hwm_ = hwm;
- if ( hwm_ > tmp) not_full_cond_.notify_one();
- }
+ void upper_bound( std::size_t hwm)
+ {
+ if ( hwm < lwm_)
+ throw invalid_watermark();
+ std::size_t tmp( hwm_);
+ hwm_ = hwm;
+ if ( hwm_ > tmp) not_full_cond_.notify_one();
+ }
- std::size_t upper_bound()
- { return hwm_; }
+ std::size_t upper_bound()
+ { return hwm_; }
- void lower_bound_( std::size_t lwm)
- {
- if ( lwm > hwm_ )
- throw invalid_watermark();
- std::size_t tmp( lwm_);
- lwm_ = lwm;
- if ( lwm_ > tmp) not_full_cond_.notify_one();
- }
-
- std::size_t lower_bound()
- { return lwm_; }
+ void lower_bound( std::size_t lwm)
+ {
+ if ( lwm > hwm_ )
+ throw invalid_watermark();
+ std::size_t tmp( lwm_);
+ lwm_ = lwm;
+ if ( lwm_ > tmp) not_full_cond_.notify_one();
+ }
- void deactivate()
- { deactivate_(); }
+ std::size_t lower_bound()
+ { return lwm_; }
- bool empty()
- {
- spin_mutex::scoped_lock lk( head_mtx_);
- return empty_();
- }
+ void deactivate()
+ { deactivate_(); }
- void put( T const& t)
- {
- typename node::ptr_t new_node( new node);
+ bool empty()
{
- spin_mutex::scoped_lock lk( tail_mtx_);
+ mutex::scoped_lock lk( head_mtx_);
+ return empty_();
+ }
- if ( full_() )
+ void put( T const& t)
+ {
+ typename node::ptr_t new_node( new node);
{
- while ( active_() && full_() )
- not_full_cond_.wait( lk);
- }
- if ( ! active_() )
- throw std::runtime_error("queue is not active");
+ mutex::scoped_lock lk( tail_mtx_);
- tail_->va = t;
- tail_->next = new_node;
- tail_ = new_node;
- detail::atomic_fetch_add( & count_, 1);
+ if ( full_() )
+ {
+ while ( active_() && full_() )
+ not_full_cond_.wait( lk);
+ }
+ if ( ! active_() )
+ throw std::runtime_error("queue is not active");
+
+ tail_->va = t;
+ tail_->next = new_node;
+ tail_ = new_node;
+ ++count_;
+ }
+ not_empty_cond_.notify_one();
}
- not_empty_cond_.notify_one();
- }
- bool take( value_type & va)
- {
- spin_mutex::scoped_lock lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
+ bool take( value_type & va)
{
- try
+ mutex::scoped_lock lk( head_mtx_);
+ bool empty = empty_();
+ if ( ! active_() && empty)
+ return false;
+ if ( empty)
{
- while ( active_() && empty_() )
- not_empty_cond_.wait( lk);
+ try
+ {
+ while ( active_() && empty_() )
+ not_empty_cond_.wait( lk);
+ }
+ catch ( fiber_interrupted const&)
+ { return false; }
}
- catch ( fiber_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // for submiting an action object
- not_full_cond_.notify_all();
+ if ( ! active_() && empty_() )
+ return false;
+ swap( va, head_->va);
+ pop_head_();
+ if ( size_() <= lwm_)
+ {
+ if ( lwm_ == hwm_)
+ not_full_cond_.notify_one();
+ else
+ // more than one producer could be waiting
+ // for submitting an action object
+ not_full_cond_.notify_all();
+ }
+ return va;
}
- return va;
- }
- bool try_take( value_type & va)
- {
- spin_mutex::scoped_lock lk( head_mtx_);
- if ( empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- bool valid = va;
- if ( valid && size_() <= lwm_)
+ bool try_take( value_type & va)
{
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // in order to submit an task
- not_full_cond_.notify_all();
+ mutex::scoped_lock lk( head_mtx_);
+ if ( empty_() )
+ return false;
+ swap( va, head_->va);
+ pop_head_();
+ bool valid = va;
+ if ( valid && size_() <= lwm_)
+ {
+ if ( lwm_ == hwm_)
+ not_full_cond_.notify_one();
+ else
+ // more than one producer could be waiting
+ // in order to submit a task
+ not_full_cond_.notify_all();
+ }
+ return valid;
}
- return valid;
- }
-
- template< typename R >
- friend void intrusive_ptr_add_ref( bounded_fifo< R, spin_mutex > * p)
- { detail::atomic_fetch_add( & p->use_count_, 1); }
-
- template< typename R >
- friend void intrusive_ptr_release( bounded_fifo< R, spin_mutex > * p)
- { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }
-};
-template< typename T >
-class bounded_fifo< T, mutex > : private noncopyable
-{
-public:
- typedef optional< T > value_type;
+ friend void intrusive_ptr_add_ref( impl * p)
+ { ++( p->use_count_); }
-private:
- struct node
- {
- typedef intrusive_ptr< node > ptr_t;
-
- uint32_t use_count;
- value_type va;
- ptr_t next;
-
- node() :
- use_count( 0),
- va(),
- next()
- {}
-
- inline friend void intrusive_ptr_add_ref( node * p)
- { ++p->use_count; }
-
- inline friend void intrusive_ptr_release( node * p)
- { if ( --p->use_count == 0) delete p; }
+ friend void intrusive_ptr_release( impl * p)
+ { if ( --( p->use_count_) == 0) delete p; }
};
- volatile uint32_t state_;
- volatile uint32_t count_;
- typename node::ptr_t head_;
- mutex head_mtx_;
- typename node::ptr_t tail_;
- mutex tail_mtx_;
- condition not_empty_cond_;
- condition not_full_cond_;
- std::size_t hwm_;
- std::size_t lwm_;
- volatile uint32_t use_count_;
-
- bool active_() const
- { return 0 == state_; }
-
- void deactivate_()
- { detail::atomic_fetch_add( & state_, 1); }
-
- uint32_t size_()
- { return count_; }
-
- bool empty_()
- { return head_ == get_tail_(); }
-
- bool full_()
- { return size_() >= hwm_; }
-
- typename node::ptr_t get_tail_()
- {
- mutex::scoped_lock lk( tail_mtx_);
- typename node::ptr_t tmp = tail_;
- return tmp;
- }
-
- typename node::ptr_t pop_head_()
- {
- typename node::ptr_t old_head = head_;
- head_ = old_head->next;
- detail::atomic_fetch_sub( & count_, 1);
- return old_head;
- }
+ intrusive_ptr< impl > impl_;
public:
template< typename Strategy >
@@ -327,153 +258,42 @@
scheduler< Strategy > & sched,
std::size_t const& hwm,
std::size_t const& lwm) :
- state_( 0),
- count_( 0),
- head_( new node),
- head_mtx_( sched),
- tail_( head_),
- tail_mtx_( sched),
- not_empty_cond_( sched),
- not_full_cond_( sched),
- hwm_( hwm),
- lwm_( lwm),
- use_count_( 0)
- {
- if ( hwm_ < lwm_)
- throw invalid_watermark();
- }
-
+ impl_( new impl( sched, hwm, lwm) )
+ {}
+
template< typename Strategy >
bounded_fifo(
scheduler< Strategy > & sched,
std::size_t const& wm) :
- state_( 0),
- count_( 0),
- head_( new node),
- head_mtx_( sched),
- tail_( head_),
- tail_mtx_( sched),
- not_empty_cond_( sched),
- not_full_cond_( sched),
- hwm_( wm),
- lwm_( wm),
- use_count_( 0)
+ impl_( new impl( sched, wm) )
{}
-
- void upper_bound_( std::size_t hwm)
- {
- if ( hwm < lwm_)
- throw invalid_watermark();
- std::size_t tmp( hwm_);
- hwm_ = hwm;
- if ( hwm_ > tmp) not_full_cond_.notify_one();
- }
-
+
+ void upper_bound( std::size_t hwm)
+ { impl_->upper_bound( hwm); }
+
std::size_t upper_bound()
- { return hwm_; }
-
- void lower_bound_( std::size_t lwm)
- {
- if ( lwm > hwm_ )
- throw invalid_watermark();
- std::size_t tmp( lwm_);
- lwm_ = lwm;
- if ( lwm_ > tmp) not_full_cond_.notify_one();
- }
-
+ { return impl_->upper_bound(); }
+
+ void lower_bound( std::size_t lwm)
+ { impl_->lower_bound( lwm); }
+
std::size_t lower_bound()
- { return lwm_; }
-
+ { return impl_->lower_bound(); }
+
void deactivate()
- { deactivate_(); }
-
+ { impl_->deactivate(); }
+
bool empty()
- {
- mutex::scoped_lock lk( head_mtx_);
- return empty_();
- }
-
+ { return impl_->empty(); }
+
void put( T const& t)
- {
- typename node::ptr_t new_node( new node);
- {
- mutex::scoped_lock lk( tail_mtx_);
-
- if ( full_() )
- {
- while ( active_() && full_() )
- not_full_cond_.wait( lk);
- }
- if ( ! active_() )
- throw std::runtime_error("queue is not active");
-
- tail_->va = t;
- tail_->next = new_node;
- tail_ = new_node;
- detail::atomic_fetch_add( & count_, 1);
- }
- not_empty_cond_.notify_one();
- }
-
+ { impl_->put( t); }
+
bool take( value_type & va)
- {
- mutex::scoped_lock lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- while ( active_() && empty_() )
- not_empty_cond_.wait( lk);
- }
- catch ( fiber_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- if ( size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // for submiting an action object
- not_full_cond_.notify_all();
- }
- return va;
- }
-
+ { return impl_->take( va); }
+
bool try_take( value_type & va)
- {
- mutex::scoped_lock lk( head_mtx_);
- if ( empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- bool valid = va;
- if ( valid && size_() <= lwm_)
- {
- if ( lwm_ == hwm_)
- not_full_cond_.notify_one();
- else
- // more than one producer could be waiting
- // in order to submit an task
- not_full_cond_.notify_all();
- }
- return valid;
- }
-
- template< typename R >
- friend void intrusive_ptr_add_ref( bounded_fifo< R, mutex > * p)
- { detail::atomic_fetch_add( & p->use_count_, 1); }
-
- template< typename R >
- friend void intrusive_ptr_release( bounded_fifo< R, mutex > * p)
- { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }
+ { return impl_->try_take( va); }
};
}}
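
The rewritten bounded_fifo is a thin handle over a reference-counted impl: the constructor now takes the scheduler plus the watermarks, and copies of the handle share one queue. A minimal usage sketch (not part of this commit), assuming a Strategy type named round_robin and that producer/consumer are launched as fibers on the same scheduler; the fiber-spawning API itself is not shown here:

    #include <iostream>
    #include <boost/fiber/bounded_fifo.hpp>
    #include <boost/fiber/scheduler.hpp>

    // round_robin is an assumed placeholder Strategy.
    typedef boost::fibers::scheduler< boost::fibers::round_robin > sched_t;
    typedef boost::fibers::bounded_fifo< int > fifo_t;

    void producer( fifo_t buf)        // handle copy shares the impl
    {
        for ( int i = 0; i < 100; ++i)
            buf.put( i);              // blocks this fiber while the fifo is full
        buf.deactivate();             // after this, put() throws and take() drains
    }

    void consumer( fifo_t buf)
    {
        fifo_t::value_type v;         // optional< int >
        while ( buf.take( v) )        // false once deactivated and empty
            std::cout << * v << '\n';
    }

    // setup: sched_t sched; fifo_t buf( sched, 10, 5);   // hwm 10, lwm 5
    // hwm < lwm would throw invalid_watermark.

Note that take() signals not_full_cond_ once the size falls to the low watermark, using notify_all() when hwm != lwm because several producers may be blocked.
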
Modified: sandbox/fiber/boost/fiber/condition.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/condition.hpp (original)
+++ sandbox/fiber/boost/fiber/condition.hpp 2009-12-12 05:03:41 EST (Sat, 12 Dec 2009)
@@ -9,12 +9,15 @@
#ifndef BOOST_FIBERS_CONDITION_H
#define BOOST_FIBERS_CONDITION_H
+#include <boost/assert.hpp>
#include <boost/cstdint.hpp>
#include <boost/utility.hpp>
+#include <boost/thread/locks.hpp>
-#include <boost/fiber/object/id.hpp>
+#include <boost/fiber/detail/atomic.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/mutex.hpp>
+#include <boost/fiber/object/id.hpp>
#include <boost/fiber/scheduler.hpp>
#include <boost/fiber/strategy.hpp>
@@ -38,8 +41,6 @@
object::id id_;
strategy::ptr_t strategy_;
- void wait_( mutex &);
-
public:
template< typename Strategy >
condition( scheduler< Strategy > & sched) :
@@ -57,25 +58,82 @@
void notify_all();
- template< typename Lock >
- void wait( Lock & lk)
+ void wait( unique_lock< mutex > & lk)
{
if ( ! lk)
throw lock_error();
- wait_( * lk.mutex() );
+ wait( * lk.mutex() );
}
- template<
- typename Lock,
- typename Pred
- >
- void wait( Lock & lk, Pred pred)
+ template< typename Pred >
+ void wait( unique_lock< mutex > & lk, Pred pred)
{
if ( ! lk)
throw lock_error();
while ( ! pred() )
- wait_( * lk.mutex() );
+ wait( * lk.mutex() );
+ }
+
+ template< typename LockType >
+ void wait( LockType & lt)
+ {
+ {
+ mutex::scoped_lock lk( enter_mtx_);
+ BOOST_ASSERT( lk);
+ detail::atomic_fetch_add( & waiters_, 1);
+ lt.unlock();
+ }
+
+ bool unlock_enter_mtx = false;
+ for (;;)
+ {
+ while ( static_cast< uint32_t >( SLEEPING) == detail::atomic_load( & cmd_) )
+ strategy_->wait_for_object( id_);
+
+ mutex::scoped_lock lk( check_mtx_);
+ BOOST_ASSERT( lk);
+
+ uint32_t expected = static_cast< uint32_t >( NOTIFY_ONE);
+ detail::atomic_compare_exchange_strong(
+ & cmd_, & expected,
+ static_cast< uint32_t >( SLEEPING) );
+ if ( static_cast< uint32_t >( SLEEPING) == expected)
+ continue;
+ else if ( static_cast< uint32_t >( NOTIFY_ONE) == expected)
+ {
+ unlock_enter_mtx = true;
+ detail::atomic_fetch_sub( & waiters_, 1);
+ break;
+ }
+ else
+ {
+ unlock_enter_mtx = 1 == detail::atomic_fetch_sub( & waiters_, 1);
+ if ( unlock_enter_mtx)
+ {
+ expected = static_cast< uint32_t >( NOTIFY_ALL);
+ detail::atomic_compare_exchange_strong(
+ & cmd_, & expected,
+ static_cast< uint32_t >( SLEEPING) );
+ }
+ break;
+ }
+ }
+
+ if ( unlock_enter_mtx)
+ enter_mtx_.unlock();
+
+ lt.lock();
+ }
+
+ template<
+ typename LockType,
+ typename Pred
+ >
+ void wait( LockType & lt, Pred pred)
+ {
+ while ( ! pred() )
+ wait( lt);
}
};
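
The LockType-parameterized wait() above replaces the private wait_( mutex &): a waiter registers itself under enter_mtx_, releases the caller's lock, sleeps via the strategy until cmd_ leaves SLEEPING, then consumes either a NOTIFY_ONE or its share of a NOTIFY_ALL (the last waiter of a broadcast resets cmd_ and releases enter_mtx_) before re-acquiring the caller's lock. The caller-side contract is unchanged; a short C++03 sketch (not part of this commit), using the scheduler-taking constructors from this changeset and a hypothetical is_ready predicate:

    #include <boost/fiber/condition.hpp>
    #include <boost/fiber/mutex.hpp>
    #include <boost/thread/locks.hpp>

    static bool ready = false;
    static bool is_ready() { return ready; }   // hypothetical predicate

    void waiter( boost::fibers::mutex & mtx, boost::fibers::condition & cond)
    {
        boost::unique_lock< boost::fibers::mutex > lk( mtx);
        cond.wait( lk, is_ready);   // internally: while ( ! is_ready() ) wait( lk)
    }

    void notifier( boost::fibers::mutex & mtx, boost::fibers::condition & cond)
    {
        {
            boost::fibers::mutex::scoped_lock lk( mtx);
            ready = true;           // change state under the mutex
        }
        cond.notify_one();          // wake one waiting fiber
    }
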
Modified: sandbox/fiber/boost/fiber/unbounded_fifo.hpp
==============================================================================
--- sandbox/fiber/boost/fiber/unbounded_fifo.hpp (original)
+++ sandbox/fiber/boost/fiber/unbounded_fifo.hpp 2009-12-12 05:03:41 EST (Sat, 12 Dec 2009)
@@ -11,296 +11,178 @@
#include <boost/config.hpp>
#include <boost/cstdint.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/optional.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/condition.hpp>
-#include <boost/fiber/detail/atomic.hpp>
#include <boost/fiber/exceptions.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/scheduler.hpp>
-#include <boost/fiber/spin_condition.hpp>
-#include <boost/fiber/spin_mutex.hpp>
#include <boost/config/abi_prefix.hpp>
namespace boost {
namespace fibers {
-template< typename T, typename M >
-class unbounded_fifo;
-
template< typename T >
-class unbounded_fifo< T, spin_mutex > : private noncopyable
+class unbounded_fifo
{
-
public:
typedef optional< T > value_type;
private:
- struct node
+ class impl : private noncopyable
{
- typedef intrusive_ptr< node > ptr_t;
-
- uint32_t use_count;
- value_type va;
- ptr_t next;
-
- node() :
- use_count( 0),
- va(),
- next()
- {}
-
- inline friend void intrusive_ptr_add_ref( node * p)
- { ++p->use_count; }
-
- inline friend void intrusive_ptr_release( node * p)
- { if ( --p->use_count == 0) delete p; }
- };
+ private:
+ struct node
+ {
+ typedef intrusive_ptr< node > ptr_t;
- volatile uint32_t state_;
- typename node::ptr_t head_;
- spin_mutex head_mtx_;
- typename node::ptr_t tail_;
- spin_mutex tail_mtx_;
- spin_condition not_empty_cond_;
- uint32_t use_count_;
-
- bool active_() const
- { return 0 == state_; }
+ uint32_t use_count;
+ value_type va;
+ ptr_t next;
+
+ node() :
+ use_count( 0),
+ va(),
+ next()
+ {}
+
+ inline friend void intrusive_ptr_add_ref( node * p)
+ { ++p->use_count; }
+
+ inline friend void intrusive_ptr_release( node * p)
+ { if ( --p->use_count == 0) delete p; }
+ };
+
+ uint32_t state_;
+ typename node::ptr_t head_;
+ mutex head_mtx_;
+ typename node::ptr_t tail_;
+ mutex tail_mtx_;
+ condition not_empty_cond_;
+ uint32_t use_count_;
+
+ bool active_() const
+ { return 0 == state_; }
- void deactivate_()
- { detail::atomic_fetch_add( & state_, 1); }
+ void deactivate_()
+ { ++state_; }
- bool empty_()
- { return head_ == get_tail_(); }
+ bool empty_()
+ { return head_ == get_tail_(); }
- typename node::ptr_t get_tail_()
- {
- spin_mutex::scoped_lock lk( tail_mtx_);
- typename node::ptr_t tmp = tail_;
- return tmp;
- }
+ typename node::ptr_t get_tail_()
+ {
+ mutex::scoped_lock lk( tail_mtx_);
+ typename node::ptr_t tmp = tail_;
+ return tmp;
+ }
- typename node::ptr_t pop_head_()
- {
- typename node::ptr_t old_head = head_;
- head_ = old_head->next;
- return old_head;
- }
+ typename node::ptr_t pop_head_()
+ {
+ typename node::ptr_t old_head = head_;
+ head_ = old_head->next;
+ return old_head;
+ }
-public:
- unbounded_fifo() :
- state_( 0),
- head_( new node),
- head_mtx_(),
- tail_( head_),
- tail_mtx_(),
- not_empty_cond_(),
- use_count_( 0)
- {}
+ public:
+ template< typename Strategy >
+ impl( scheduler< Strategy > & sched) :
+ state_( 0),
+ head_( new node),
+ head_mtx_( sched),
+ tail_( head_),
+ tail_mtx_( sched),
+ not_empty_cond_( sched),
+ use_count_( 0)
+ {}
- void deactivate()
- { deactivate_(); }
+ void deactivate()
+ { deactivate_(); }
- bool empty()
- {
- spin_mutex::scoped_lock lk( head_mtx_);
- return empty_();
- }
+ bool empty()
+ {
+ mutex::scoped_lock lk( head_mtx_);
+ return empty_();
+ }
- void put( T const& t)
- {
- typename node::ptr_t new_node( new node);
+ void put( T const& t)
{
- spin_mutex::scoped_lock lk( tail_mtx_);
+ typename node::ptr_t new_node( new node);
+ {
+ mutex::scoped_lock lk( tail_mtx_);
- tail_->va = t;
- tail_->next = new_node;
- tail_ = new_node;
+ tail_->va = t;
+ tail_->next = new_node;
+ tail_ = new_node;
+ }
+ not_empty_cond_.notify_one();
}
- not_empty_cond_.notify_one();
- }
- bool take( value_type & va)
- {
- spin_mutex::scoped_lock lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
+ bool take( value_type & va)
{
- try
+ mutex::scoped_lock lk( head_mtx_);
+ bool empty = empty_();
+ if ( ! active_() && empty)
+ return false;
+ if ( empty)
{
- while ( active_() && empty_() )
- not_empty_cond_.wait( lk);
+ try
+ {
+ while ( active_() && empty_() )
+ not_empty_cond_.wait( lk);
+ }
+ catch ( fiber_interrupted const&)
+ { return false; }
}
- catch ( fiber_interrupted const&)
- { return false; }
+ if ( ! active_() && empty_() )
+ return false;
+ swap( va, head_->va);
+ pop_head_();
+ return va;
}
- if ( ! active_() && empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- return va;
- }
- bool try_take( value_type & va)
- {
- spin_mutex::scoped_lock lk( head_mtx_);
- if ( empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- return va;
- }
-
- template< typename R >
- friend void intrusive_ptr_add_ref( unbounded_fifo< R, spin_mutex > * p)
- { detail::atomic_fetch_add( & p->use_count_, 1); }
-
- template< typename R >
- friend void intrusive_ptr_release( unbounded_fifo< R, spin_mutex > * p)
- { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }
-};
-
-template< typename T >
-class unbounded_fifo< T, mutex > : private noncopyable
-{
-
-public:
- typedef optional< T > value_type;
-
-private:
- struct node
- {
- typedef intrusive_ptr< node > ptr_t;
+ bool try_take( value_type & va)
+ {
+ mutex::scoped_lock lk( head_mtx_);
+ if ( empty_() )
+ return false;
+ swap( va, head_->va);
+ pop_head_();
+ return va;
+ }
- uint32_t use_count;
- value_type va;
- ptr_t next;
-
- node() :
- use_count( 0),
- va(),
- next()
- {}
+ friend void intrusive_ptr_add_ref( impl * p)
+ { ++( p->use_count_); }
- inline friend void intrusive_ptr_add_ref( node * p)
- { ++p->use_count; }
-
- inline friend void intrusive_ptr_release( node * p)
- { if ( --p->use_count == 0) delete p; }
+ friend void intrusive_ptr_release( impl * p)
+ { if ( --( p->use_count_) == 0) delete p; }
};
- volatile uint32_t state_;
- typename node::ptr_t head_;
- mutex head_mtx_;
- typename node::ptr_t tail_;
- mutex tail_mtx_;
- condition not_empty_cond_;
- uint32_t use_count_;
-
- bool active_() const
- { return 0 == state_; }
-
- void deactivate_()
- { detail::atomic_fetch_add( & state_, 1); }
-
- bool empty_()
- { return head_ == get_tail_(); }
-
- typename node::ptr_t get_tail_()
- {
- mutex::scoped_lock lk( tail_mtx_);
- typename node::ptr_t tmp = tail_;
- return tmp;
- }
-
- typename node::ptr_t pop_head_()
- {
- typename node::ptr_t old_head = head_;
- head_ = old_head->next;
- return old_head;
- }
+ intrusive_ptr< impl > impl_;
public:
template< typename Strategy >
unbounded_fifo( scheduler< Strategy > & sched) :
- state_( 0),
- head_( new node),
- head_mtx_( sched),
- tail_( head_),
- tail_mtx_( sched),
- not_empty_cond_( sched),
- use_count_( 0)
+ impl_( new impl( sched) )
{}
void deactivate()
- { deactivate_(); }
+ { impl_->deactivate(); }
bool empty()
- {
- mutex::scoped_lock lk( head_mtx_);
- return empty_();
- }
+ { return impl_->empty(); }
void put( T const& t)
- {
- typename node::ptr_t new_node( new node);
- {
- mutex::scoped_lock lk( tail_mtx_);
-
- tail_->va = t;
- tail_->next = new_node;
- tail_ = new_node;
- }
- not_empty_cond_.notify_one();
- }
+ { impl_->put( t); }
bool take( value_type & va)
- {
- mutex::scoped_lock lk( head_mtx_);
- bool empty = empty_();
- if ( ! active_() && empty)
- return false;
- if ( empty)
- {
- try
- {
- while ( active_() && empty_() )
- not_empty_cond_.wait( lk);
- }
- catch ( fiber_interrupted const&)
- { return false; }
- }
- if ( ! active_() && empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- return va;
- }
+ { return impl_->take( va); }
bool try_take( value_type & va)
- {
- mutex::scoped_lock lk( head_mtx_);
- if ( empty_() )
- return false;
- swap( va, head_->va);
- pop_head_();
- return va;
- }
-
- template< typename R >
- friend void intrusive_ptr_add_ref( unbounded_fifo< R, mutex > * p)
- { detail::atomic_fetch_add( & p->use_count_, 1); }
-
- template< typename R >
- friend void intrusive_ptr_release( unbounded_fifo< R, mutex > * p)
- { if ( detail::atomic_fetch_sub( & p->use_count_, 1) == 1) delete p; }
+ { return impl_->try_take( va); }
};
}}
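
Since both fifo templates now hold only an intrusive_ptr< impl > and no longer derive from noncopyable themselves, the compiler-generated copy constructor makes them cheap, shareable handles: copying bumps the impl's reference count, and all copies see the same queue. A short sketch (not part of this commit), reusing the assumed sched from the earlier example:

    boost::fibers::unbounded_fifo< std::string > q1( sched);
    boost::fibers::unbounded_fifo< std::string > q2( q1);   // shares q1's impl_

    q1.put( "hello");
    boost::fibers::unbounded_fifo< std::string >::value_type v;
    q2.try_take( v);    // retrieves the element that was put through q1
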