Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r56542 - in sandbox/task: boost boost/task boost/task/detail libs/task/build libs/task/examples libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-10-03 05:04:50


Author: olli
Date: 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
New Revision: 56542
URL: http://svn.boost.org/trac/boost/changeset/56542

Log:
- spin_lock, spin_mutex, spin_condition added
- unbounded_buffer uses spinwait-objects
- examples buffer_multi and buffer_multi2 enhanced (using buffer synchonization)

Added:
   sandbox/task/boost/task/spin_condition.hpp (contents, props changed)
   sandbox/task/boost/task/spin_lock.hpp (contents, props changed)
   sandbox/task/boost/task/spin_mutex.hpp (contents, props changed)
   sandbox/task/libs/task/src/spin_condition.cpp (contents, props changed)
   sandbox/task/libs/task/src/spin_mutex.cpp (contents, props changed)
Text files modified:
   sandbox/task/boost/task.hpp | 3
   sandbox/task/boost/task/detail/atomic_aix.hpp | 4
   sandbox/task/boost/task/detail/atomic_gcc.hpp | 13 ++
   sandbox/task/boost/task/detail/atomic_gcc_ppc.hpp | 26 +++++
   sandbox/task/boost/task/detail/atomic_gcc_x86.hpp | 17 +++
   sandbox/task/boost/task/detail/atomic_hpux.hpp | 12 ++
   sandbox/task/boost/task/detail/atomic_interlocked.hpp | 12 ++
   sandbox/task/boost/task/detail/atomic_interprocess.hpp | 18 +++
   sandbox/task/boost/task/detail/atomic_solaris.hpp | 12 ++
   sandbox/task/boost/task/detail/atomic_sync.hpp | 12 ++
   sandbox/task/boost/task/exceptions.hpp | 16 ++
   sandbox/task/boost/task/unbounded_buffer.hpp | 187 ++++++++++++++++++++++++++++++++-------
   sandbox/task/libs/task/build/Jamfile.v2 | 4
   sandbox/task/libs/task/examples/buffer_multi.cpp | 39 +++-----
   sandbox/task/libs/task/examples/buffer_multi2.cpp | 37 +++----
   sandbox/task/libs/task/examples/no_deadlock_pool2.cpp | 4
   16 files changed, 324 insertions(+), 92 deletions(-)

Modified: sandbox/task/boost/task.hpp
==============================================================================
--- sandbox/task/boost/task.hpp (original)
+++ sandbox/task/boost/task.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -25,6 +25,9 @@
 #include <boost/task/poolsize.hpp>
 #include <boost/task/scanns.hpp>
 #include <boost/task/semaphore.hpp>
+#include <boost/task/spin_condition.hpp>
+#include <boost/task/spin_lock.hpp>
+#include <boost/task/spin_mutex.hpp>
 #include <boost/task/stacksize.hpp>
 #include <boost/task/static_pool.hpp>
 #include <boost/task/task.hpp>

Modified: sandbox/task/boost/task/detail/atomic_aix.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_aix.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_aix.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -26,6 +26,10 @@
 { * object = desired; }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{ return ::compare_and_swap( object, expected, desired); }
+
+inline
 uint32_t atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);

Modified: sandbox/task/boost/task/detail/atomic_gcc.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_gcc.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_gcc.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -17,6 +17,7 @@
 
 #include <boost/assert.hpp>
 #include <boost/cstdint.hpp>
+#include <boost/interprocess/detail/atomic.hpp>
 
 #include <boost/config/abi_prefix.hpp>
 
@@ -40,6 +41,18 @@
 }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = interprocess::detail::atomic_cas32( object, desired, * expected);
+ if ( prev != * expected)
+ {
+ * expected = prev;
+ return false;
+ }
+ return true;
+}
+
+inline
 uint32_t atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);

Modified: sandbox/task/boost/task/detail/atomic_gcc_ppc.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_gcc_ppc.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_gcc_ppc.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -33,6 +33,32 @@
 }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = * expected;
+
+ __asm__ __volatile__
+ (
+ "0:\n\t"
+ "lwarx %0,0,%1\n\t"
+ "cmpw %0,%3\n\t"
+ "bne- 1f\n\t"
+ "stwcx. %2,0,%1\n\t"
+ "bne- 0b\n\t"
+ "1:"
+ : "=&r"( * expected)
+ : "b" ( object), "r" ( desired), "r" ( * expected)
+ : "memory", "cc"
+ );
+ if ( prev != * expected)
+ {
+ * expected = prev;
+ return false;
+ }
+ return true;
+}
+
+inline
 uint32_t atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         int object_ = static_cast< int >( object);

Modified: sandbox/task/boost/task/detail/atomic_gcc_x86.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_gcc_x86.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_gcc_x86.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -27,6 +27,23 @@
 }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = * expected;
+
+ __asm__ __volatile__
+ (
+ "lock\n\t"
+ "cmpxchg %3, %1"
+ : "=a" ( * expected), "=m" ( * object)
+ : "a" ( prev), "r" ( desired)
+ : "memory", "cc"
+ );
+
+ return prev == * expected;
+}
+
+inline
 long atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         int operand_ = static_cast< int >( operand);

Modified: sandbox/task/boost/task/detail/atomic_hpux.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_hpux.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_hpux.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -25,6 +25,18 @@
 { * object = desired; }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = ::atomic_cas_32( object, * expected, desired);
+ if ( prev != * expected)
+ {
+ * expected = prev;
+ return false;
+ }
+ return true;
+}
+
+inline
 uint32_t atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);

Modified: sandbox/task/boost/task/detail/atomic_interlocked.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_interlocked.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_interlocked.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -22,6 +22,18 @@
 { BOOST_INTERLOCKED_EXCHANGE( reinterpret_cast< long volatile * >( object), desired); }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = BOOST_INTERLOCKED_COMPARE_EXCHANGE( object, desired, * expected);
+ if ( prev != * expected)
+ {
+ * expected = prev;
+ return false;
+ }
+ return true;
+}
+
+inline
 uint32_t atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);

Modified: sandbox/task/boost/task/detail/atomic_interprocess.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_interprocess.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_interprocess.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -18,18 +18,30 @@
 namespace detail
 {
 inline
-void atomic_exchange( volatile uint32_t * object, uint32_t desired)
+void atomic_exchange( uint32_t volatile * object, uint32_t desired)
 { interprocess::detail::atomic_write32( object, desired); }
 
 inline
-unsigned int atomic_fetch_add( volatile uint32_t * object, uint32_t operand)
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = interprocess::detail::atomic_cas32( object, desired, * expected);
+ if ( prev != * expected)
+ {
+ * expected = prev;
+ return false;
+ }
+ return true;
+}
+
+inline
+unsigned int atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);
         return interprocess::detail::atomic_inc32( object);
 }
 
 inline
-unsigned int atomic_fetch_sub( volatile uint32_t * object, uint32_t operand)
+unsigned int atomic_fetch_sub( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);
         return interprocess::detail::atomic_dec32( object);

Modified: sandbox/task/boost/task/detail/atomic_solaris.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_solaris.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_solaris.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -26,6 +26,18 @@
 { * object = desired; }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = ::atomic_cas_32( object, * expected, desired);
+ if ( prev != * expected)
+ {
+ * expected = prev;
+ return false;
+ }
+ return true;
+}
+
+inline
 uint32_t atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);

Modified: sandbox/task/boost/task/detail/atomic_sync.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic_sync.hpp (original)
+++ sandbox/task/boost/task/detail/atomic_sync.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -30,6 +30,18 @@
 { * object = desired; }
 
 inline
+bool atomic_compare_exchange_strong( uint32_t volatile * object, uint32_t * expected, uint32_t desired)
+{
+ uint32_t prev = __sync_val_compare_and_swap( object, * expected, desired);
+ if ( prev != * expected)
+ {
+ * expected = prev;
+ return false;
+ }
+ return true;
+}
+
+inline
 uint32_t atomic_fetch_add( uint32_t volatile * object, uint32_t operand)
 {
         BOOST_ASSERT( operand == 1);

Modified: sandbox/task/boost/task/exceptions.hpp
==============================================================================
--- sandbox/task/boost/task/exceptions.hpp (original)
+++ sandbox/task/boost/task/exceptions.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -12,8 +12,9 @@
 
 #include <boost/config/abi_prefix.hpp>
 
-namespace boost { namespace task
-{
+namespace boost {
+namespace task {
+
 class invalid_poolsize : public std::invalid_argument
 {
 public:
@@ -104,7 +105,16 @@
         : std::logic_error("pool moved")
         {}
 };
-} }
+
+class lock_error : public std::logic_error
+{
+public:
+ lock_error()
+ : std::logic_error("lock invalid")
+ {}
+};
+
+}}
 
 #include <boost/config/abi_suffix.hpp>
 

Added: sandbox/task/boost/task/spin_condition.hpp
==============================================================================
--- (empty file)
+++ sandbox/task/boost/task/spin_condition.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -0,0 +1,123 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TASK_SPIN_CONDITION_H
+#define BOOST_TASK_SPIN_CONDITION_H
+
+#include <boost/cstdint.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <boost/utility.hpp>
+
+#include <boost/task/exceptions.hpp>
+#include <boost/task/spin_mutex.hpp>
+
+namespace boost {
+namespace task {
+
+class spin_condition : private noncopyable
+{
+private:
+ enum command
+ {
+ SLEEPING = 0,
+ NOTIFY_ONE,
+ NOTIFY_ALL
+ };
+
+ volatile uint32_t cmd_;
+ volatile uint32_t waiters_;
+ spin_mutex enter_mtx_;
+ spin_mutex check_mtx_;
+
+ spin_condition( spin_condition const&);
+ spin_condition & operator=( spin_condition const&);
+
+ void wait_( spin_mutex &);
+ bool wait_( spin_mutex &, system_time const&);
+ void notify_( command);
+
+public:
+ spin_condition();
+
+ void notify_one();
+
+ void notify_all();
+
+ template< typename Lock >
+ void wait( Lock & lk)
+ {
+ if ( ! lk)
+ throw lock_error();
+ wait_( * lk.mutex() );
+ }
+
+ template<
+ typename Lock,
+ typename Pred
+ >
+ void wait( Lock & lk, Pred pred)
+ {
+ if ( ! lk)
+ throw lock_error();
+
+ while ( ! pred() )
+ wait_( * lk.mutex() );
+ }
+
+ template< typename Lock >
+ bool timed_wait( Lock & lk, system_time const& abs_time)
+ {
+ if ( abs_time.is_infinity() )
+ {
+ wait_( lk);
+ return true;
+ }
+
+ if ( ! lk)
+ throw lock_error();
+ return wait_( * lk.mutex(), abs_time);
+ }
+
+ template<
+ typename Lock,
+ typename Pred
+ >
+ bool timed_wait( Lock & lk, system_time const& abs_time, Pred pred)
+ {
+ if ( abs_time.is_infinity() )
+ {
+ wait_( lk, pred);
+ return true;
+ }
+
+ if ( ! lk)
+ throw lock_error();
+
+ while ( ! pred() )
+ if ( ! wait_( * lk.mutex(), abs_time) )
+ return pred();
+ return true;
+ }
+
+ template<
+ typename Lock,
+ typename TimeDuration
+ >
+ bool timed_wait( Lock & lk, TimeDuration const& rel_time)
+ { return timed_wait( lk, get_system_time() + rel_time); }
+
+ template<
+ typename Lock,
+ typename TimeDuration,
+ typename Pred
+ >
+ bool timed_wait( Lock & lk, TimeDuration const& rel_time, Pred pred)
+ { return timed_wait( lk, get_system_time() + rel_time, pred); }
+};
+
+}}
+
+#endif // BOOST_TASK_SPIN_CONDITION_H

Added: sandbox/task/boost/task/spin_lock.hpp
==============================================================================
--- (empty file)
+++ sandbox/task/boost/task/spin_lock.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -0,0 +1,112 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TASK_SPIN_LOCK_H
+#define BOOST_TASK_SPIN_LOCK_H
+
+#include <algorithm>
+
+#include <boost/thread/thread_time.hpp>
+
+#include <boost/task/exceptions.hpp>
+
+namespace boost {
+namespace task {
+
+template< typename Mutex >
+class spin_lock
+{
+private:
+ typedef bool spin_lock::*unspecified_bool_type;
+
+ Mutex * mtx_;
+ bool locked_;
+
+ spin_lock( spin_lock &);
+ spin_lock & operator=( spin_lock &);
+
+public:
+ explicit spin_lock( Mutex & mtx)
+ : mtx_( & mtx), locked_( false)
+ {
+ mtx_->lock();
+ locked_ = true;
+ }
+
+ spin_lock( Mutex & mtx, system_time const& abs_time)
+ : mtx_( & mtx), locked_( mtx_->timed_lock( abs_time) )
+ {}
+
+ template< typename TimeDuration >
+ spin_lock( Mutex & mtx, TimeDuration const& rel_time)
+ : mtx_( & mtx), locked_( mtx_->timed_lock( rel_time) )
+ {}
+
+ ~spin_lock()
+ {
+ try
+ { if ( locked_ && mtx_) mtx_->unlock(); }
+ catch (...) {}
+ }
+
+ void lock()
+ {
+ if ( ! mtx_ || locked_)
+ throw lock_error();
+ mtx_->lock();
+ locked_ = true;
+ }
+
+ bool try_lock()
+ {
+ if ( ! mtx_ || locked_)
+ throw lock_error();
+ locked_ = mtx_->try_lock();
+ return locked_;
+ }
+
+ bool timed_lock( system_time const& abs_time)
+ {
+ if ( ! mtx_ || locked_)
+ throw lock_error();
+ locked_ = mtx_->timed_lock( abs_time);
+ return locked_;
+ }
+
+ template< typename TimeDuration >
+ bool timed_lock( TimeDuration const& rel_time)
+ { return timed_lock( get_system_time() + rel_time); }
+
+ void unlock()
+ {
+ if ( ! mtx_ || ! locked_)
+ throw lock_error();
+ mtx_->unlock();
+ locked_ = false;
+ }
+
+ bool owns() const
+ { return locked_ && mtx_; }
+
+ operator unspecified_bool_type() const
+ { return locked_ ? & locked_ : 0; }
+
+ bool operator!() const
+ { return ! locked_; }
+
+ Mutex * mutex() const
+ { return mtx_; }
+
+ void swap( spin_lock & other)
+ {
+ std::swap( mtx_, other.mtx_);
+ std::swap( locked_, other.locked_);
+ }
+};
+
+}}
+
+#endif // BOOST_TASK_SPIN_LOCK_H

Added: sandbox/task/boost/task/spin_mutex.hpp
==============================================================================
--- (empty file)
+++ sandbox/task/boost/task/spin_mutex.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -0,0 +1,40 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TASK_SPIN_MUTEX_H
+#define BOOST_TASK_SPIN_MUTEX_H
+
+#include <boost/cstdint.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <boost/utility.hpp>
+
+namespace boost {
+namespace task {
+
+class spin_mutex : private noncopyable
+{
+private:
+ volatile uint32_t state_;
+
+public:
+ spin_mutex();
+
+ void lock();
+
+ bool try_lock();
+
+ bool timed_lock( system_time const& abs_time);
+
+ template< typename TimeDuration >
+ bool timed_lock( TimeDuration const& rel_time)
+ { return timed_lock( get_system_time() + rel_time); }
+
+ void unlock();
+};
+
+}}
+
+#endif // BOOST_TASK_SPIN_MUTEX_H

Modified: sandbox/task/boost/task/unbounded_buffer.hpp
==============================================================================
--- sandbox/task/boost/task/unbounded_buffer.hpp (original)
+++ sandbox/task/boost/task/unbounded_buffer.hpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -15,7 +15,12 @@
 #include <boost/shared_ptr.hpp>
 #include <boost/utility.hpp>
 
+#include <boost/task/detail/atomic.hpp>
+#include <boost/task/exceptions.hpp>
 #include <boost/task/semaphore.hpp>
+#include <boost/task/spin_condition.hpp>
+#include <boost/task/spin_lock.hpp>
+#include <boost/task/spin_mutex.hpp>
 
 #include <boost/config/abi_prefix.hpp>
 
@@ -25,62 +30,152 @@
 class unbounded_buffer
 {
 private:
+ typedef T value_type;
+
         class base
         {
         private:
- class guard : private noncopyable
- {
- private:
- semaphore & sem_;
-
- public:
- guard( semaphore & sem)
- : sem_( sem)
- { sem_.wait(); }
-
- ~guard()
- { sem_.post(); }
- };
-
- semaphore protection_sem_;
- semaphore consumer_sem_;
- std::deque< T > queue_;
+ volatile uint32_t state_;
+ std::deque< value_type > queue_;
+ spin_mutex mtx_;
+ spin_condition not_empty_cond_;
 
                 base( base &);
                 base & operator=( base const&);
 
+ bool active_() const
+ { return 0 == state_; }
+
+ void deactivate_()
+ { detail::atomic_fetch_add( & state_, 1); }
+
                 bool empty_() const
                 { return queue_.empty(); }
 
+ void put_( value_type const& va)
+ {
+ if ( ! active_() )
+ throw task_rejected("queue is not active");
+ queue_.push_back( va);
+ not_empty_cond_.notify_one();
+ }
+
+ bool take_(
+ optional< value_type > & va,
+ spin_lock< spin_mutex > & lk)
+ {
+ bool empty = empty_();
+ if ( ! active_() && empty)
+ return false;
+ if ( empty)
+ {
+ try
+ {
+ not_empty_cond_.wait(
+ lk,
+ bind(
+ & base::consumers_activate_,
+ this) );
+ }
+ catch ( thread_interrupted const&)
+ { return false; }
+ }
+ if ( ! active_() && empty_() )
+ return false;
+ va = queue_.front();
+ queue_.pop_front();
+ return va;
+ }
+
+ template< typename Duration >
+ bool take_(
+ optional< value_type > & va,
+ Duration const& rel_time,
+ spin_lock< spin_mutex > & lk)
+ {
+ bool empty = empty_();
+ if ( ! active_() && empty)
+ return false;
+ if ( empty)
+ {
+ try
+ {
+ if ( ! not_empty_cond_.timed_wait(
+ lk,
+ rel_time,
+ bind(
+ & base::consumers_activate_,
+ this) ) )
+ return false;
+ }
+ catch ( thread_interrupted const&)
+ { return false; }
+ }
+ if ( ! active_() && empty_() )
+ return false;
+ va = queue_.front();
+ queue_.pop_front();
+ return va;
+ }
+
+ bool try_take_( optional< value_type > & va)
+ {
+ if ( empty_() )
+ return false;
+ va = queue_.front();
+ queue_.pop_front();
+ return va;
+ }
+
                 bool consumers_activate_() const
- { return ! empty_(); }
+ { return ! active_() || ! empty_(); }
 
         public:
                 base()
                 :
- protection_sem_( 1),
- consumer_sem_( 0),
- queue_()
+ state_( 0),
+ queue_(),
+ mtx_(),
+ not_empty_cond_()
                 {}
 
- void put( T const& t)
+ bool active()
+ { return active_(); }
+
+ void deactivate()
+ { deactivate_(); }
+
+ bool empty()
                 {
- {
- guard lk( protection_sem_);
- queue_.push_back( t);
- }
- consumer_sem_.post();
+ spin_lock< spin_mutex > lk( mtx_);
+ return empty_();
                 }
 
- bool take( optional< T > & t)
+ void put( value_type const& va)
                 {
- consumer_sem_.wait();
- {
- guard lk( protection_sem_);
- t = queue_.front();
- queue_.pop_front();
- }
- return true;
+ spin_lock< spin_mutex > lk( mtx_);
+ put_( va);
+ }
+
+ bool take( optional< value_type > & va)
+ {
+ spin_lock< spin_mutex > lk( mtx_);
+ return take_( va, lk);
+ }
+
+ template< typename TimeDuration >
+ bool take(
+ optional< value_type > & va,
+ TimeDuration const& rel_time)
+ {
+ spin_lock< spin_mutex > lk( mtx_);
+ return take_( va, rel_time, lk);
+ }
+
+ bool try_take( optional< value_type > & va)
+ {
+ spin_lock< spin_mutex > lk( mtx_);
+ return try_take_( va);
                 }
         };
 
@@ -91,11 +186,29 @@
         : impl_( new base)
         {}
 
+ bool active()
+ { return impl_->active(); }
+
+ void deactivate()
+ { impl_->deactivate(); }
+
+ bool empty()
+ { return impl_->empty(); }
+
         void put( T const& t)
         { impl_->put( t); }
 
- void take( optional< T > & t)
- { impl_->take( t); }
+ bool take( optional< T > & t)
+ { return impl_->take( t); }
+
+ template< typename TimeDuration >
+ bool take(
+ optional< T > & t,
+ TimeDuration const& rel_time)
+ { return impl_->take( t, rel_time); }
+
+ bool try_take( optional< T > & t)
+ { return impl_->try_take_( t); }
 };
 }}
 

Modified: sandbox/task/libs/task/build/Jamfile.v2
==============================================================================
--- sandbox/task/libs/task/build/Jamfile.v2 (original)
+++ sandbox/task/libs/task/build/Jamfile.v2 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -41,6 +41,8 @@
         poolsize.cpp
         scanns.cpp
         semaphore_windows.cpp
+ spin_condition.cpp
+ spin_mutex.cpp
         stacksize.cpp
         watermark.cpp
         worker.cpp
@@ -58,6 +60,8 @@
         poolsize.cpp
         scanns.cpp
         semaphore_posix.cpp
+ spin_condition.cpp
+ spin_mutex.cpp
         stacksize.cpp
         watermark.cpp
         worker.cpp

Modified: sandbox/task/libs/task/examples/buffer_multi.cpp
==============================================================================
--- sandbox/task/libs/task/examples/buffer_multi.cpp (original)
+++ sandbox/task/libs/task/examples/buffer_multi.cpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -60,11 +60,15 @@
                 tsk::unbounded_buffer< std::pair< int , int > > & recv,
                 int n)
 {
- send.put( n);
+ for ( int i = 0; i <= n; ++i)
+ send.put( i);
+ send.deactivate();
         boost::optional< std::pair< int , int > > r;
- recv.take( r);
- BOOST_ASSERT( r);
- printf("fib(%d) == %d\n", r->first, r->second);
+ while ( recv.take( r) )
+ {
+ BOOST_ASSERT( r);
+ printf("fib(%d) == %d\n", r->first, r->second);
+ }
 }
 
 inline
@@ -73,10 +77,13 @@
                 tsk::unbounded_buffer< std::pair< int , int > > & send)
 {
         boost::optional< int > n;
- recv.take( n);
- BOOST_ASSERT( n);
- int r = parallel_fib( * n, 5);
- send.put( std::make_pair( * n, r) );
+ while ( recv.take( n) )
+ {
+ BOOST_ASSERT( n);
+ int r = parallel_fib( * n, 5);
+ send.put( std::make_pair( * n, r) );
+ }
+ send.deactivate();
 }
 
 
@@ -91,30 +98,14 @@
                 
                 tsk::handle< void > h1(
                         tsk::async(
- tsk::make_task( submit, buf1, buf2, 5),
- tsk::new_thread() ) );
- tsk::handle< void > h2(
- tsk::async(
- tsk::make_task( submit, buf1, buf2, 10),
- tsk::new_thread() ) );
- tsk::handle< void > h3(
- tsk::async(
                                 tsk::make_task( submit, buf1, buf2, 15),
                                 tsk::new_thread() ) );
 
                 tsk::async(
                         tsk::make_task( calculate, buf1, buf2),
                         pool);
- tsk::async(
- tsk::make_task( calculate, buf1, buf2),
- pool);
- tsk::async(
- tsk::make_task( calculate, buf1, buf2),
- pool);
 
                 h1.get();
- h2.get();
- h3.get();
 
                 return EXIT_SUCCESS;
         }

Modified: sandbox/task/libs/task/examples/buffer_multi2.cpp
==============================================================================
--- sandbox/task/libs/task/examples/buffer_multi2.cpp (original)
+++ sandbox/task/libs/task/examples/buffer_multi2.cpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -60,11 +60,15 @@
                 tsk::unbounded_buffer< std::pair< int , int > > & recv,
                 int n)
 {
- send.put( n);
+ for ( int i = 0; i <= n; ++i)
+ send.put( i);
+ send.deactivate();
         boost::optional< std::pair< int , int > > r;
- recv.take( r);
- BOOST_ASSERT( r);
- printf("fib(%d) == %d\n", r->first, r->second);
+ while ( recv.take( r) )
+ {
+ BOOST_ASSERT( r);
+ printf("fib(%d) == %d\n", r->first, r->second);
+ }
 }
 
 inline
@@ -73,13 +77,15 @@
                 tsk::unbounded_buffer< std::pair< int , int > > & send)
 {
         boost::optional< int > n;
- recv.take( n);
- BOOST_ASSERT( n);
- int r = parallel_fib( * n, 5);
- send.put( std::make_pair( * n, r) );
+ while ( recv.take( n) )
+ {
+ BOOST_ASSERT( n);
+ int r = parallel_fib( * n, 5);
+ send.put( std::make_pair( * n, r) );
+ }
+ send.deactivate();
 }
 
-
 int main( int argc, char *argv[])
 {
         try
@@ -90,21 +96,8 @@
                 tsk::unbounded_buffer< std::pair< int , int > > buf2;
                 
                 tsk::async(
- tsk::make_task( submit, buf1, buf2, 5),
- pool);
- tsk::async(
- tsk::make_task( submit, buf1, buf2, 10),
- pool);
- tsk::async(
                         tsk::make_task( submit, buf1, buf2, 15),
                         pool);
-
- tsk::async(
- tsk::make_task( calculate, buf1, buf2),
- pool);
- tsk::async(
- tsk::make_task( calculate, buf1, buf2),
- pool);
                 tsk::async(
                         tsk::make_task( calculate, buf1, buf2),
                         pool);

Modified: sandbox/task/libs/task/examples/no_deadlock_pool2.cpp
==============================================================================
--- sandbox/task/libs/task/examples/no_deadlock_pool2.cpp (original)
+++ sandbox/task/libs/task/examples/no_deadlock_pool2.cpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -38,9 +38,7 @@
         try
         {
                 tsk::poolsize psize( boost::thread::hardware_concurrency() );
- tsk::static_pool<
- tsk::unbounded_channel< tsk::fifo >
- > pool( psize);
+ tsk::static_pool< tsk::unbounded_twolock_fifo > pool( psize);
 
                 fprintf( stderr, "pool-size == %d\n", pool.size() );
 

Added: sandbox/task/libs/task/src/spin_condition.cpp
==============================================================================
--- (empty file)
+++ sandbox/task/libs/task/src/spin_condition.cpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -0,0 +1,195 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/task/spin_condition.hpp"
+
+#include <boost/thread.hpp>
+
+#include <boost/task/detail/atomic.hpp>
+#include <boost/task/spin_mutex.hpp>
+#include <boost/task/spin_lock.hpp>
+#include <boost/task/utility.hpp>
+
+namespace boost {
+namespace task {
+
+void
+spin_condition::notify_( command cmd)
+{
+ enter_mtx_.lock();
+
+ if ( waiters_ == 0)
+ {
+ enter_mtx_.unlock();
+ return;
+ }
+
+ uint32_t expected = static_cast< uint32_t >( SLEEPING);
+ while ( ! detail::atomic_compare_exchange_strong(
+ static_cast< uint32_t volatile* >( & cmd_), & expected, cmd) )
+ {
+ if ( this_task::runs_in_pool() )
+ this_task::block();
+ else
+ this_thread::yield();
+ }
+}
+
+void
+spin_condition::wait_( spin_mutex & mtx)
+{
+ {
+ spin_lock< spin_mutex > lk( enter_mtx_);
+ if ( ! lk) return;
+ detail::atomic_fetch_add( & waiters_, 1);
+ mtx.unlock();
+ }
+
+ bool unlock_enter_mtx = false;
+ for (;;)
+ {
+ while ( SLEEPING == cmd_)
+ {
+ if ( this_task::runs_in_pool() )
+ this_task::block();
+ else
+ this_thread::yield();
+ }
+
+ spin_lock< spin_mutex > lk( check_mtx_);
+ if ( ! lk)
+ {
+ unlock_enter_mtx = true;
+ break;
+ }
+
+ uint32_t expected = static_cast< uint32_t >( NOTIFY_ONE);
+ detail::atomic_compare_exchange_strong(
+ static_cast< uint32_t volatile* >( & cmd_), & expected, SLEEPING);
+ if ( SLEEPING == expected)
+ continue;
+ else if ( NOTIFY_ONE == expected)
+ {
+ unlock_enter_mtx = true;
+ detail::atomic_fetch_sub( & waiters_, 1);
+ break;
+ }
+ else
+ {
+ unlock_enter_mtx = 1 == detail::atomic_fetch_sub( & waiters_, 1);
+ if ( unlock_enter_mtx)
+ {
+ expected = static_cast< uint32_t >( NOTIFY_ALL);
+ detail::atomic_compare_exchange_strong(
+ static_cast< uint32_t volatile* >( & cmd_), & expected, SLEEPING);
+ }
+ break;
+ }
+ }
+
+ if ( unlock_enter_mtx)
+ enter_mtx_.unlock();
+
+ mtx.lock();
+}
+
+bool
+spin_condition::wait_( spin_mutex & mtx, system_time const& abs_time)
+{
+ if ( get_system_time() >= abs_time) return false;
+
+ {
+ spin_lock< spin_mutex > lk( enter_mtx_, abs_time);
+ if ( ! lk) return false;
+ detail::atomic_fetch_add( & waiters_, 1);
+ mtx.unlock();
+ }
+
+ bool timed_out = false, unlock_enter_mtx = false;
+ for (;;)
+ {
+ while ( SLEEPING == cmd_)
+ {
+ if ( this_task::runs_in_pool() )
+ this_task::block();
+ else
+ this_thread::yield();
+
+ if ( get_system_time() >= abs_time)
+ {
+ timed_out = enter_mtx_.try_lock();
+ if ( ! timed_out)
+ continue;
+ break;
+ }
+ }
+
+ if ( timed_out)
+ {
+ detail::atomic_fetch_sub( & waiters_, 1);
+ unlock_enter_mtx = true;
+ break;
+ }
+ else
+ {
+ spin_lock< spin_mutex > lk( check_mtx_);
+ if ( ! lk)
+ {
+ timed_out = true;
+ unlock_enter_mtx = true;
+ break;
+ }
+
+ uint32_t expected = static_cast< uint32_t >( NOTIFY_ONE);
+ detail::atomic_compare_exchange_strong(
+ static_cast< uint32_t volatile* >( & cmd_), & expected, SLEEPING);
+ if ( SLEEPING == expected)
+ continue;
+ else if ( NOTIFY_ONE == expected)
+ {
+ unlock_enter_mtx = true;
+ detail::atomic_fetch_sub( & waiters_, 1);
+ break;
+ }
+ else
+ {
+ unlock_enter_mtx = 1 == detail::atomic_fetch_sub( & waiters_, 1);
+ if ( unlock_enter_mtx)
+ {
+ expected = static_cast< uint32_t >( NOTIFY_ALL);
+ detail::atomic_compare_exchange_strong(
+ static_cast< uint32_t volatile* >( & cmd_), & expected, SLEEPING);
+ }
+ break;
+ }
+ }
+ }
+
+ if ( unlock_enter_mtx)
+ enter_mtx_.unlock();
+
+ mtx.lock();
+
+ return ! timed_out;
+}
+
+spin_condition::spin_condition()
+:
+cmd_( SLEEPING),
+waiters_( 0),
+enter_mtx_(),
+check_mtx_()
+{}
+
+void
+spin_condition::notify_one()
+{ notify_( NOTIFY_ONE); }
+
+void
+spin_condition::notify_all()
+{ notify_( NOTIFY_ALL); }
+
+}}

Added: sandbox/task/libs/task/src/spin_mutex.cpp
==============================================================================
--- (empty file)
+++ sandbox/task/libs/task/src/spin_mutex.cpp 2009-10-03 05:04:47 EDT (Sat, 03 Oct 2009)
@@ -0,0 +1,81 @@
+
+// Copyright Oliver Kowalke 2009.
+// Distributed under the Boost Software License, Version 1.0.
+// (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include "boost/task/spin_mutex.hpp"
+
+#include <boost/thread.hpp>
+
+#include "boost/task/detail/atomic.hpp"
+#include "boost/task/utility.hpp"
+
+namespace boost {
+namespace task {
+
+spin_mutex::spin_mutex()
+: state_( 0)
+{}
+
+void
+spin_mutex::lock()
+{
+ for (;;)
+ {
+ uint32_t expected = 0;
+ if ( detail::atomic_compare_exchange_strong( & state_, & expected, 1) )
+ break;
+ else
+ {
+ if ( this_task::runs_in_pool() )
+ this_task::block();
+ else
+ this_thread::yield();
+ }
+ }
+}
+
+bool
+spin_mutex::try_lock()
+{
+ uint32_t expected = 0;
+ return detail::atomic_compare_exchange_strong( & state_, & expected, 1);
+}
+
+bool
+spin_mutex::timed_lock( system_time const& abs_time)
+{
+ if ( abs_time.is_infinity() )
+ {
+ lock();
+ return true;
+ }
+
+ if ( get_system_time() >= abs_time)
+ return false;
+
+ for (;;)
+ {
+ if ( try_lock() ) break;
+
+ if ( get_system_time() >= abs_time)
+ return false;
+
+ if ( this_task::runs_in_pool() )
+ this_task::block();
+ else
+ this_thread::yield();
+ }
+
+ return true;
+}
+
+void
+spin_mutex::unlock()
+{
+ uint32_t expected = 1;
+ detail::atomic_compare_exchange_strong( & state_, & expected, 0);
+}
+
+}}


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk