Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r52422 - in sandbox/threadpool: boost boost/tp boost/tp/detail libs/tp/build libs/tp/examples libs/tp/src
From: oliver.kowalke_at_[hidden]
Date: 2009-04-16 11:57:09


Author: olli
Date: 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
New Revision: 52422
URL: http://svn.boost.org/trac/boost/changeset/52422

Log:
+ modifications related to task semantics

Added:
   sandbox/threadpool/boost/tp/detail/config.hpp (contents, props changed)
   sandbox/threadpool/boost/tp/detail/semaphore.hpp (contents, props changed)
   sandbox/threadpool/boost/tp/detail/semaphore_posix.hpp (contents, props changed)
   sandbox/threadpool/boost/tp/detail/semaphore_windows.hpp (contents, props changed)
   sandbox/threadpool/boost/tp/launch.hpp (contents, props changed)
   sandbox/threadpool/boost/tp/utility.hpp (contents, props changed)
   sandbox/threadpool/libs/tp/examples/delay.cpp (contents, props changed)
   sandbox/threadpool/libs/tp/src/semaphore_posix.cpp (contents, props changed)
   sandbox/threadpool/libs/tp/src/semaphore_windows.cpp (contents, props changed)
   sandbox/threadpool/libs/tp/src/wsq.cpp (contents, props changed)
Removed:
   sandbox/threadpool/libs/tp/examples/parallel_sort.cpp
   sandbox/threadpool/libs/tp/examples/sleep.cpp
Text files modified:
   sandbox/threadpool/boost/future.hpp | 56 -----
   sandbox/threadpool/boost/threadpool.hpp | 4
   sandbox/threadpool/boost/tp/bounded_channel.hpp | 62 ++----
   sandbox/threadpool/boost/tp/default_pool.hpp | 5
   sandbox/threadpool/boost/tp/detail/atomic.hpp | 40 +++
   sandbox/threadpool/boost/tp/detail/bind_processor.hpp | 34 +-
   sandbox/threadpool/boost/tp/detail/callable.hpp | 87 +++-----
   sandbox/threadpool/boost/tp/detail/interrupter.hpp | 3
   sandbox/threadpool/boost/tp/detail/worker.hpp | 103 ++++------
   sandbox/threadpool/boost/tp/detail/wsq.hpp | 166 ++---------------
   sandbox/threadpool/boost/tp/exceptions.hpp | 35 +-
   sandbox/threadpool/boost/tp/fifo.hpp | 35 ---
   sandbox/threadpool/boost/tp/info.hpp | 14
   sandbox/threadpool/boost/tp/lifo.hpp | 35 ---
   sandbox/threadpool/boost/tp/pool.hpp | 79 ++-----
   sandbox/threadpool/boost/tp/poolsize.hpp | 2
   sandbox/threadpool/boost/tp/priority.hpp | 26 -
   sandbox/threadpool/boost/tp/scanns.hpp | 3
   sandbox/threadpool/boost/tp/smart.hpp | 28 +-
   sandbox/threadpool/boost/tp/task.hpp | 384 +++++++++++++++++++++++----------------
   sandbox/threadpool/boost/tp/unbounded_channel.hpp | 53 +---
   sandbox/threadpool/boost/tp/watermark.hpp | 2
   sandbox/threadpool/libs/tp/build/Jamfile.v2 | 15 +
   sandbox/threadpool/libs/tp/examples/Jamfile.v2 | 4
   sandbox/threadpool/libs/tp/examples/bind_to_processors.cpp | 37 ++-
   sandbox/threadpool/libs/tp/examples/fork_join.cpp | 44 ++-
   sandbox/threadpool/libs/tp/examples/interrupt.cpp | 13
   sandbox/threadpool/libs/tp/examples/pending.cpp | 11
   sandbox/threadpool/libs/tp/examples/priority.cpp | 26 +-
   sandbox/threadpool/libs/tp/examples/reschedule_until.cpp | 26 +-
   sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp | 12
   sandbox/threadpool/libs/tp/examples/smart.cpp | 39 ++-
   sandbox/threadpool/libs/tp/examples/submit.cpp | 6
   sandbox/threadpool/libs/tp/examples/yield.cpp | 26 +
   sandbox/threadpool/libs/tp/src/callable.cpp | 7
   sandbox/threadpool/libs/tp/src/default_pool.cpp | 3
   sandbox/threadpool/libs/tp/src/interrupter.cpp | 2
   sandbox/threadpool/libs/tp/src/poolsize.cpp | 4
   sandbox/threadpool/libs/tp/src/scanns.cpp | 4
   sandbox/threadpool/libs/tp/src/watermark.cpp | 2
   sandbox/threadpool/libs/tp/src/worker.cpp | 18 -
   sandbox/threadpool/libs/tp/src/worker_group.cpp | 1
   42 files changed, 666 insertions(+), 890 deletions(-)

Modified: sandbox/threadpool/boost/future.hpp
==============================================================================
--- sandbox/threadpool/boost/future.hpp (original)
+++ sandbox/threadpool/boost/future.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -27,10 +27,6 @@
 #include <list>
 #include <boost/next_prior.hpp>
 
-#define CATCH_ENABLE_CURRENT_EXCEPTION( Exception) \
-catch ( Exception const& e) \
-{ throw boost::enable_current_exception( e); }
-
 namespace boost
 {
     class future_uninitialized:
@@ -1207,28 +1203,7 @@
             {
                 try
                 {
- try
- { this->mark_finished_with_result(f()); }
- CATCH_ENABLE_CURRENT_EXCEPTION( boost::thread_interrupted)
- CATCH_ENABLE_CURRENT_EXCEPTION( boost::exception)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::ios_base::failure)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::domain_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::invalid_argument)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::length_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::out_of_range)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::logic_error)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::overflow_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::range_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::underflow_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::runtime_error)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_alloc)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_cast)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_typeid)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_exception)
+ this->mark_finished_with_result(f());
                 }
                 catch(...)
                 {
@@ -1253,31 +1228,8 @@
             {
                 try
                 {
- try
- {
- f();
- this->mark_finished_with_result();
- }
- CATCH_ENABLE_CURRENT_EXCEPTION( boost::thread_interrupted)
- CATCH_ENABLE_CURRENT_EXCEPTION( boost::exception)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::ios_base::failure)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::domain_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::invalid_argument)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::length_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::out_of_range)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::logic_error)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::overflow_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::range_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::underflow_error)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::runtime_error)
-
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_alloc)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_cast)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_typeid)
- CATCH_ENABLE_CURRENT_EXCEPTION( std::bad_exception)
+ f();
+ this->mark_finished_with_result();
                 }
                 catch(...)
                 {
@@ -1412,6 +1364,4 @@
         { return detail::thread_move_t< packaged_task< T > >( t); }
 }
 
-#undef CATCH_ENABLE_CURRENT_EXCEPTION
-
 #endif

Modified: sandbox/threadpool/boost/threadpool.hpp
==============================================================================
--- sandbox/threadpool/boost/threadpool.hpp (original)
+++ sandbox/threadpool/boost/threadpool.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -6,10 +6,11 @@
 #define BOOST_TP_TP_H
 
 #include <boost/tp/bounded_channel.hpp>
-#include <boost/tp/exceptions.hpp>
 #include <boost/tp/default_pool.hpp>
+#include <boost/tp/exceptions.hpp>
 #include <boost/tp/fifo.hpp>
 #include <boost/tp/info.hpp>
+#include <boost/tp/launch.hpp>
 #include <boost/tp/lifo.hpp>
 #include <boost/tp/pool.hpp>
 #include <boost/tp/poolsize.hpp>
@@ -18,6 +19,7 @@
 #include <boost/tp/smart.hpp>
 #include <boost/tp/task.hpp>
 #include <boost/tp/unbounded_channel.hpp>
+#include <boost/tp/utility.hpp>
 #include <boost/tp/watermark.hpp>
 
 #endif // BOOST_TP_TP_H

Modified: sandbox/threadpool/boost/tp/bounded_channel.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/bounded_channel.hpp (original)
+++ sandbox/threadpool/boost/tp/bounded_channel.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -17,7 +17,6 @@
 #include <boost/thread/shared_mutex.hpp>
 
 #include <boost/tp/detail/callable.hpp>
-#include <boost/tp/detail/interrupter.hpp>
 #include <boost/tp/exceptions.hpp>
 #include <boost/tp/watermark.hpp>
 
@@ -26,15 +25,9 @@
 template< typename SchedulingPolicy >
 class bounded_channel
 {
-private:
- typedef SchedulingPolicy scheduling_policy;
- typedef typename scheduling_policy::template impl<
- detail::callable
- > queue;
-
 public:
- typedef typename queue::item item;
- typedef scheduling_policy scheduler_type;
+ typedef SchedulingPolicy scheduler_type;
+ typedef typename scheduler_type::impl::item item;
 
 private:
         enum channel_state
@@ -45,7 +38,7 @@
         };
 
         channel_state state_;
- queue queue_;
+ scheduler_type queue_;
         shared_mutex mtx_;
         condition not_empty_cond_;
         condition not_full_cond_;
@@ -98,8 +91,8 @@
                 BOOST_ASSERT( deactive_now_() );
                 std::vector< detail::callable > unprocessed;
                 unprocessed.reserve( queue_.size() );
- BOOST_FOREACH( item itm, queue_)
- { unprocessed.push_back( itm.ca() ); }
+ BOOST_FOREACH( detail::callable ca, queue_)
+ { unprocessed.push_back( ca); }
                 clear_();
                 BOOST_ASSERT( empty_() );
                 return unprocessed;
@@ -168,7 +161,6 @@
 
         bool take_(
                 detail::callable & ca,
- detail::interrupter & intr,
                 unique_lock< shared_mutex > & lk)
         {
                 if ( deactive_now_() || ( deactive_() && empty_() ) )
@@ -185,9 +177,7 @@
                 { return false; }
                 if ( deactive_now_() || ( deactive_() && empty_() ) )
                         return false;
- item itm( queue_.pop() );
- ca = itm.ca();
- intr = itm.intr();
+ ca = queue_.pop();
                 if ( size_() <= lwm_)
                 {
                         if ( lwm_ == hwm_)
@@ -203,7 +193,6 @@
         template< typename Duration >
         bool take_(
                 detail::callable & ca,
- detail::interrupter & intr,
                 Duration const& rel_time,
                 unique_lock< shared_mutex > & lk)
         {
@@ -223,37 +212,31 @@
                 { return false; }
                 if ( deactive_now_() || ( deactive_() && empty_() ) )
                         return false;
- item itm( queue_.pop() );
- ca = itm.ca();
- intr = itm.intr();
+ ca = queue_.pop();
                 if ( size_() <= lwm_)
                 {
                         if ( lwm_ == hwm_)
                                 not_full_cond_.notify_one();
                         else
                                 // more than one producer could be waiting
- // for submiting an action object
+ // in order to submit a task
                                 not_full_cond_.notify_all();
                 }
                 return ! ca.empty();
         }
 
- bool try_take_(
- detail::callable & ca,
- detail::interrupter & intr)
+ bool try_take_( detail::callable & ca)
         {
                 if ( deactive_now_() || empty_() )
                         return false;
- item itm( queue_.pop() );
- ca = itm.ca();
- intr = itm.intr();
+ ca = queue_.pop();
                 if ( size_() <= lwm_)
                 {
                         if ( lwm_ == hwm_)
                                 not_full_cond_.notify_one();
                         else
                                 // more than one producer could be waiting
- // for submiting an action object
+ // in order to submit a task
                                 not_full_cond_.notify_all();
                 }
                 return ! ca.empty();
@@ -363,45 +346,40 @@
                 return size_();
         }
 
- void put( item const& itm)
+ void put( detail::callable const& ca)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- put_( itm, lk);
+ put_( ca, lk);
         }
 
         template< typename Duration >
         void put(
- item const& itm,
+ detail::callable const& ca,
                 Duration const& rel_time)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- put_( itm, rel_time, lk);
+ put_( ca, rel_time, lk);
         }
 
- bool take(
- detail::callable & ca,
- detail::interrupter & intr)
+ bool take( detail::callable & ca)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, intr, lk);
+ return take_( ca, lk);
         }
 
         template< typename Duration >
         bool take(
                 detail::callable & ca,
- detail::interrupter & intr,
                 Duration const& rel_time)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, intr, rel_time, lk);
+ return take_( ca, rel_time, lk);
         }
 
- bool try_take(
- detail::callable & ca,
- detail::interrupter & intr)
+ bool try_take( detail::callable & ca)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- return try_take_( ca, intr);
+ return try_take_( ca);
         }
 };
 } }

Modified: sandbox/threadpool/boost/tp/default_pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/default_pool.hpp (original)
+++ sandbox/threadpool/boost/tp/default_pool.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -7,7 +7,6 @@
 
 #include <boost/tp/fifo.hpp>
 #include <boost/tp/pool.hpp>
-#include <boost/tp/task.hpp>
 #include <boost/tp/unbounded_channel.hpp>
 
 namespace boost { namespace tp
@@ -23,10 +22,6 @@
 inline
 default_pool & get_default_pool()
 { return detail::static_pool::instance; }
-
-template< typename Act >
-task< typename result_of< Act() >::type > lauch_in_pool( Act const& act)
-{ return get_default_pool().submit( act); }
 } }
 
 #endif // BOOST_TP_DEFAULT_POOL_H

Modified: sandbox/threadpool/boost/tp/detail/atomic.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/atomic.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/atomic.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -5,12 +5,12 @@
 #ifndef BOOST_TP_DETAIL_ATOMIC_H
 #define BOOST_TP_DETAIL_ATOMIC_H
 
-#include <boost/config.hpp>
+#include <boost/tp/detail/config.hpp>
 
 namespace boost { namespace tp {
 namespace detail
 {
-#if (defined BOOST_WINDOWS) && !(defined BOOST_DISABLE_WIN32)
+# if defined( BOOST_WINDOWS_OS)
 
 #include <Windows.h>
 
@@ -34,7 +34,31 @@
 unsigned int atomic_dec_32( volatile unsigned int * object)
 { return InterlockedDecrement( reinterpret_cast< volatile LONG * >( object) ); }
 
-#elif defined(__hpux)
+# elif defined(BOOST_AIX_OS)
+
+#include <sys/atomic_ops.h>
+
+template< typename T >
+bool atomic_compare_exchange_ptr( volatile T * object, T expected, T desired)
+{ return ::compare_and_swap( object, expected, & desired); }
+
+inline
+void atomic_write_32( volatile unsigned int * object, unsigned int desired)
+{ * object = desired; }
+
+template< typename T >
+void atomic_write_ptr( volatile T * object, T * desired)
+{ * object = desired; }
+
+inline
+unsigned int atomic_inc_32( volatile unsigned int * object)
+{ return ::fetch_and_add( object, 1); }
+
+inline
+unsigned int atomic_dec_32( volatile unsigned int * object)
+{ return ::fetch_and_add( object, -1); }
+
+# elif defined(BOOST_HPUX_OS)
 
 #include <atomic.h>
 
@@ -58,7 +82,7 @@
 unsigned int atomic_dec_32( volatile unsigned int * object)
 { return ::atomic_dec( object); }
 
-#elif (defined(sun) || defined(__sun))
+# elif defined(BOOST_SOLARIS_OS)
 
 #include <atomic.h>
 
@@ -107,11 +131,9 @@
 unsigned int atomic_dec_32( volatile unsigned int * object)
 { return __sync_fetch_and_sub( object, 1); }
 
-#else
-
-#error No atomic operations available for this platform!
-
-#endif
+# else
+# error "No atomic operations available for this platform!"
+# endif
 } } }
 
 #endif // BOOST_TP_DETAIL_ATOMIC_H

Modified: sandbox/threadpool/boost/tp/detail/bind_processor.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/bind_processor.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/bind_processor.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -5,26 +5,24 @@
 #ifndef BOOST_THREAD_BIND_PROCESSOR_H
 #define BOOST_THREAD_BIND_PROCESSOR_H
 
-#include <boost/config.hpp>
+#include <boost/tp/detail/config.hpp>
 
 #define BOOST_THREAD_BIND_PROCESSOR_AFFINITY 1
 
-#if defined(linux) || defined(__linux) || defined(__linux__)
-#include <boost/tp/detail/bind_processor_linux.hpp>
-// #elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) || defined(__DragonFly__)
-// #include <boost/tp/detail/bind_processor_bsd.hpp>
-#elif defined(sun) || defined(__sun)
-#include <boost/tp/detail/bind_processor_solaris.hpp>
-#elif defined(__sgi)
-#include <boost/tp/detail/bind_processor_irix.hpp>
-#elif defined(__IBMCPP__) || defined(_AIX)
-#include <boost/tp/detail/bind_processor_aix.hpp>
-#elif defined(__hpux)
-#include <boost/tp/detail/bind_processor_hpux.hpp>
-#elif defined(_WIN32) || defined(__WIN32__) || defined(WIN32)
-#include <boost/tp/detail/bind_processor_windows.hpp>
-#else
-#undef BOOST_THREAD_BIND_PROCESSOR_AFFINITY
-#endif
+# if defined(BOOST_WINDOWS_OS)
+# include <boost/tp/detail/bind_processor_windows.hpp>
+# elif defined(BOOST_LINUX_OS)
+# include <boost/tp/detail/bind_processor_linux.hpp>
+//# elif defined(BOOST_xBSD_OS)
+//# include <boost/tp/detail/bind_processor_bsd.hpp>
+# elif defined(BOOST_AIX_OS)
+# include <boost/tp/detail/bind_processor_aix.hpp>
+# elif defined(BOOST_HPUX_OS)
+# include <boost/tp/detail/bind_processor_hpux.hpp>
+# elif defined(BOOST_SOLARIS_OS)
+# include <boost/tp/detail/bind_processor_solaris.hpp>
+# else
+# undef BOOST_THREAD_BIND_PROCESSOR_AFFINITY
+# endif
 
 #endif // BOOST_THREAD_BIND_PROCESSOR_H

Modified: sandbox/threadpool/boost/tp/detail/callable.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/callable.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/callable.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -5,11 +5,11 @@
 #ifndef BOOST_TP_DETAIL_CALLABLE_H
 #define BOOST_TP_DETAIL_CALLABLE_H
 
-#include <boost/future.hpp>
+#include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
+#include <boost/utility.hpp>
 
-#include <boost/thread/detail/move.hpp>
-#include <boost/tp/exceptions.hpp>
+#include <boost/tp/task.hpp>
 
 namespace boost { namespace tp {
 namespace detail
@@ -21,75 +21,50 @@
         {
                 virtual ~impl() {}
                 virtual void run() = 0;
+ virtual void set( shared_ptr< thread > &) = 0;
+ virtual void reset() = 0;
         };
 
- template<
- typename Act,
- typename T
- >
- struct impl_wrapper
- : public impl
+ template< typename R >
+ class impl_wrapper : public impl
         {
- Act act;
- promise< T > prom;
+ private:
+ task< R > tsk_;
 
- impl_wrapper(
- Act const& act_,
- promise< T > & prom_)
- : act( act_), prom( move( prom_) )
+ public:
+ impl_wrapper( task< R > const& tsk)
+ : tsk_( tsk)
                 {}
 
                 void run()
- {
- try
- { prom.set_value( act() ); }
- catch ( thread_interrupted const&)
- { prom.set_exception( copy_exception( task_interrupted() ) ); }
- catch(...)
- { prom.set_exception( current_exception() ); }
- }
+ { tsk_(); }
+
+ void set( shared_ptr< thread > & thrd)
+ { tsk_.impl_->intr.set( thrd); }
+
+ void reset()
+ { tsk_.impl_->intr.reset(); }
         };
 
- template< typename Act >
- struct impl_wrapper< Act, void >
- : public impl
+ shared_ptr< impl > impl_;
+
+public:
+ class scoped_lock : public noncopyable
         {
- Act act;
- promise< void > prom;
+ private:
+ callable & ca_;
 
- impl_wrapper(
- Act const& act_,
- promise< void > & prom_)
- : act( act_), prom( move( prom_) )
- {}
+ public:
+ scoped_lock( callable &, shared_ptr< thread > &);
 
- void run()
- {
- try
- {
- act();
- prom.set_value();
- }
- catch ( thread_interrupted const&)
- { prom.set_exception( copy_exception( task_interrupted() ) ); }
- catch(...)
- { prom.set_exception( current_exception() ); }
- }
+ ~scoped_lock();
         };
 
- boost::shared_ptr< impl > impl_;
-
-public:
         callable();
 
- template<
- typename Act,
- typename T
- >
- callable(
- Act const& act,
- promise< T > & prom)
- : impl_( new impl_wrapper< Act, T >( act, prom) )
+ template< typename R >
+ callable( task< R > const& tsk)
+ : impl_( new impl_wrapper< R >( tsk) )
         {}
 
         void operator()();

Added: sandbox/threadpool/boost/tp/detail/config.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/config.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,35 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_CONFIG_H
+#define BOOST_TP_DETAIL_CONFIG_H
+
+#include <boost/config.hpp>
+
+# if defined(BOOST_WINDOWS_API) && defined(BOOST_POSIX_API)
+# error "Both BOOST_WINDOWS_API and BOOST_POSIX_API are defined!"
+# elif ! defined(BOOST_WINDOWS_API ) && ! defined(BOOST_POSIX_API)
+# if defined(_WIN32) || defined(__WIN32__) || defined(WIN32) || defined(__CYGWIN__)
+# define BOOST_WINDOWS_API
+# else
+# define BOOST_POSIX_API
+# endif
+# endif
+
+# if defined(BOOST_WINDOWS_API)
+# define BOOST_WINDOWS_OS
+# elif defined(linux) || defined(__linux) || defined(__linux__)
+# define BOOST_LINUX_OS
+# elif defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) || defined(__DragonFly__)
+# define BOOST_xBSD_OS
+# elif defined(__IBMCPP__) || defined(_AIX)
+# define BOOST_AIX_OS
+# elif defined(__hpux)
+# define BOOST_HPUX_OS
+# elif defined(sun) || defined(__sun)
+# define BOOST_SOLARIS_OS
+# endif
+
+#endif // BOOST_TP_DETAIL_CONFIG_H
+

Modified: sandbox/threadpool/boost/tp/detail/interrupter.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/interrupter.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/interrupter.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -19,8 +19,7 @@
 class interrupter
 {
 private:
- class impl
- : private noncopyable
+ class impl : private noncopyable
         {
         private:
                 bool interruption_requested_;

Added: sandbox/threadpool/boost/tp/detail/semaphore.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/semaphore.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,18 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_SEMAPHORE_H
+#define BOOST_TP_DETAIL_SEMAPHORE_H
+
+#include <boost/tp/detail/config.hpp>
+
+# if defined(BOOST_WINDOWS_API)
+# include <boost/tp/detail/semaphore_windows.hpp>
+# elif defined(BOOST_POSIX_API)
+# include <boost/tp/detail/semaphore_posix.hpp>
+# else
+# error "No semaphore available for this platform!"
+# endif
+
+#endif // BOOST_TP_DETAIL_SEMAPHORE_H

Added: sandbox/threadpool/boost/tp/detail/semaphore_posix.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/semaphore_posix.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,37 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_SEMAPHORE_POSIX_H
+#define BOOST_TP_DETAIL_SEMAPHORE_POSIX_H
+
+extern "C"
+{
+#include <semaphore.h>
+}
+
+#include <boost/utility.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+class semaphore : private boost::noncopyable
+{
+private:
+ sem_t sem_;
+
+public:
+ semaphore( int);
+
+ ~semaphore();
+
+ void post();
+
+ bool try_wait();
+ void wait();
+
+ int value();
+};
+}}}
+
+#endif // BOOST_TP_DETAIL_SEMAPHORE_POSIX_H

Added: sandbox/threadpool/boost/tp/detail/semaphore_windows.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/detail/semaphore_windows.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,37 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_SEMAPHORE_WINDOWS_H
+#define BOOST_TP_DETAIL_SEMAPHORE_WINDOWS_H
+
+extern "C"
+{
+#include <Windows.h>
+}
+
+#include <boost/utility.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+class semaphore : private boost::noncopyable
+{
+private:
+ HANDLE handle_;
+
+public:
+ semaphore( int);
+
+ ~semaphore();
+
+ void post();
+
+ bool try_wait();
+ void wait();
+
+ int value();
+};
+}}}
+
+#endif // BOOST_TP_DETAIL_SEMAPHORE_WINDOWS_H

Modified: sandbox/threadpool/boost/tp/detail/worker.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/worker.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/worker.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -10,7 +10,6 @@
 
 #include <boost/assert.hpp>
 #include <boost/function.hpp>
-#include <boost/interprocess/sync/interprocess_semaphore.hpp>
 #include <boost/random.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
@@ -19,6 +18,7 @@
 #include <boost/tp/detail/callable.hpp>
 #include <boost/tp/detail/guard.hpp>
 #include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/semaphore.hpp>
 #include <boost/tp/detail/wsq.hpp>
 #include <boost/tp/poolsize.hpp>
 #include <boost/tp/scanns.hpp>
@@ -42,11 +42,11 @@
 
                 virtual void interrupt() const = 0;
 
- virtual void put( callable const&, interrupter const&) = 0;
+ virtual void put( callable const&) = 0;
 
- virtual bool try_take( callable &, interrupter &) = 0;
+ virtual bool try_take( callable &) = 0;
 
- virtual bool try_steal( callable &, interrupter &) = 0;
+ virtual bool try_steal( callable &) = 0;
 
                 virtual void signal_shutdown() = 0;
 
@@ -81,41 +81,34 @@
                         { return die_(); }
                 };
 
- typedef std::pair< callable, interrupter > item;
+ Pool & pool_;
+ shared_ptr< thread > thrd_;
+ wsq wsq_;
+ semaphore shtdwn_sem_;
+ semaphore shtdwn_now_sem_;
+ bool shtdwn_;
+ posix_time::time_duration asleep_;
+ scanns max_scns_;
+ std::size_t scns_;
+ random_idx rnd_idx_;
 
- Pool & pool_;
- shared_ptr< thread > thrd_;
- wsq< item > wsq_;
- interprocess::interprocess_semaphore shtdwn_sem_;
- interprocess::interprocess_semaphore shtdwn_now_sem_;
- bool shtdwn_;
- posix_time::time_duration asleep_;
- scanns max_scns_;
- std::size_t scns_;
- random_idx rnd_idx_;
-
- void execute_(
- callable & ca,
- interrupter & intr)
+ void execute_( callable & ca)
                 {
                         BOOST_ASSERT( ! ca.empty() );
                         guard grd( get_pool().active_worker_);
- shared_ptr< void > ig(
- static_cast< void * >( 0),
- boost::bind(
- & interrupter::reset,
- intr) );
- intr.set( thrd_);
- ca();
+ {
+ callable::scoped_lock lk( ca, thrd_);
+ ca();
+ }
                         ca.clear();
                         BOOST_ASSERT( ca.empty() );
                 }
         
- void next_callable_( callable & ca, interrupter & intr)
+ void next_callable_( callable & ca)
                 {
- if ( ! try_take( ca, intr) )
+ if ( ! try_take( ca) )
                         {
- if ( ! get_pool().channel_.try_take( ca, intr) )
+ if ( ! get_pool().channel_.try_take( ca) )
                                 {
                                         std::size_t idx( rnd_idx_() );
                                         for ( std::size_t j( 0); j < get_pool().wg_.size(); ++j)
@@ -123,7 +116,7 @@
                                                 worker other( get_pool().wg_[idx]);
                                                 if ( this_thread::get_id() == other.get_id() ) continue;
                                                 if ( ++idx >= get_pool().wg_.size() ) idx = 0;
- if ( other.try_steal( ca, intr) ) break;
+ if ( other.try_steal( ca) ) break;
                                         }
         
                                         if ( ca.empty() )
@@ -134,7 +127,7 @@
                                                 if ( scns_ >= max_scns_)
                                                 {
                                                         if ( get_pool().size_() == get_pool().idle_worker_)
- get_pool().channel_.take( ca, intr, asleep_);
+ get_pool().channel_.take( ca, asleep_);
                                                         else
                                                                 this_thread::sleep( asleep_);
                                                         scns_ = 0;
@@ -201,39 +194,27 @@
                 void signal_shutdown_now()
                 { shtdwn_now_sem_.post(); }
 
- void put(
- callable const& ca,
- interrupter const& intr)
+ void put( callable const& ca)
                 {
                         BOOST_ASSERT( ! ca.empty() );
- wsq_.put( std::make_pair( ca, intr) );
+ wsq_.put( ca);
                 }
 
- bool try_take(
- callable & ca,
- interrupter & intr)
+ bool try_take( callable & ca)
                 {
- item itm;
- bool result( wsq_.try_take( itm) );
+ callable tmp;
+ bool result( wsq_.try_take( tmp) );
                         if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
+ ca = tmp;
                         return result;
                 }
                 
- bool try_steal(
- callable & ca,
- interrupter & intr)
+ bool try_steal( callable & ca)
                 {
- item itm;
- bool result( wsq_.try_steal( itm) );
+ callable tmp;
+ bool result( wsq_.try_steal( tmp) );
                         if ( result)
- {
- ca = itm.first;
- intr = itm.second;
- }
+ ca = tmp;
                         return result;
                 }
 
@@ -247,18 +228,16 @@
                         schedule_until(
                                 bind( & impl_pool::shutdown_, this) );
                 }
-
+
                 void schedule_until( function< bool() > const& pred)
                 {
                         callable ca;
- interrupter intr;
-
                         while ( ! pred() )
                         {
- next_callable_( ca, intr);
+ next_callable_( ca);
                                 if( ! ca.empty() )
                                 {
- execute_( ca, intr);
+ execute_( ca);
                                         scns_ = 0;
                                 }
                         }
@@ -292,14 +271,14 @@
         void signal_shutdown();
         void signal_shutdown_now();
 
- void put( callable const&, interrupter const&);
- bool try_take( callable &, interrupter &);
- bool try_steal( callable &, interrupter &);
+ void put( callable const&);
+ bool try_take( callable &);
+ bool try_steal( callable &);
 
         void reschedule_until( function< bool() > const&);
 
         template< typename Pool >
- Pool & get_thread_pool() const
+ Pool & get_pool() const
         {
                 impl_pool< Pool > * p( dynamic_cast< impl_pool< Pool > * >( impl_.get() ) );
                 BOOST_ASSERT( p);

Modified: sandbox/threadpool/boost/tp/detail/wsq.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/wsq.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/wsq.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -5,164 +5,38 @@
 #ifndef BOOST_TP_DETAIL_WSQ_H
 #define BOOST_TP_DETAIL_WSQ_H
 
-#include <utility>
-
-#include <boost/assert.hpp>
-#include <boost/interprocess/detail/atomic.hpp>
-#include <boost/foreach.hpp>
-#include <boost/function.hpp>
 #include <boost/shared_array.hpp>
-#include <boost/shared_ptr.hpp>
-#include <boost/thread/locks.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/utility.hpp>
 
-#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/callable.hpp>
 
 namespace boost { namespace tp {
 namespace detail
 {
-template< typename T >
-class wsq
-: private noncopyable
+class wsq : private noncopyable
 {
 private:
- class impl
- {
- private:
- const int initial_size_;
- shared_array< T > array_;
- int capacity_;
- int mask_;
- volatile uint32_t head_idx_;
- volatile uint32_t tail_idx_;
- recursive_mutex mtx_;
-
- public:
- impl()
- :
- initial_size_( 32),
- array_( new T[ initial_size_]),
- capacity_( initial_size_),
- mask_( initial_size_ - 1),
- head_idx_( 0),
- tail_idx_( 0),
- mtx_()
- {}
-
- bool empty() const
- { return head_idx_ >= tail_idx_; }
-
- std::size_t size() const
- { return tail_idx_ - head_idx_; }
-
- void put( T const& t)
- {
- uint32_t tail( tail_idx_);
- if ( tail <= head_idx_ + mask_)
- {
- array_[tail & mask_] = t;
- tail_idx_ = tail + 1;
- }
- else
- {
- unique_lock< recursive_mutex > lk( mtx_);
- uint32_t head( head_idx_);
- int count( size() );
-
- if ( count >= mask_)
- {
- capacity_ <<= 1;
- shared_array< T > array( new T[capacity_]);
- for ( int i( 0); i != count; ++i)
- array[i] = array_[(i + head) & mask_];
- array_.swap( array);
- head_idx_ = 0;
- tail_idx_ = tail = count;
- mask_ = (mask_ << 1) | 1;
- }
- array_[tail & mask_] = t;
- tail_idx_ = tail + 1;
- }
- }
-
- bool try_take( T & t)
- {
- uint32_t tail( tail_idx_);
- if ( tail == 0)
- return false;
- tail -= 1;
- interprocess::detail::atomic_write32( & tail_idx_, tail);
- if ( head_idx_ <= tail)
- {
- t = array_[tail & mask_];
- return true;
- }
- else
- {
- unique_lock< recursive_mutex > lk( mtx_);
- if ( head_idx_ <= tail)
- {
- t = array_[tail & mask_];
- return true;
- }
- else
- {
- tail_idx_ = tail + 1;
- return false;
- }
- }
- }
-
- bool try_steal( T & t)
- {
- recursive_mutex::scoped_try_lock lk( mtx_);
- if ( lk.owns_lock() )
- {
- uint32_t head( head_idx_);
- interprocess::detail::atomic_write32( & head_idx_, head + 1);
- if ( head < tail_idx_)
- {
- t = array_[head & mask_];
- return true;
- }
- else
- {
- head_idx_ = head;
- return false;
- }
- }
- return false;
- }
- };
-
- shared_ptr< impl > impl_;
+ const int initial_size_;
+ shared_array< callable > array_;
+ int capacity_;
+ int mask_;
+ volatile uint32_t head_idx_;
+ volatile uint32_t tail_idx_;
+ recursive_mutex mtx_;
 
 public:
- wsq()
- : impl_( new impl() )
- {}
-
- bool empty() const
- { return impl_->empty(); }
-
- std::size_t size() const
- { return impl_->size(); }
-
- void put( T const& t)
- { impl_->put( t); }
-
- bool try_take( T & t)
- { return impl_->try_take( t); }
-
- bool try_steal( T & t)
- { return impl_->try_steal( t); }
-
- void swap( wsq & queue)
- {
- if ( this == & queue) return;
- impl_.swap( queue.impl_);
- }
+ wsq();
+
+ bool empty() const;
+
+ std::size_t size() const;
+
+ void put( callable const&);
+
+ bool try_take( callable &);
+
+ bool try_steal( callable &);
 };
 } } }
 

Modified: sandbox/threadpool/boost/tp/exceptions.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/exceptions.hpp (original)
+++ sandbox/threadpool/boost/tp/exceptions.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -10,35 +10,31 @@
 
 namespace boost { namespace tp
 {
-class invalid_poolsize
-: public std::invalid_argument
+class invalid_poolsize : public std::invalid_argument
 {
 public:
- invalid_poolsize( std::string const& msg)
- : std::invalid_argument( msg)
+ invalid_poolsize()
+ : std::invalid_argument("core poolsize must be greater than zero")
         {}
 };
 
-class invalid_scanns
-: public std::invalid_argument
+class invalid_scanns : public std::invalid_argument
 {
 public:
- invalid_scanns( std::string const& msg)
- : std::invalid_argument( msg)
+ invalid_scanns()
+ : std::invalid_argument("scanns must be greater than or equal to zero")
         {}
 };
 
-class invalid_timeduration
-: public std::invalid_argument
+class invalid_timeduration : public std::invalid_argument
 {
 public:
- invalid_timeduration( std::string const& msg)
- : std::invalid_argument( msg)
+ invalid_timeduration()
+ : std::invalid_argument("argument asleep is not valid")
         {}
 };
 
-class invalid_watermark
-: public std::invalid_argument
+class invalid_watermark : public std::invalid_argument
 {
 public:
     invalid_watermark( std::string const& msg)
@@ -46,11 +42,18 @@
         {}
 };
 
+class task_already_executed : public std::logic_error // thrown by task's impl_wrapper::operator() when it catches promise_already_satisfied
+{
+public:
+ task_already_executed() // takes no arguments; the what() text is the fixed literal below
+ : std::logic_error("task already executed")
+ {}
+};
+
 struct task_interrupted
 {};
 
-class task_rejected
-: public std::runtime_error
+class task_rejected : public std::runtime_error
 {
 public:
     task_rejected( std::string const& msg)

Modified: sandbox/threadpool/boost/tp/fifo.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/fifo.hpp (original)
+++ sandbox/threadpool/boost/tp/fifo.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -8,12 +8,8 @@
 #include <cstddef>
 #include <list>
 
-#include <boost/assert.hpp>
-#include <boost/bind.hpp>
-#include <boost/function.hpp>
-
+#include <boost/tp/detail/callable.hpp>
 #include <boost/tp/detail/info.hpp>
-#include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
 {
@@ -21,35 +17,12 @@
 {
         typedef detail::has_no_priority priority_tag_type;
 
- template< typename Callable >
         class impl
         {
- private:
- typedef Callable callable;
-
         public:
- class item
- {
- private:
- callable ca_;
- detail::interrupter intr_;
-
- public:
- item(
- callable const& ca,
- detail::interrupter const& intr)
- : ca_( ca), intr_( intr)
- { BOOST_ASSERT( ! ca_.empty() ); }
-
- const callable ca() const
- { return ca_; }
-
- const detail::interrupter intr() const
- { return intr_; }
- };
-
- typedef typename std::list< item >::iterator iterator;
- typedef typename std::list< item >::const_iterator const_iterator;
+ typedef detail::callable item;
+ typedef std::list< item >::iterator iterator;
+ typedef std::list< item >::const_iterator const_iterator;
         
         private:
                 std::list< item > lst_;

Modified: sandbox/threadpool/boost/tp/info.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/info.hpp (original)
+++ sandbox/threadpool/boost/tp/info.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -13,14 +13,12 @@
 namespace boost { namespace tp
 {
 template< typename Pool >
-struct has_priority
-:
-public mpl::bool_<
- is_same<
- detail::has_priority,
- typename Pool::scheduler_type::priority_tag_type
- >::value
->
+struct has_priority : public mpl::bool_<
+ is_same<
+ detail::has_priority,
+ typename Pool::scheduler_type::priority_tag_type
+ >::value
+ >
 {};
 
 template< typename Pool >

Added: sandbox/threadpool/boost/tp/launch.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/launch.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,59 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_LAUNCH_H
+#define BOOST_TP_LAUNCH_H
+
+#include <boost/tp/default_pool.hpp>
+#include <boost/tp/pool.hpp>
+#include <boost/tp/task.hpp>
+
+namespace boost { namespace tp
+{
+template< typename R > // R: the type parameter of the task being launched
+void launch_in_pool( task< R > tsk) // submits tsk to the pool returned by get_default_pool(); tsk is taken by value
+{ get_default_pool().submit( tsk); } // returns nothing; NOTE(review): pool::submit throws task_rejected when the pool is closed and the caller is not a worker - confirm for the default pool
+
+template< // overload of launch_in_pool that also carries a scheduling attribute
+ typename R, // R: the type parameter of the task being launched
+ typename Attr // Attr: type of the attribute handed on to submit()
+>
+void launch_in_pool(
+ task< R > tsk, // taken by value
+ Attr const& attr) // forwarded unchanged; presumably a priority-like value for the pool's scheduler - confirm
+{ get_default_pool().submit( tsk, attr); } // uses the two-argument submit of the pool returned by get_default_pool()
+
+template< // overload of launch_in_pool that targets a caller-supplied pool instead of the default one
+ typename Channel, // Channel: template argument of the pool type
+ typename R // R: the type parameter of the task being launched
+>
+void launch_in_pool(
+ pool< Channel > & pool, // non-const reference: submit() is invoked on this object
+ task< R > tsk) // taken by value
+{ pool.submit( tsk); } // returns nothing; submit throws task_rejected when the pool is closed and the caller is not a worker
+
+template< // overload of launch_in_pool taking both an explicit pool and a scheduling attribute
+ typename Channel, // Channel: template argument of the pool type
+ typename R, // R: the type parameter of the task being launched
+ typename Attr // Attr: type of the attribute handed on to submit()
+>
+void launch_in_pool(
+ pool< Channel > & pool, // non-const reference: submit() is invoked on this object
+ task< R > tsk, // taken by value
+ Attr const& attr) // forwarded unchanged to pool.submit()
+{ pool.submit( tsk, attr); } // returns nothing; submit throws task_rejected when the pool is closed and the caller is not a worker
+
+template< typename R > // R: the type parameter of the task being launched
+void launch_in_thread( task< R > tsk) // runs tsk on a thread object created just for this call
+{
+ thread t( tsk); // the task object itself is the thread's callable; `thread` is presumably boost::thread - confirm
+ t.join(); // joins before returning, so the caller does not continue until t has finished
+}
+
+template< typename R > // R: the type parameter of the task being launched
+void launch_in_current( task< R > tsk) // executes tsk directly in the calling thread, with no pool or extra thread involved
+{ tsk(); } // task::operator() forwards to ( * impl_)(), so the work is done before this function returns
+} }
+
+#endif // BOOST_TP_LAUNCH_H

Modified: sandbox/threadpool/boost/tp/lifo.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/lifo.hpp (original)
+++ sandbox/threadpool/boost/tp/lifo.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -8,12 +8,8 @@
 #include <cstddef>
 #include <list>
 
-#include <boost/assert.hpp>
-#include <boost/bind.hpp>
-#include <boost/function.hpp>
-
+#include <boost/tp/detail/callable.hpp>
 #include <boost/tp/detail/info.hpp>
-#include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
 {
@@ -21,35 +17,12 @@
 {
         typedef detail::has_no_priority priority_tag_type;
 
- template< typename Callable >
         class impl
         {
- private:
- typedef Callable callable;
-
         public:
- class item
- {
- private:
- callable ca_;
- detail::interrupter intr_;
-
- public:
- item(
- callable const& ca,
- detail::interrupter const& intr)
- : ca_( ca), intr_( intr)
- { BOOST_ASSERT( ! ca_.empty() ); }
-
- const callable ca() const
- { return ca_; }
-
- const detail::interrupter intr() const
- { return intr_; }
- };
-
- typedef typename std::list< item >::iterator iterator;
- typedef typename std::list< item >::const_iterator const_iterator;
+ typedef detail::callable item;
+ typedef std::list< item >::iterator iterator;
+ typedef std::list< item >::const_iterator const_iterator;
         
         private:
                 std::list< item > lst_;

Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -6,7 +6,6 @@
 #define BOOST_TP_POOL_H
 
 #include <cstddef>
-#include <list>
 #include <utility>
 #include <vector>
 
@@ -14,18 +13,13 @@
 
 #include <boost/assert.hpp>
 #include <boost/bind.hpp>
-#include <boost/foreach.hpp>
 #include <boost/function.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/ref.hpp>
-#include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
 #include <boost/utility.hpp>
-#include <boost/utility/result_of.hpp>
 
 #include <boost/tp/detail/atomic.hpp>
 #include <boost/tp/detail/callable.hpp>
-#include <boost/tp/detail/interrupter.hpp>
 #ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
 #include <boost/tp/detail/bind_processor.hpp>
 #endif
@@ -43,24 +37,17 @@
 class pool : private noncopyable
 {
 private:
- template< typename Pred >
- friend void this_task::reschedule_until( Pred const&);
-
- template< typename Pool >
- friend Pool & this_task::get_thread_pool();
-
         friend class detail::worker;
 
- typedef Channel channel;
- typedef typename channel::item channel_item;
+ typedef Channel channel;
+ typedef typename channel::item channel_item;
 
-
- detail::worker_group wg_;
- shared_mutex mtx_wg_;
- unsigned int state_;
- channel channel_;
- unsigned int active_worker_;
- unsigned int idle_worker_;
+ detail::worker_group wg_;
+ shared_mutex mtx_wg_;
+ unsigned int state_;
+ channel channel_;
+ unsigned int active_worker_;
+ unsigned int idle_worker_;
 
         void worker_entry_()
         {
@@ -144,7 +131,7 @@
         idle_worker_( 0)
         {
                 if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration("argument asleep is not valid");
+ throw invalid_timeduration();
                 channel_.activate();
                 unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
@@ -169,7 +156,7 @@
         idle_worker_( 0)
         {
                 if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration("argument asleep is not valid");
+ throw invalid_timeduration();
                 channel_.activate();
                 unique_lock< shared_mutex > lk( mtx_wg_);
                 for ( std::size_t i( 0); i < psize; ++i)
@@ -190,7 +177,7 @@
         idle_worker_( 0)
         {
                 if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration("argument asleep is not valid");
+ throw invalid_timeduration();
                 poolsize psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
@@ -216,7 +203,7 @@
         idle_worker_( 0)
         {
                 if ( asleep.is_special() || asleep.is_negative() )
- throw invalid_timeduration("argument asleep is not valid");
+ throw invalid_timeduration();
                 poolsize psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
@@ -299,74 +286,58 @@
         void lower_bound( low_watermark const lwm)
         { return channel_.lower_bound( lwm); }
 
- template< typename Act >
- task< typename result_of< Act() >::type > submit( Act const& act)
+ template< typename R >
+ void submit( task< R > tsk)
         {
- typedef typename result_of< Act() >::type R;
- detail::interrupter intr;
- promise< R > prom;
- shared_future< R > f( prom.get_future() );
                 detail::worker * w( detail::worker::tss_get() );
                 if ( w)
                 {
                         function< bool() > wcb(
                                 bind(
                                         & shared_future< R >::is_ready,
- f) );
- prom.set_wait_callback(
+ tsk.impl_->fut) );
+ tsk.set_wait_callback(
                                 bind(
                                         ( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
                                         w,
                                         wcb) );
- w->put( detail::callable( act, prom), intr);
- return task< R >( f, intr);
+ w->put( detail::callable( tsk) );
                 }
                 else
                 {
                         if ( closed_() )
                                 throw task_rejected("pool is closed");
 
- channel_item itm( detail::callable( act, prom), intr);
- channel_.put( itm);
- return task< R >( f, intr);
+ channel_.put( detail::callable( tsk) );
                 }
         }
 
         template<
- typename Act,
+ typename R,
                 typename Attr
>
- task< typename result_of< Act() >::type > submit(
- Act const& act,
- Attr const& attr)
- {
- typedef typename result_of< Act() >::type R;
- detail::interrupter intr;
- promise< R > prom;
- shared_future< R > f( prom.get_future() );
+ void submit( task< R > tsk, Attr const& attr)
+ {
                 detail::worker * w( detail::worker::tss_get() );
                 if ( w)
                 {
                         function< bool() > wcb(
                                 bind(
                                         & shared_future< R >::is_ready,
- f) );
- prom.set_wait_callback(
+ tsk.impl_->fut) );
+ tsk.set_wait_callback(
                                 bind(
                                         ( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
                                         w,
                                         wcb) );
- w->put( detail::callable( act, prom), intr);
- return task< R >( f, intr);
+ w->put( detail::callable( tsk) );
                 }
                 else
                 {
                         if ( closed_() )
                                 throw task_rejected("pool is closed");
 
- channel_item itm( detail::callable( act, prom), attr, intr);
- channel_.put( itm);
- return task< R >( f, intr);
+ channel_.put( channel_item( detail::callable( tsk), attr) );
                 }
         }
 };

Modified: sandbox/threadpool/boost/tp/poolsize.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/poolsize.hpp (original)
+++ sandbox/threadpool/boost/tp/poolsize.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -7,8 +7,6 @@
 
 #include <cstddef>
 
-#include <boost/tp/exceptions.hpp>
-
 namespace boost { namespace tp
 {
 class poolsize

Modified: sandbox/threadpool/boost/tp/priority.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/priority.hpp (original)
+++ sandbox/threadpool/boost/tp/priority.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -9,13 +9,12 @@
 #include <utility>
 
 #include <boost/assert.hpp>
-#include <boost/function.hpp>
 #include <boost/multi_index_container.hpp>
 #include <boost/multi_index/mem_fun.hpp>
 #include <boost/multi_index/ordered_index.hpp>
 
+#include <boost/tp/detail/callable.hpp>
 #include <boost/tp/detail/info.hpp>
-#include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
 {
@@ -25,41 +24,34 @@
>
 struct priority
 {
- typedef Attr attribute_type;
         typedef detail::has_priority priority_tag_type;
+ typedef Attr attribute_type;
 
- template< typename Callable >
         class impl
         {
         private:
                 typedef Attr attribute;
- typedef Callable callable;
                 typedef Ord ordering;
 
         public:
                 class item
                 {
                 private:
- callable ca_;
+ detail::callable ca_;
                         attribute attr_;
- detail::interrupter intr_;
         
                 public:
                         item(
- callable const& ca,
- attribute const& attr,
- detail::interrupter const& intr)
- : ca_( ca), attr_( attr), intr_( intr)
+ detail::callable const& ca,
+ attribute const& attr)
+ : ca_( ca), attr_( attr)
                         { BOOST_ASSERT( ! ca_.empty() ); }
         
- const callable ca() const
+ const detail::callable ca() const
                         { return ca_; }
         
                         const attribute attr() const
                         { return attr_; }
-
- const detail::interrupter intr() const
- { return intr_; }
                 };
         
         private:
@@ -94,13 +86,13 @@
                 void push( item const& itm)
                 { idx_.insert( itm); }
         
- const item pop()
+ const detail::callable pop()
                 {
                         iterator i( lst_.begin() );
                         BOOST_ASSERT( i != lst_.end() );
                         item itm( * i);
                         lst_.erase( i);
- return itm;
+ return itm.ca();
                 }
         
                 std::size_t size() const

Modified: sandbox/threadpool/boost/tp/scanns.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/scanns.hpp (original)
+++ sandbox/threadpool/boost/tp/scanns.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -7,11 +7,8 @@
 
 #include <cstddef>
 
-#include <boost/tp/exceptions.hpp>
-
 namespace boost { namespace tp
 {
-
 class scanns
 {
 private:

Modified: sandbox/threadpool/boost/tp/smart.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/smart.hpp (original)
+++ sandbox/threadpool/boost/tp/smart.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -8,13 +8,12 @@
 #include <cstddef>
 
 #include <boost/assert.hpp>
-#include <boost/function.hpp>
 #include <boost/multi_index_container.hpp>
 #include <boost/multi_index/mem_fun.hpp>
 #include <boost/multi_index/ordered_index.hpp>
 
+#include <boost/tp/detail/callable.hpp>
 #include <boost/tp/detail/info.hpp>
-#include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
 {
@@ -26,15 +25,13 @@
>
 struct smart
 {
- typedef Attr attribute_type;
         typedef detail::has_priority priority_tag_type;
+ typedef Attr attribute_type;
 
- template< typename Callable >
         class impl
         {
         private:
                 typedef Attr attribute;
- typedef Callable callable;
                 typedef Deq dequeue_op;
                 typedef Enq enqueue_op;
                 typedef Ord ordering;
@@ -43,30 +40,25 @@
                 class item
                 {
                 private:
- callable ca_;
+ detail::callable ca_;
                         attribute attr_;
- detail::interrupter intr_;
         
                 public:
                         item()
- : ca_(), attr_(), intr_()
+ : ca_(), attr_()
                         {}
 
                         item(
- callable const& ca,
- attribute const& attr,
- detail::interrupter const& intr)
- : ca_( ca), attr_( attr), intr_( intr)
+ detail::callable const& ca,
+ attribute const& attr)
+ : ca_( ca), attr_( attr)
                         { BOOST_ASSERT( ! ca_.empty() ); }
         
- const callable ca() const
+ const detail::callable ca() const
                         { return ca_; }
         
                         const attribute attr() const
                         { return attr_; }
-
- const detail::interrupter intr() const
- { return intr_; }
                 };
         
         private:
@@ -107,11 +99,11 @@
                 void push( item const& itm)
                 { enq_op_( idx_, itm); }
         
- const item pop()
+ const detail::callable pop()
                 {
                         item itm;
                         deq_op_( idx_, itm);
- return itm;
+ return itm.ca();
                 }
         
                 std::size_t size() const

Modified: sandbox/threadpool/boost/tp/task.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/task.hpp (original)
+++ sandbox/threadpool/boost/tp/task.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -5,22 +5,39 @@
 #ifndef BOOST_TP_TASK_H
 #define BOOST_TP_TASK_H
 
-#include <boost/assert.hpp>
+#include <exception>
+#include <ios>
+#include <new>
+#include <stdexcept>
+#include <typeinfo>
+
+#include <boost/exception_ptr.hpp>
+#include <boost/exception/exception.hpp>
 #include <boost/future.hpp>
-#include <boost/next_prior.hpp>
 #include <boost/thread.hpp>
 #include <boost/thread/thread_time.hpp>
+#include <boost/utility/result_of.hpp>
 
 #include <boost/tp/detail/interrupter.hpp>
-#include <boost/tp/detail/worker.hpp>
+#include <boost/tp/exceptions.hpp>
 
 namespace boost {
 namespace tp
 {
+namespace detail
+{
+class callable;
+}
+template< typename Channel >
+class pool;
+
 template< typename R >
 class task
 {
 private:
+ template< typename Channel >
+ friend class pool;
+ friend class detail::callable;
         template< typename Iterator >
         friend void waitfor_all( Iterator begin, Iterator end);
         template< typename T1, typename T2 >
@@ -42,71 +59,146 @@
         template< typename T1, typename T2, typename T3, typename T4, typename T5 >
         friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5);
 
- shared_future< R > fut_;
- detail::interrupter intr_;
+ struct impl
+ {
+ promise< R > prom;
+ shared_future< R > fut;
+ detail::interrupter intr;
+
+ impl()
+ :
+ prom(),
+ fut( prom.get_future() ),
+ intr()
+ {}
+
+ virtual ~impl() {}
+
+ virtual void operator()() = 0;
+ };
+
+ template< typename Fn >
+ class impl_wrapper : public impl
+ {
+ private:
+ Fn fn_;
+
+ public:
+ impl_wrapper( Fn const& fn)
+ : fn_( fn)
+ {}
+
+ void operator()() // throw()
+ {
+ try
+ { impl::prom.set_value( fn_() ); }
+ catch ( promise_already_satisfied const&)
+ { throw task_already_executed(); }
+ catch ( thread_interrupted const&)
+ { impl::prom.set_exception( copy_exception( task_interrupted() ) ); }
+ catch ( boost::exception const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::ios_base::failure const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::domain_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::invalid_argument const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::length_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::out_of_range const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::logic_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::overflow_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::range_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::underflow_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::runtime_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_alloc const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_cast const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_typeid const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_exception const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch(...)
+ { impl::prom.set_exception( current_exception() ); }
+ }
+ };
+
+ shared_ptr< impl > impl_;
 
 public:
         task()
- : fut_(), intr_()
+ : impl_()
         {}
 
- task(
- shared_future< R > const& fut,
- detail::interrupter const& intr)
- :
- fut_( fut),
- intr_( intr)
+ template< typename Fn >
+ task( Fn const& fn)
+ : impl_( new impl_wrapper< Fn >( fn) )
         {}
 
         void interrupt()
- { intr_.interrupt(); }
+ { impl_->intr.interrupt(); }
 
         void interrupt_and_wait()
- { intr_.interrupt_and_wait(); }
+ { impl_->intr.interrupt_and_wait(); }
 
         void interrupt_and_wait( system_time const& abs_time)
- { intr_.interrupt_and_wait( abs_time); }
+ { impl_->intr.interrupt_and_wait( abs_time); }
 
         template< typename Duration >
         void interrupt_and_wait( Duration const& rel_time)
- { intr_.interrupt_and_wait( rel_time); }
+ { impl_->intr.interrupt_and_wait( rel_time); }
 
         bool interruption_requested()
- { return intr_.interruption_requested(); }
+ { return impl_->intr.interruption_requested(); }
 
         R get()
- { return fut_.get(); }
+ { return impl_->fut.get(); }
 
         bool is_ready() const
- { return fut_.is_ready(); }
+ { return impl_->fut.is_ready(); }
 
         bool has_value() const
- { return fut_.has_value(); }
+ { return impl_->fut.has_value(); }
 
         bool has_exception() const
- { return fut_.has_exception(); }
+ { return impl_->fut.has_exception(); }
 
         void wait() const
- { fut_.wait(); }
+ { impl_->fut.wait(); }
 
     template< typename Duration >
     bool timed_wait( Duration const& rel_time) const
- { return fut_.timed_wait( rel_time); }
+ { return impl_->fut.timed_wait( rel_time); }
 
     bool timed_wait_until( system_time const& abs_time) const
- { return fut_.timed_wait_until( abs_time); }
+ { return impl_->fut.timed_wait_until( abs_time); }
 
- void swap( task< R > & other)
- {
- fut_.swap( other.fut_);
- intr_.swap( other.intr_);
- }
+ void swap( task< R > & other) // throw()
+ { impl_.swap( other.impl_); }
+
+ void operator()() // throw()
+ { ( * impl_)(); }
+
+ template< typename F >
+ void set_wait_callback( F const& f)
+ { impl_->prom.set_wait_callback( f); }
 };
 
 template<>
 class task< void >
 {
 private:
+ template< typename Channel >
+ friend class pool;
+ friend class detail::callable;
         template< typename Iterator >
         friend void waitfor_all( Iterator begin, Iterator end);
         template< typename T1, typename T2 >
@@ -128,65 +220,140 @@
         template< typename T1, typename T2, typename T3, typename T4, typename T5 >
         friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5);
 
- shared_future< void > fut_;
- detail::interrupter intr_;
+ struct impl
+ {
+ promise< void > prom;
+ shared_future< void > fut;
+ detail::interrupter intr;
+
+ impl()
+ :
+ prom(),
+ fut( prom.get_future() ),
+ intr()
+ {}
+
+ virtual ~impl() {}
+
+ virtual void operator()() = 0;
+ };
+
+ template< typename Fn >
+ class impl_wrapper : public impl
+ {
+ private:
+ Fn fn_;
+
+ public:
+ impl_wrapper( Fn const& fn)
+ : fn_( fn)
+ {}
+
+ void operator()() // throw()
+ {
+ try
+ {
+ fn_();
+ impl::prom.set_value();
+ }
+ catch ( promise_already_satisfied const&)
+ { throw task_already_executed(); }
+ catch ( thread_interrupted const&)
+ { impl::prom.set_exception( copy_exception( task_interrupted() ) ); }
+ catch ( boost::exception const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::ios_base::failure const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::domain_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::invalid_argument const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::length_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::out_of_range const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::logic_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::overflow_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::range_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::underflow_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::runtime_error const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_alloc const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_cast const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_typeid const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch ( std::bad_exception const& e)
+ { impl::prom.set_exception( copy_exception( e) ); }
+ catch(...)
+ { impl::prom.set_exception( current_exception() ); }
+ }
+ };
+
+ shared_ptr< impl > impl_;
 
 public:
         task()
- : fut_(), intr_()
+ : impl_()
         {}
 
- task(
- shared_future< void > const& fut,
- detail::interrupter const& intr)
- :
- fut_( fut),
- intr_( intr)
+ template< typename Fn >
+ task( Fn const& fn)
+ : impl_( new impl_wrapper< Fn >( fn) )
         {}
 
         void interrupt()
- { intr_.interrupt(); }
+ { impl_->intr.interrupt(); }
 
         void interrupt_and_wait()
- { intr_.interrupt_and_wait(); }
+ { impl_->intr.interrupt_and_wait(); }
 
         void interrupt_and_wait( system_time const& abs_time)
- { intr_.interrupt_and_wait( abs_time); }
+ { impl_->intr.interrupt_and_wait( abs_time); }
 
         template< typename Duration >
         void interrupt_and_wait( Duration const& rel_time)
- { intr_.interrupt_and_wait( rel_time); }
+ { impl_->intr.interrupt_and_wait( rel_time); }
 
         bool interruption_requested()
- { return intr_.interruption_requested(); }
+ { return impl_->intr.interruption_requested(); }
 
         void get()
- { fut_.get(); }
+ { impl_->fut.get(); }
 
         bool is_ready() const
- { return fut_.is_ready(); }
+ { return impl_->fut.is_ready(); }
 
         bool has_value() const
- { return fut_.has_value(); }
+ { return impl_->fut.has_value(); }
 
         bool has_exception() const
- { return fut_.has_exception(); }
+ { return impl_->fut.has_exception(); }
 
         void wait() const
- { fut_.wait(); }
+ { impl_->fut.wait(); }
 
     template< typename Duration >
     bool timed_wait( Duration const& rel_time) const
- { return fut_.timed_wait( rel_time); }
+ { return impl_->fut.timed_wait( rel_time); }
 
     bool timed_wait_until( system_time const& abs_time) const
- { return fut_.timed_wait_until( abs_time); }
+ { return impl_->fut.timed_wait_until( abs_time); }
 
- void swap( task< void > & other)
- {
- fut_.swap( other.fut_);
- intr_.swap( other.intr_);
- }
+ void swap( task< void > & other) // throw()
+ { impl_.swap( other.impl_); }
+
+ void operator()() // throw()
+ { ( * impl_)(); }
+
+ template< typename F >
+ void set_wait_callback( F const& f)
+ { impl_->prom.set_wait_callback( f); }
 };
 
 template< typename Iterator >
@@ -236,111 +403,6 @@
 template< typename T1, typename T2, typename T3, typename T4, typename T5 >
 unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5)
 { return wait_for_any( t1.fut_, t2.fut_, t3.fut_, t4.fut_, t5.fut_); }
-}
-
-namespace this_task
-{
-namespace detail
-{
-struct time_reached
-{
- system_time abs_time;
-
- time_reached( system_time & abs_time_)
- : abs_time( abs_time_)
- {}
-
- bool operator()()
- { return get_system_time() >= abs_time; }
-};
-
-class always_true
-{
-private:
- bool result_;
-
-public:
- always_true()
- : result_( false)
- {}
-
- bool operator()()
- {
- if ( ! result_)
- {
- result_ = true;
- return false;
- }
- else
- return true;
- }
-};
-}
-
-template< typename Pred >
-void reschedule_until( Pred const& pred)
-{
- tp::detail::worker * w( tp::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- w->reschedule_until( pred);
-}
-
-template< typename Pool >
-Pool & get_thread_pool()
-{
- tp::detail::worker * w( tp::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- return w->get_thread_pool< Pool >();
-}
-
-inline
-bool is_worker()
-{ return tp::detail::worker::tss_get() != 0; }
-
-inline
-thread::id worker_id()
-{
- tp::detail::worker * w( tp::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- return w->get_id();
-}
-
-inline
-void sleep( system_time abs_time)
-{
- if ( is_worker() )
- {
- detail::time_reached t( abs_time);
- reschedule_until( t);
- }
- else
- this_thread::sleep( abs_time);
-}
-
-template< typename Duration >
-void sleep( Duration const& rel_time)
-{ sleep( get_system_time() + rel_time); }
-
-inline
-void yield()
-{
- if ( is_worker() )
- {
- detail::always_true t;
- reschedule_until( t);
- }
- else
- this_thread::yield();
-}
-
-inline
-void interrupt()
-{
- tp::detail::worker * w( tp::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- w->interrupt();
- this_thread::interruption_point();
-}
-} }
+}}
 
 #endif // BOOST_TP_TASK_H

Modified: sandbox/threadpool/boost/tp/unbounded_channel.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/unbounded_channel.hpp (original)
+++ sandbox/threadpool/boost/tp/unbounded_channel.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -12,13 +12,11 @@
 #include <boost/bind.hpp>
 #include <boost/foreach.hpp>
 #include <boost/function.hpp>
-#include <boost/ref.hpp>
 #include <boost/thread/condition.hpp>
 #include <boost/thread/locks.hpp>
 #include <boost/thread/shared_mutex.hpp>
 
 #include <boost/tp/detail/callable.hpp>
-#include <boost/tp/detail/interrupter.hpp>
 #include <boost/tp/exceptions.hpp>
 
 namespace boost { namespace tp
@@ -26,17 +24,13 @@
 template< typename SchedulingPolicy >
 class unbounded_channel
 {
-private:
- typedef SchedulingPolicy scheduling_policy;
- typedef typename scheduling_policy::template impl<
- detail::callable
- > queue;
-
 public:
- typedef typename queue::item item;
- typedef scheduling_policy scheduler_type;
+ typedef SchedulingPolicy scheduler_type;
+ typedef typename scheduler_type::impl::item item;
 
 private:
+ typedef typename scheduler_type::impl queue;
+
         enum channel_state
         {
                 channel_active,
@@ -95,8 +89,8 @@
                 BOOST_ASSERT( deactive_now_() );
                 std::vector< detail::callable > unprocessed;
                 unprocessed.reserve( queue_.size() );
- BOOST_FOREACH( item itm, queue_)
- { unprocessed.push_back( itm.ca() ); }
+ BOOST_FOREACH( detail::callable ca, queue_)
+ { unprocessed.push_back( ca); }
                 clear_();
                 BOOST_ASSERT( empty_() );
                 return unprocessed;
@@ -120,7 +114,6 @@
 
         bool take_(
                 detail::callable & ca,
- detail::interrupter & intr,
                 unique_lock< shared_mutex > & lk)
         {
                 if ( deactive_now_() || ( deactive_() && empty_() ) )
@@ -137,16 +130,13 @@
                 { return false; }
                 if ( deactive_now_() || ( deactive_() && empty_() ) )
                         return false;
- item itm( queue_.pop() );
- ca = itm.ca();
- intr = itm.intr();
+ ca = queue_.pop();
                 return ! ca.empty();
         }
 
         template< typename Duration >
         bool take_(
                 detail::callable & ca,
- detail::interrupter & intr,
                 Duration const& rel_time,
                 unique_lock< shared_mutex > & lk)
         {
@@ -166,21 +156,15 @@
                 { return false; }
                 if ( deactive_now_() || ( deactive_() && empty_() ) )
                         return false;
- item itm( queue_.pop() );
- ca = itm.ca();
- intr = itm.intr();
+ ca = queue_.pop();
                 return ! ca.empty();
         }
 
- bool try_take_(
- detail::callable & ca,
- detail::interrupter & intr)
+ bool try_take_( detail::callable & ca)
         {
                 if ( deactive_now_() || empty_() )
                         return false;
- item itm( queue_.pop() );
- ca = itm.ca();
- intr = itm.intr();
+ ca = queue_.pop();
                 return ! ca.empty();
         }
 
@@ -258,37 +242,32 @@
 
         template< typename Duration >
         void put(
- item const& itm,
+ item & itm,
                 Duration const&)
         {
                 unique_lock< shared_mutex > lk( mtx_);
                 put_( itm, lk);
         }
 
- bool take(
- detail::callable & ca,
- detail::interrupter & intr)
+ bool take( detail::callable & ca)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, intr, lk);
+ return take_( ca, lk);
         }
 
         template< typename Duration >
         bool take(
                 detail::callable & ca,
- detail::interrupter & intr,
                 Duration const& rel_time)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- return take_( ca, intr, rel_time, lk);
+ return take_( ca, rel_time, lk);
         }
 
- bool try_take(
- detail::callable & ca,
- detail::interrupter & intr)
+ bool try_take( detail::callable & ca)
         {
                 unique_lock< shared_mutex > lk( mtx_);
- return try_take_( ca, intr);
+ return try_take_( ca);
         }
 };
 } }

Added: sandbox/threadpool/boost/tp/utility.hpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/boost/tp/utility.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,120 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_UTILITY_H
+#define BOOST_TP_UTILITY_H
+
+#include <boost/assert.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/thread_time.hpp>
+
+#include <boost/tp/detail/worker.hpp>
+
+namespace boost {
+namespace this_task
+{
+namespace detail
+{
+// Predicate that turns true once the current system time has reached a
+// given absolute deadline.
+struct time_reached
+{
+        // Deadline at or after which operator() reports true.
+        system_time abs_time;
+
+        // The deadline is only copied, so it is taken by const reference;
+        // this also permits construction from temporaries and const objects.
+        time_reached( system_time const& abs_time_)
+        : abs_time( abs_time_)
+        {}
+
+        // Returns true when get_system_time() is at or past abs_time.
+        // Does not modify the predicate, hence const.
+        bool operator()() const
+        { return get_system_time() >= abs_time; }
+};
+
+// Predicate that yields false on its first invocation and true on every
+// later invocation.
+class once_false
+{
+private:
+        // false until operator() has been invoked once
+        bool result_;
+
+public:
+        once_false()
+        : result_( false)
+        {}
+
+        bool operator()()
+        {
+                // report the state seen on entry, then latch the flag
+                bool const seen_before( result_);
+                result_ = true;
+                return seen_before;
+        }
+};
+}
+
+// Hands pred to worker::reschedule_until() of the worker returned by
+// worker::tss_get() for the calling thread. Asserts that such a worker
+// exists.
+template< typename Pred >
+void reschedule_until( Pred const& pred)
+{
+        tp::detail::worker * const w = tp::detail::worker::tss_get();
+        BOOST_ASSERT( w);
+        w->reschedule_until( pred);
+}
+
+// Returns the pool (as type Pool) reported by the worker that
+// worker::tss_get() yields for the calling thread. Asserts that such a
+// worker exists.
+template< typename Pool >
+Pool & get_pool()
+{
+        tp::detail::worker * const w = tp::detail::worker::tss_get();
+        BOOST_ASSERT( w);
+        return w->get_pool< Pool >();
+}
+
+// True when worker::tss_get() returns a non-null worker for the calling
+// thread.
+inline
+bool runs_in_pool()
+{
+        return 0 != tp::detail::worker::tss_get();
+}
+
+// Returns the thread id reported by the worker that worker::tss_get()
+// yields for the calling thread. Asserts that such a worker exists.
+inline
+thread::id worker_id()
+{
+        tp::detail::worker * const w = tp::detail::worker::tss_get();
+        BOOST_ASSERT( w);
+        return w->get_id();
+}
+
+// Delays the caller until abs_time. Outside of a pool the calling thread
+// simply sleeps; inside a pool the worker is asked to reschedule until the
+// deadline has been reached.
+inline
+void delay( system_time abs_time)
+{
+        if ( ! runs_in_pool() )
+        {
+                this_thread::sleep( abs_time);
+                return;
+        }
+
+        detail::time_reached t( abs_time);
+        reschedule_until( t);
+}
+
+// Relative-time overload: adds rel_time to get_system_time() and forwards
+// the resulting point in time to delay().
+template< typename Duration >
+void delay( Duration const& rel_time)
+{ delay( get_system_time() + rel_time); }
+
+// Yields the caller. Outside of a pool this is this_thread::yield(); inside
+// a pool the worker is asked to reschedule with a once_false predicate,
+// which is false on its first evaluation and true afterwards.
+inline
+void yield()
+{
+        if ( ! runs_in_pool() )
+        {
+                this_thread::yield();
+                return;
+        }
+
+        detail::once_false t;
+        reschedule_until( t);
+}
+
+// Calls interrupt() on the worker that worker::tss_get() yields for the
+// calling thread (asserting that it exists) and then executes
+// this_thread::interruption_point().
+inline
+void interrupt()
+{
+        tp::detail::worker * const w = tp::detail::worker::tss_get();
+        BOOST_ASSERT( w);
+        w->interrupt();
+        this_thread::interruption_point();
+}
+}}
+
+#endif // BOOST_TP_UTILITY_H

Modified: sandbox/threadpool/boost/tp/watermark.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/watermark.hpp (original)
+++ sandbox/threadpool/boost/tp/watermark.hpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -7,8 +7,6 @@
 
 #include <cstddef>
 
-#include <boost/tp/exceptions.hpp>
-
 namespace boost { namespace tp
 {
 class high_watermark

Modified: sandbox/threadpool/libs/tp/build/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/build/Jamfile.v2 (original)
+++ sandbox/threadpool/libs/tp/build/Jamfile.v2 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -14,7 +14,20 @@
       <link>static:<define>BOOST_THREADPOOL_STATIC_LINK=1
     ;
 
-SOURCES = callable default_pool guard interrupter poolsize scanns watermark worker worker_group ;
+# Base names only: the lib rule below expands $(SOURCES).cpp, so a ".cpp"
+# suffix here would yield "callable.cpp.cpp". Jam comments start with '#';
+# a leading "//" would be taken as a list element.
+SOURCES =
+    callable
+    default_pool
+    guard
+    interrupter
+    poolsize
+    scanns
+    semaphore_posix
+    # semaphore_windows
+    watermark
+    worker
+    worker_group
+    wsq
+    ;
 
 lib boost_threadpool
    : $(SOURCES).cpp

Modified: sandbox/threadpool/libs/tp/examples/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/examples/Jamfile.v2 (original)
+++ sandbox/threadpool/libs/tp/examples/Jamfile.v2 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -16,15 +16,13 @@
     ;
 
 exe bind_to_processor : bind_to_processor.cpp ;
+exe delay : delay.cpp ;
 exe fork_join : for_join.cpp ;
 exe interrupt : interrupt.cpp ;
-exe launch : launch.cpp ;
-exe parrallel_sort : parralele_sort.cpp ;
 exe pending : pending.cpp ;
 exe priority : priority.cpp ;
 exe reschedule_until : reschedule_until.cpp ;
 exe shutdonw_now : shutdonw_now.cpp ;
-exe sleep : sleep.cpp ;
 exe smart : smart.cpp ;
 exe submit : submit.cpp ;
 exe yield : yield.cpp ;

Modified: sandbox/threadpool/libs/tp/examples/bind_to_processors.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/bind_to_processors.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/bind_to_processors.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -37,17 +37,21 @@
                 else
                 {
                         tp::task< long > t1(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fib_task::execute,
- boost::ref( * this),
- n - 1) ) );
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 1) ) ;
                         tp::task< long > t2(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fib_task::execute,
- boost::ref( * this),
- n - 2) ) );
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 2) );
+ tp::launch_in_pool(
+ boost::this_task::get_pool< pool_type >(),
+ t1);
+ tp::launch_in_pool(
+ boost::this_task::get_pool< pool_type >(),
+ t2);
                         return t1.get() + t2.get();
                 }
         }
@@ -72,11 +76,14 @@
                 pt::ptime start( pt::microsec_clock::universal_time() );
 
                 for ( int i = 0; i < 26; ++i)
- results.push_back(
- tp::get_default_pool().submit(
- boost::bind(
- & parallel_fib,
- i) ) );
+ {
+ tp::task< long > t(
+ boost::bind(
+ & parallel_fib,
+ i) );
+ results.push_back( t);
+ tp::launch_in_pool( pool, t);
+ }
 
                 tp::waitfor_all( results.begin(), results.end() );
 

Added: sandbox/threadpool/libs/tp/examples/delay.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/examples/delay.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,88 @@
+#include <cstdio>
+#include <cstdlib>
+#include <iostream>
+#include <stdexcept>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "boost/tp.hpp"
+
+namespace pt = boost::posix_time;
+namespace tp = boost::tp;
+
+typedef tp::default_pool pool_type;
+
+// Plain recursive Fibonacci; any n below 2 is returned unchanged.
+long serial_fib( long n)
+{
+        return n < 2
+                ? n
+                : serial_fib( n - 1) + serial_fib( n - 2);
+}
+
+// Fork/join style Fibonacci used to demonstrate boost::this_task::delay().
+// Arguments below the cutoff are computed with serial_fib(); larger ones are
+// split into two sub-tasks.
+class fib_task
+{
+private:
+ // arguments smaller than this value are computed serially
+ long cutof_;
+
+public:
+ fib_task( long cutof)
+ : cutof_( cutof)
+ {}
+
+ // Returns fibonacci( n).
+ long execute( long n)
+ {
+ if ( n < cutof_)
+ {
+ // the n == 0 case is delayed by two seconds before it is computed
+ if ( n == 0)
+ boost::this_task::delay( pt::seconds( 2) );
+ return serial_fib( n);
+ }
+ else
+ {
+ // Both sub-tasks are bound to this object through boost::ref, so
+ // the object has to stay alive until they have finished.
+ tp::task< long > t1(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 1) );
+ tp::task< long > t2(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 2) );
+ // NOTE(review): no pool is passed - presumably the default pool
+ // is used; confirm against launch.hpp.
+ tp::launch_in_pool( t1);
+ tp::launch_in_pool( t2);
+ // combine the results of both sub-tasks
+ return t1.get() + t2.get();
+ }
+ }
+};
+
+
+// Computes fibonacci( n) with a fib_task whose cutoff is 5 and prints the
+// argument together with the result on stdout.
+void parallel_fib( long n)
+{
+        fib_task a( 5);
+        long result = a.execute( n);
+        // n and result are long, so the conversion has to be %ld; passing a
+        // long for %d is undefined behaviour.
+        printf("n == %ld, fibonnaci == %ld\n", n, result);
+}
+
+// Launches parallel_fib( i) for i in [0, 10) as tp::task< void > objects via
+// tp::launch_in_pool() and returns EXIT_SUCCESS. An exception is reported on
+// std::cerr and leads to EXIT_FAILURE. argc and argv are not used.
+// NOTE(review): main() does not wait for the launched tasks itself -
+// presumably the pool finishes pending work when it is shut down; confirm.
+int main( int argc, char *argv[])
+{
+ try
+ {
+ for ( int i = 0; i < 10; ++i)
+ tp::launch_in_pool(
+ tp::task< void >(
+ boost::bind(
+ & parallel_fib,
+ i) ) );
+
+ return EXIT_SUCCESS;
+ }
+ catch ( std::exception const& e)
+ { std::cerr << "exception: " << e.what() << std::endl; }
+ catch ( ... )
+ { std::cerr << "unhandled" << std::endl; }
+
+ return EXIT_FAILURE;
+}

Modified: sandbox/threadpool/libs/tp/examples/fork_join.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/fork_join.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/fork_join.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -11,7 +11,7 @@
 namespace pt = boost::posix_time;
 namespace tp = boost::tp;
 
-typedef tp::default_pool pool_type;
+typedef tp::pool< tp::unbounded_channel< tp::fifo > > pool_type;
 
 long serial_fib( long n)
 {
@@ -37,23 +37,26 @@
                 else
                 {
                         tp::task< long > t1(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fib_task::execute,
- boost::ref( * this),
- n - 1) ) );
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 1) );
                         tp::task< long > t2(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fib_task::execute,
- boost::ref( * this),
- n - 2) ) );
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 2) );
+ tp::launch_in_pool(
+ boost::this_task::get_pool< pool_type >(),
+ t1);
+ tp::launch_in_pool(
+ boost::this_task::get_pool< pool_type >(),
+ t2);
                         return t1.get() + t2.get();
                 }
         }
 };
 
-
 long parallel_fib( long n)
 {
         fib_task a( 5);
@@ -64,17 +67,22 @@
 {
         try
         {
+ pool_type pool( tp::poolsize( 5) );
+
                 std::vector< tp::task< long > > results;
                 results.reserve( 20);
 
                 pt::ptime start( pt::microsec_clock::universal_time() );
 
- for ( int i = 0; i < 15; ++i)
- results.push_back(
- tp::get_default_pool().submit(
- boost::bind(
- & parallel_fib,
- i) ) );
+ for ( int i = 0; i < 26; ++i)
+ {
+ tp::task< long > t(
+ boost::bind(
+ & parallel_fib,
+ i) );
+ results.push_back( t);
+ tp::launch_in_pool( pool, t);
+ }
 
                 tp::waitfor_all( results.begin(), results.end() );
 

Modified: sandbox/threadpool/libs/tp/examples/interrupt.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/interrupt.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/interrupt.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -36,19 +36,20 @@
 {
         try
         {
- tp::get_default_pool().submit(
+ tp::task< void > t1(
                         boost::bind(
                                 long_running_fn) );
+ tp::get_default_pool().submit( t1);
                 std::cout << "poolsize == " << tp::get_default_pool().size() << std::endl;
                 std::cout << "idle threads == " << tp::get_default_pool().idle() << std::endl;
                 std::cout << "active threads == " << tp::get_default_pool().active() << std::endl;
- tp::task< int > t(
- tp::get_default_pool().submit(
+ tp::task< int > t2(
                                 boost::bind(
                                         fibonacci_fn,
- 10) ) );
- t.interrupt();
- std::cout << t.get() << std::endl;
+ 10) );
+ tp::get_default_pool().submit( t2);
+ t2.interrupt();
+ std::cout << t2.get() << std::endl;
 
                 return EXIT_SUCCESS;
         }

Deleted: sandbox/threadpool/libs/tp/examples/parallel_sort.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/parallel_sort.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
+++ (empty file)
@@ -1,271 +0,0 @@
-//////////////////////////////////////////////////////////////////////////////
-//
-// (C) Copyright Vicente J. Botet Escriba 2008-2009.
-// Distributed under the Boost Software License, Version 1.0.
-// (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-//
-// See http://www.boost.org/libs/interthreads for documentation.
-//
-// Based on the shared.cpp example from the threadalert library of Roland Schwarz
-//////////////////////////////////////////////////////////////////////////////
-
-// requires interthread and range available at boost-sandbox and boost-vault
-
-#include <iostream>
-#include <algorithm>
-
-#include <boost/progress.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/bind.hpp>
-#include "boost/tp.hpp"
-#include <boost/range/algorithm/sort.hpp>
-#include <boost/range/algorithm/inplace_merge.hpp>
-#include <boost/range/sub_range.hpp>
-#include <boost/range/begin.hpp>
-#include <boost/range/end.hpp>
-#include <boost/range/algorithm/equal.hpp>
-#include <boost/range/algorithm/for_each.hpp>
-#include <boost/range/algorithm/transform.hpp>
-#include <boost/range/adaptor/sliced.hpp>
-#include <boost/interthreads/fork.hpp>
-#include <boost/interthreads/algorithm/wait.hpp>
-#include <boost/interthreads/scheduler.hpp>
-#include <boost/array.hpp>
-
-#include <assert.h>
-
-#define BOOST_PARTS 2
-#define NN 10000000
-
-class scoped_timer {
- boost::posix_time::ptime start_;
-public:
- scoped_timer()
- : start_(boost::posix_time::microsec_clock::universal_time())
- {}
- ~scoped_timer() {
- boost::posix_time::ptime stop( boost::posix_time::microsec_clock::universal_time() );
- std::cout << " " << ( stop - start_).total_milliseconds() << " milli seconds" << std::endl;
- }
-};
-
-template <typename Range>
-class partition
-{
-public:
- boost::iterator_range<typename boost::range_iterator<Range>::type> range_;
- std::size_t parts_;
- partition(boost::iterator_range<typename boost::range_iterator<Range>::type>& range, std::size_t parts):
- range_(range),
- parts_(parts)
- {}
- boost::iterator_range<typename boost::range_iterator<Range>::type> operator[](unsigned i) {
- unsigned size = boost::size(range_);
- if (i<(parts_-1))
- return boost::make_sliced_range(range_, i*(size/parts_), ((i+1)*(size/parts_)));
- else
- return boost::make_sliced_range(range_, (parts_-1)*(size/parts_), size);
- }
-};
-
-typedef boost::tp::pool<
- boost::tp::unbounded_channel< boost::tp::fifo >
-> pool_type;
-
-#ifdef TASK_POOL
- typedef boost::tp::task< pool_type, void > task_type;
-#else
- typedef boost::tp::task< void > task_type;
-#endif
-
-
-template <
- typename DirectSolver,
- typename Composer,
- typename AE,
- typename Range
->
- void inplace_solve( AE & ae,
- boost::iterator_range<typename boost::range_iterator<Range>::type> range,
- unsigned cutoff );
-
-template <
- typename DirectSolver,
- typename Composer,
- typename AE,
- typename Range
->
- void inplace_solve( AE & ae,
- boost::iterator_range<typename boost::range_iterator<Range>::type> range,
- unsigned cutoff )
- {
- unsigned size = boost::size(range);
- //std::cout << "<<par_ " << size;
- if ( size <= cutoff) DirectSolver()(range);
- else {
- partition<Range> parts(range, BOOST_PARTS);
- std::list<task_type> tasks;
- #if 0 // this code do not compiles with gcc 3.4.4 cygwin
- boost::transform(parts, boost::begin(tasks),
- boost::bind(&AE::submit, boost::ref(ae),
- //boost::bind(&boost::interthreads::fork<AE>, boost::ref(ae),
- boost::bind(&inplace_solve<DirectSolver,Composer,AE,Range>, boost::ref(ae),_1,cutoff)));
- #else
- for (unsigned i=0;i < BOOST_PARTS-1; ++i) {
- task_type tmp(ae.submit(
- boost::bind(
- &inplace_solve<DirectSolver,Composer,AE,Range>,
- boost::ref(ae),
- parts[i],
- cutoff
- )));
- tasks.push_back(tmp);
- }
- #endif
- inplace_solve<DirectSolver,Composer,AE,Range>(ae, parts[BOOST_PARTS-1], cutoff);
- boost::for_each(tasks, &boost::interthreads::wait_act<task_type>);
-
- //std::cout << "par_inplace_merge_fct " << size << ">>"<< std::endl;
- Composer()(range);
- //std::cout << "par_ " << size << ">>"<< std::endl;
-
- }
- }
-
-struct sort_fct {
- template<class RandomAccessRange>
- RandomAccessRange& operator()(RandomAccessRange rng) {
- return boost::sort(rng);
- }
-};
-
-struct inplace_merge_fct {
- template<class BidirectionalRange>
- BidirectionalRange&
- operator()( BidirectionalRange rng) {
- return boost::inplace_merge(rng, boost::begin(rng)+(boost::size(rng)/2));
- }
-};
-template <typename AE, typename Range>
-void parallel_sort(AE& ae, Range& range, unsigned cutoff=10000) {
- boost::iterator_range<typename boost::range_iterator<Range>::type> rng(range);
- inplace_solve<sort_fct,inplace_merge_fct,pool_type,Range>( ae, rng, cutoff);
-}
-
-int sorted[NN];
-int values1[NN];
-int values2[NN];
-int values3[NN];
-int values4[NN];
-int values5[NN];
-int values6[NN];
-
-int main() {
- for (unsigned i=0; i<NN; ++i) sorted[i]=i;
-
- for (unsigned i=0; i<NN; ++i) values1[i]=NN-i-1;
- {
- std::cout << "std::sort: reverse 0.." << NN;
- scoped_timer t; // start timing
- std::sort(boost::begin(values1), boost::end(values1));
- }
- assert(boost::equal(values1, sorted));
- {
- std::cout << "std::sort: 0.." << NN;
- scoped_timer t; // start timing
- std::sort(boost::begin(values1), boost::end(values1));
- }
-
- for (unsigned i=0; i<NN; ++i) values2[i]=NN-i-1;
- {
- std::cout << "boost::sort: reverse 0.."<<NN;
- scoped_timer t; // start timing
- boost::sort(values2);
- }
- assert(boost::equal(values2, sorted));
- {
- std::cout << "boost::sort: 0.."<<NN;
- scoped_timer t; // start timing
- boost::sort(values2);
- }
-
- // creates a threadpool with two worker-threads
- pool_type pool( boost::tp::poolsize( 2) );
-
-// // creates a threadpool with as many worker-threads
-// // as CPUs/Cores are online and bind each worker-thread
-// // to a specific CPU/Core
-// // requires BOOST_BIND_WORKER_TO_PROCESSORS to be specified
-// pool_type pool;
-
- for (unsigned i=0; i<NN; ++i) values5[i]=NN-i-1;
- {
- std::cout << "parallel_sort "<<NN/2<<": reverse 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/2);
- }
- assert(boost::equal(values5, sorted));
- {
- std::cout << "parallel_sort "<<NN/2<<": 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/2);
- }
-
- for (unsigned i=0; i<NN; ++i) values5[i]=NN-i-1;
- {
- std::cout << "parallel_sort "<<NN/4<<": reverse 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/4);
- }
- assert(boost::equal(values5, sorted));
- {
- std::cout << "parallel_sort "<<NN/4<<": 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/4);
- }
-
- for (unsigned i=0; i<NN; ++i) values5[i]=NN-i-1;
- {
- std::cout << "parallel_sort "<<NN/8<<": reverse 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/8);
- }
- assert(boost::equal(values5, sorted));
- {
- std::cout << "parallel_sort "<<NN/8<<": 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/8);
- }
-
- for (unsigned i=0; i<NN; ++i) values5[i]=NN-i-1;
- {
- std::cout << "parallel_sort "<<NN/16<<": reverse 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/16);
- }
- assert(boost::equal(values5, sorted));
- {
- std::cout << "parallel_sort "<<NN/16<<": 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/16);
- }
-
- for (unsigned i=0; i<NN; ++i) values5[i]=NN-i-1;
- {
- std::cout << "parallel_sort "<<NN/32<<": reverse 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/32);
- }
- assert(boost::equal(values5, sorted));
- {
- std::cout << "parallel_sort "<<NN/32<<": 0.."<<NN;
- scoped_timer tmr; // start timing
- parallel_sort(pool, values5, NN/32);
- }
-
-
- std::cout << "shutdown"<< std::endl;
- pool.shutdown();
- std::cout << "end"<< std::endl;
- return 0;
-}

Modified: sandbox/threadpool/libs/tp/examples/pending.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/pending.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/pending.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -37,16 +37,17 @@
 {
         try
         {
- tp::get_default_pool().submit(
+ tp::task< void > t1(
                         boost::bind(
                                 long_running_fn) );
- tp::task< int > t(
- tp::get_default_pool().submit(
+ tp::task< int > t2(
                                 boost::bind(
                                         fibonacci_fn,
- 10) ) );
+ 10) );
+ tp::launch_in_pool( t1);
+ tp::launch_in_pool( t2);
                 std::cout << "pending tasks == " << tp::get_default_pool().pending() << std::endl;
- std::cout << t.get() << std::endl;
+ std::cout << t2.get() << std::endl;
 
                 return EXIT_SUCCESS;
         }

Modified: sandbox/threadpool/libs/tp/examples/priority.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/priority.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/priority.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -26,25 +26,27 @@
                 tp::pool<
                         tp::unbounded_channel< tp::priority< int > >
> pool( tp::poolsize( 1) );
- pool.submit(
+
+ tp::task< void > t1(
                         boost::bind(
- long_running_fn),
- 0);
- pool.submit(
+ long_running_fn) );
+ tp::task< void > t2(
                         boost::bind(
                                 print_fn,
- "This"),
- 0);
- pool.submit(
+ "This") );
+ tp::task< void > t3(
                         boost::bind(
                                 print_fn,
- "a text.\n"),
- 2);
- pool.submit(
+ "a text.\n") );
+ tp::task< void > t4(
                         boost::bind(
                                 print_fn,
- " is "),
- 1);
+ " is ") );
+
+ tp::launch_in_pool( pool, t1, 0);
+ tp::launch_in_pool( pool, t2, 0);
+ tp::launch_in_pool( pool, t3, 2);
+ tp::launch_in_pool( pool, t4, 1);
 
                 return EXIT_SUCCESS;
         }

Modified: sandbox/threadpool/libs/tp/examples/reschedule_until.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/reschedule_until.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/reschedule_until.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -22,8 +22,6 @@
 namespace pt = boost::posix_time;
 namespace tp = boost::tp;
 
-typedef tp::default_pool pool_type;
-
 long serial_fib( long n)
 {
         if( n < 2)
@@ -48,17 +46,17 @@
                 else
                 {
                         tp::task< long > t1(
- boost::this_task::get_thread_pool< pool_type >().submit(
                                         boost::bind(
                                                 & fib_task::execute,
                                                 boost::ref( * this),
- n - 1) ) );
+ n - 1) );
                         tp::task< long > t2(
- boost::this_task::get_thread_pool< pool_type >().submit(
                                         boost::bind(
                                                 & fib_task::execute,
                                                 boost::ref( * this),
- n - 2) ) );
+ n - 2) );
+ tp::launch_in_pool( t1);
+ tp::launch_in_pool( t2);
                         return t1.get() + t2.get();
                 }
         }
@@ -135,20 +133,22 @@
         {
                 int fd[2];
                 create_sockets( fd);
-
- tp::get_default_pool().submit(
+
+ tp::launch_in_pool(
+ tp::task< void >(
                                 boost::bind(
                                         & read,
- fd[0]) );
+ fd[0]) ) );
 
                 write( fd[1], "Hello ");
                 boost::this_thread::sleep( pt::seconds( 1) );
 
                 for ( int i = 0; i < 15; ++i)
- tp::get_default_pool().submit(
- boost::bind(
- & parallel_fib,
- i) );
+ tp::launch_in_pool(
+ tp::task< void >(
+ boost::bind(
+ & parallel_fib,
+ i) ) );
 
                 write( fd[1], "World!");
 

Modified: sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -37,13 +37,17 @@
                 tp::pool<
                         tp::unbounded_channel< tp::fifo >
> pool( tp::poolsize( 1) );
+
                 tp::task< int > t(
- pool.submit(
- boost::bind(
- fibonacci_fn,
- 10) ) );
+ boost::bind(
+ fibonacci_fn,
+ 10) );
+ tp::launch_in_pool( pool, t);
+
                 boost::this_thread::sleep( pt::milliseconds( 250) );
+
                 pool.shutdown_now();
+
                 std::cout << t.get() << std::endl;
 
                 return EXIT_SUCCESS;

Deleted: sandbox/threadpool/libs/tp/examples/sleep.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/sleep.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
+++ (empty file)
@@ -1,87 +0,0 @@
-#include <iostream>
-#include <cstdlib>
-#include <stdexcept>
-#include <vector>
-
-#include <boost/bind.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-
-#include "boost/tp.hpp"
-
-namespace pt = boost::posix_time;
-namespace tp = boost::tp;
-
-typedef tp::default_pool pool_type;
-
-long serial_fib( long n)
-{
- if( n < 2)
- return n;
- else
- return serial_fib( n - 1) + serial_fib( n - 2);
-}
-
-class fib_task
-{
-private:
- long cutof_;
-
-public:
- fib_task( long cutof)
- : cutof_( cutof)
- {}
-
- long execute( long n)
- {
- if ( n < cutof_)
- {
- if ( n == 0)
- boost::this_task::sleep( pt::seconds( 2) );
- return serial_fib( n);
- }
- else
- {
- tp::task< long > t1(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fib_task::execute,
- boost::ref( * this),
- n - 1) ) );
- tp::task< long > t2(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fib_task::execute,
- boost::ref( * this),
- n - 2) ) );
- return t1.get() + t2.get();
- }
- }
-};
-
-
-void parallel_fib( long n)
-{
- fib_task a( 5);
- long result = a.execute( n);
- printf("n == %d, fibonnaci == %d\n", n, result);
-}
-
-int main( int argc, char *argv[])
-{
- try
- {
- for ( int i = 0; i < 10; ++i)
- tp::get_default_pool().submit(
- boost::bind(
- & parallel_fib,
- i) );
-
- return EXIT_SUCCESS;
- }
- catch ( std::exception const& e)
- { std::cerr << "exception: " << e.what() << std::endl; }
- catch ( ... )
- { std::cerr << "unhandled" << std::endl; }
-
- return EXIT_FAILURE;
-}

Modified: sandbox/threadpool/libs/tp/examples/smart.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/smart.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/smart.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -52,24 +52,33 @@
>
>
> pool( tp::poolsize( 1) );
- pool.submit(
- boost::bind(
- long_running_fn),
+
+ tp::launch_in_pool(
+ pool,
+ tp::task< void >(
+ boost::bind(
+ long_running_fn) ),
                         0);
- pool.submit(
- boost::bind(
- fibonacci_fn,
- 0),
+ tp::launch_in_pool(
+ pool,
+ tp::task< void >(
+ boost::bind(
+ fibonacci_fn,
+ 0) ),
                         1);
- pool.submit(
- boost::bind(
- fibonacci_fn,
- 1),
+ tp::launch_in_pool(
+ pool,
+ tp::task< void >(
+ boost::bind(
+ fibonacci_fn,
+ 1) ),
                         2);
- pool.submit(
- boost::bind(
- fibonacci_fn,
- 10),
+ tp::launch_in_pool(
+ pool,
+ tp::task< void >(
+ boost::bind(
+ fibonacci_fn,
+ 10) ),
                         2);
 
                 return EXIT_SUCCESS;

Modified: sandbox/threadpool/libs/tp/examples/submit.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/submit.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/submit.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -30,10 +30,12 @@
         try
         {
                 tp::task< int > t(
- tp::get_default_pool().submit(
                                 boost::bind(
                                         fibonacci_fn,
- 10) ) );
+ 10) );
+// tp::launch_in_pool( t);
+// tp::launch_in_thread( t);
+ tp::launch_local( t);
                 std::cout << t.get() << std::endl;
 
                 return EXIT_SUCCESS;

Modified: sandbox/threadpool/libs/tp/examples/yield.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/yield.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/yield.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -41,17 +41,25 @@
                 else
                 {
                         tp::task< long > t1(
- boost::this_task::get_thread_pool< pool_type >().submit(
                                         boost::bind(
                                                 & fib_task::execute,
                                                 boost::ref( * this),
- n - 1) ) );
+ n - 1) );
                         tp::task< long > t2(
- boost::this_task::get_thread_pool< pool_type >().submit(
                                         boost::bind(
                                                 & fib_task::execute,
                                                 boost::ref( * this),
- n - 2) ) );
+ n - 2) );
+ if ( boost::this_task::runs_in_pool() )
+ {
+ tp::launch_in_pool( t1);
+ tp::launch_in_pool( t2);
+ }
+ else
+ {
+ tp::launch_in_thread( t1);
+ tp::launch_in_thread( t2);
+ }
                         return t1.get() + t2.get();
                 }
         }
@@ -70,10 +78,12 @@
         try
         {
                 for ( int i = 0; i < 10; ++i)
- tp::get_default_pool().submit(
- boost::bind(
- & parallel_fib,
- i) );
+ tp::launch_in_thread(
+// tp::launch_in_pool(
+ tp::task< void >(
+ boost::bind(
+ & parallel_fib,
+ i) ) );
 
                 return EXIT_SUCCESS;
         }

Modified: sandbox/threadpool/libs/tp/src/callable.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/callable.cpp (original)
+++ sandbox/threadpool/libs/tp/src/callable.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -18,5 +18,12 @@
 void
 callable::clear()
 { impl_.reset(); }
+
+callable::scoped_lock::scoped_lock( callable & ca, shared_ptr< thread > & thrd)
+: ca_( ca)
+{ ca_.impl_->set( thrd); }
+
+callable::scoped_lock::~scoped_lock()
+{ ca_.impl_->reset(); }
 } } }
 

Modified: sandbox/threadpool/libs/tp/src/default_pool.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/default_pool.cpp (original)
+++ sandbox/threadpool/libs/tp/src/default_pool.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -1,6 +1,9 @@
 #include "boost/tp/default_pool.hpp"
+
 #include <boost/thread.hpp>
 
+#include "boost/tp/poolsize.hpp"
+
 namespace boost { namespace tp {
 namespace detail
 {

Modified: sandbox/threadpool/libs/tp/src/interrupter.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/interrupter.cpp (original)
+++ sandbox/threadpool/libs/tp/src/interrupter.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -1,5 +1,7 @@
 #include "boost/tp/detail/interrupter.hpp"
 
+#include <boost/assert.hpp>
+
 namespace boost { namespace tp {
 namespace detail
 {

Modified: sandbox/threadpool/libs/tp/src/poolsize.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/poolsize.cpp (original)
+++ sandbox/threadpool/libs/tp/src/poolsize.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -1,10 +1,12 @@
 #include "boost/tp/poolsize.hpp"
 
+#include <boost/tp/exceptions.hpp>
+
 namespace boost { namespace tp
 {
 poolsize::poolsize( std::size_t value)
 : value_( value)
-{ if ( value <= 0) throw invalid_poolsize("core poolsize must be greater than zero"); }
+{ if ( value <= 0) throw invalid_poolsize(); }
 
 poolsize::operator std::size_t () const
 { return value_; }

Modified: sandbox/threadpool/libs/tp/src/scanns.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/scanns.cpp (original)
+++ sandbox/threadpool/libs/tp/src/scanns.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -1,10 +1,12 @@
 #include "boost/tp/scanns.hpp"
 
+#include <boost/tp/exceptions.hpp>
+
 namespace boost { namespace tp
 {
 scanns::scanns( std::size_t value)
 : value_( value)
-{ if ( value < 0) throw invalid_scanns("scanns must be greater than or equal to zero"); }
+{ if ( value < 0) throw invalid_scanns(); }
 
 scanns::operator std::size_t () const
 { return value_; }

Added: sandbox/threadpool/libs/tp/src/semaphore_posix.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/semaphore_posix.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,56 @@
+#include "boost/tp/detail/semaphore_posix.hpp"
+
+#include <cerrno>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+semaphore::semaphore( int value)
+: sem_()
+{
+ if ( ::sem_init( & sem_, 0, value) == -1)
+ throw system::system_error( errno, system::system_category);
+}
+
+semaphore::~semaphore()
+{ ::sem_destroy( & sem_); }
+
+void
+semaphore::post()
+{
+ if ( ::sem_post( & sem_) == -1)
+ throw system::system_error( errno, system::system_category);
+}
+
+void
+semaphore::wait()
+{
+ if ( ::sem_wait( & sem_) == -1)
+ throw system::system_error( errno, system::system_category);
+}
+
+bool
+semaphore::try_wait()
+{
+ if ( ::sem_trywait( & sem_) == -1)
+ {
+ if ( errno == EAGAIN)
+ return false;
+ else
+ throw system::system_error( errno, system::system_category);
+ }
+ return true;
+}
+
+int
+semaphore::value()
+{
+ int value( 0);
+ if ( ::sem_getvalue( & sem_, & value) == -1)
+ throw system::system_error( errno, system::system_category);
+ return value;
+}
+}}}

Added: sandbox/threadpool/libs/tp/src/semaphore_windows.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/semaphore_windows.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,63 @@
+#include "boost/tp/detail/semaphore_windows.hpp"
+
+#include <cerrno>
+#include <limits>
+
+#include <boost/system/error_code.hpp>
+#include <boost/system/system_error.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+semaphore::semaphore( int value)
+: handle_()
+{
+ if ( ( handle_ = ::CreateSemaphore( 0, value, std::numeric_limits< int >::max(), 0) ) == 0)
+ throw system::system_error( ::GetLastError(), system::system_category);
+}
+
+semaphore::~semaphore()
+{ ::CloseHandle( handle_); }
+
+void
+semaphore::post()
+{
+ if ( ! ::ReleaseSemaphore( handle_, 1, 0) )
+ throw system::system_error( ::GetLastError(), system::system_category);
+}
+
+void
+semaphore::wait()
+{
+ if ( ::WaitForSingleObject( handle_, INFINITE) != WAIT_OBJECT_0)
+ throw system::system_error( ::GetLastError(), system::system_category);
+}
+
+bool
+semaphore::try_wait()
+{
+ switch ( ::WaitForSingleObject( handle_, 0) )
+ {
+ case WAIT_OBJECT_0:
+ return true;
+ case WAIT_TIMEOUT:
+ return false;
+ default:
+ throw system::system_error( ::GetLastError(), system::system_category);
+ }
+ return true;
+}
+
+int
+semaphore::value()
+{
+ int value( 0);
+ if ( ::WaitForSingleObject( handle_, 0) != WAIT_TIMEOUT)
+ {
+ if ( ! ::ReleaseSemaphore( handle_, 1, & value) )
+ throw system::system_error( ::GetLastError(), system::system_category);
+ ++ value;
+ }
+ return value;
+}
+}}}

Modified: sandbox/threadpool/libs/tp/src/watermark.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/watermark.cpp (original)
+++ sandbox/threadpool/libs/tp/src/watermark.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -1,5 +1,7 @@
 #include "boost/tp/watermark.hpp"
 
+#include <boost/tp/exceptions.hpp>
+
 namespace boost { namespace tp
 {
 high_watermark::high_watermark( std::size_t value)

Modified: sandbox/threadpool/libs/tp/src/worker.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/worker.cpp (original)
+++ sandbox/threadpool/libs/tp/src/worker.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -27,22 +27,16 @@
 { impl_->signal_shutdown_now(); }
 
 void
-worker::put(
- callable const& ca,
- interrupter const& intr)
-{ impl_->put( ca, intr); }
+worker::put( callable const& ca)
+{ impl_->put( ca); }
 
 bool
-worker::try_take(
- callable & ca,
- interrupter & intr)
-{ return impl_->try_take( ca, intr); }
+worker::try_take( callable & ca)
+{ return impl_->try_take( ca); }
 
 bool
-worker::try_steal(
- callable & ca,
- interrupter & intr)
-{ return impl_->try_steal( ca, intr); }
+worker::try_steal( callable & ca)
+{ return impl_->try_steal( ca); }
 
 void
 worker::reschedule_until( function< bool() > const& pred)

Modified: sandbox/threadpool/libs/tp/src/worker_group.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/src/worker_group.cpp (original)
+++ sandbox/threadpool/libs/tp/src/worker_group.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -4,6 +4,7 @@
 
 #include "boost/tp/detail/worker_group.hpp"
 
+#include <boost/foreach.hpp>
 #include <boost/utility.hpp>
 
 namespace boost { namespace tp {

Added: sandbox/threadpool/libs/tp/src/wsq.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/src/wsq.cpp 2009-04-16 11:57:04 EDT (Thu, 16 Apr 2009)
@@ -0,0 +1,112 @@
+#include "boost/tp/detail/wsq.hpp"
+
+#include <boost/thread/locks.hpp>
+
+#include <boost/tp/detail/atomic.hpp>
+
+namespace boost { namespace tp {
+namespace detail
+{
+wsq::wsq()
+:
+initial_size_( 32),
+array_( new callable[ initial_size_]),
+capacity_( initial_size_),
+mask_( initial_size_ - 1),
+head_idx_( 0),
+tail_idx_( 0),
+mtx_()
+{}
+
+bool
+wsq::empty() const
+{ return head_idx_ >= tail_idx_; }
+
+std::size_t
+wsq::size() const
+{ return tail_idx_ - head_idx_; }
+
+void
+wsq::put( callable const& ca)
+{
+ uint32_t tail( tail_idx_);
+ if ( tail <= head_idx_ + mask_)
+ {
+ array_[tail & mask_] = ca;
+ tail_idx_ = tail + 1;
+ }
+ else
+ {
+ unique_lock< recursive_mutex > lk( mtx_);
+ uint32_t head( head_idx_);
+ int count( size() );
+
+ if ( count >= mask_)
+ {
+ capacity_ <<= 1;
+ shared_array< callable > array( new callable[capacity_]);
+ for ( int i( 0); i != count; ++i)
+ array[i] = array_[(i + head) & mask_];
+ array_.swap( array);
+ head_idx_ = 0;
+ tail_idx_ = tail = count;
+ mask_ = (mask_ << 1) | 1;
+ }
+ array_[tail & mask_] = ca;
+ tail_idx_ = tail + 1;
+ }
+}
+
+bool
+wsq::try_take( callable & ca)
+{
+ uint32_t tail( tail_idx_);
+ if ( tail == 0)
+ return false;
+ tail -= 1;
+ atomic_write_32( & tail_idx_, tail); // Interlocked.Exchange( & tail_idx_, tail); -> XCHG
+ if ( head_idx_ <= tail)
+ {
+ ca = array_[tail & mask_];
+ return true;
+ }
+ else
+ {
+ unique_lock< recursive_mutex > lk( mtx_);
+ if ( head_idx_ <= tail)
+ {
+ ca = array_[tail & mask_];
+ return true;
+ }
+ else
+ {
+ tail_idx_ = tail + 1;
+ return false;
+ }
+ }
+}
+
+bool
+wsq::try_steal( callable & ca)
+{
+ recursive_mutex::scoped_try_lock lk( mtx_);
+ if ( lk.owns_lock() )
+ {
+ uint32_t head( head_idx_);
+ atomic_write_32( & head_idx_, head + 1); // Interlocked.Exchange( & head_idx_, head + 1);
+ if ( head < tail_idx_)
+ {
+ ca = array_[head & mask_];
+ return true;
+ }
+ else
+ {
+ head_idx_ = head;
+ return false;
+ }
+ }
+ return false;
+}
+} } }
+
+


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk