Boost-Commit :
Subject: [Boost-commit] svn:boost r52194 - in sandbox/threadpool: boost boost/tp boost/tp/detail libs/tp/doc libs/tp/doc/html
From: oliver.kowalke_at_[hidden]
Date: 2009-04-05 15:35:34
Author: olli
Date: 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
New Revision: 52194
URL: http://svn.boost.org/trac/boost/changeset/52194
Log:
* waitfor_all() / waitfor_any() added
Removed:
sandbox/threadpool/boost/tp/launch.hpp
sandbox/threadpool/boost/tp/lockfree_channel.hpp
sandbox/threadpool/libs/tp/doc/html/
Text files modified:
sandbox/threadpool/boost/tp.hpp | 2
sandbox/threadpool/boost/tp/default_pool.hpp | 5 +
sandbox/threadpool/boost/tp/detail/interrupter.hpp | 6
sandbox/threadpool/boost/tp/pool.hpp | 40 -------
sandbox/threadpool/boost/tp/task.hpp | 184 +++++++++++++++++++++++++++++++++++++++
sandbox/threadpool/libs/tp/doc/task.qbk | 4
6 files changed, 196 insertions(+), 45 deletions(-)
Modified: sandbox/threadpool/boost/tp.hpp
==============================================================================
--- sandbox/threadpool/boost/tp.hpp (original)
+++ sandbox/threadpool/boost/tp.hpp 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
@@ -11,8 +11,6 @@
#include <boost/tp/fifo.hpp>
#include <boost/tp/info.hpp>
#include <boost/tp/lifo.hpp>
-#include <boost/tp/launch.hpp>
-//#include <boost/tp/lockfree_channel.hpp>
#include <boost/tp/pool.hpp>
#include <boost/tp/poolsize.hpp>
#include <boost/tp/priority.hpp>
Modified: sandbox/threadpool/boost/tp/default_pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/default_pool.hpp (original)
+++ sandbox/threadpool/boost/tp/default_pool.hpp 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
@@ -7,6 +7,7 @@
#include <boost/tp/fifo.hpp>
#include <boost/tp/pool.hpp>
+#include <boost/tp/task.hpp>
#include <boost/tp/unbounded_channel.hpp>
namespace boost { namespace tp
@@ -22,6 +23,10 @@
inline
default_pool & get_default_pool()
{ return detail::static_pool::instance; }
+
+template< typename Act >
+task< typename result_of< Act() >::type > lauch_in_pool( Act const& act)
+{ return get_default_pool().submit( act); }
} }
#endif // BOOST_TP_DEFAULT_POOL_H
Modified: sandbox/threadpool/boost/tp/detail/interrupter.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/detail/interrupter.hpp (original)
+++ sandbox/threadpool/boost/tp/detail/interrupter.hpp 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
@@ -74,8 +74,10 @@
{ impl_->interrupt_and_wait( rel_time); }
bool interruption_requested();
+
+ void swap( interrupter & other)
+ { impl_.swap( other.impl_); }
};
-}
-} }
+}}}
#endif // BOOST_TP_DETAIL_INTERRUPTER_H
Deleted: sandbox/threadpool/boost/tp/launch.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/launch.hpp 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
+++ (empty file)
@@ -1,20 +0,0 @@
-// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
-// Software License, Version 1.0. (See accompanying file
-// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef BOOST_TP_LAUNCH_H
-#define BOOST_TP_LAUNCH_H
-
-#include <boost/utility/result_of.hpp>
-
-#include <boost/tp/default_pool.hpp>
-#include <boost/tp/task.hpp>
-
-namespace boost { namespace tp
-{
-template< typename Act >
-task< typename result_of< Act() >::type > lauch_in_pool( Act const& act)
-{ return get_default_pool().submit( act); }
-} }
-
-#endif // BOOST_TP_LAUNCH_H
Deleted: sandbox/threadpool/boost/tp/lockfree_channel.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/lockfree_channel.hpp 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
+++ (empty file)
@@ -1,315 +0,0 @@
-#ifndef BOOST_TP_LOCKFREE_CHANNEL_H
-#define BOOST_TP_LOCKFREE_CHANNEL_H
-
-#include <vector>
-
-#include <boost/assert.hpp>
-#include <boost/optional.hpp>
-
-#include <boost/tp/detail/atomic.hpp>
-#include <boost/tp/detail/callable.hpp>
-#include <boost/tp/detail/interrupter.hpp>
-#include <boost/tp/exceptions.hpp>
-
-namespace boost {
-namespace tp
-{
-
-class lockfree_channel
-{
-public:
- class item
- {
- private:
- detail::callable ca_;
- detail::interrupter intr_;
-
- public:
- item(
- detail::callable const& ca,
- detail::interrupter const& intr)
- : ca_( ca), intr_( intr)
- { BOOST_ASSERT( ! ca_.empty() ); }
-
- const detail::callable ca() const
- { return ca_; }
-
- const detail::interrupter intr() const
- { return intr_; }
- };
-
-private:
- enum channel_state
- {
- channel_active = 0,
- channel_deactive = 1,
- channel_deactive_now = 2
- };
-
- struct node_t;
-
- struct pointer_t
- {
- node_t * ptr;
- unsigned int tag;
-
- pointer_t()
- : ptr( 0), tag( 0)
- {}
-
- pointer_t( node_t * ptr_)
- : ptr( ptr_), tag( 0)
- {}
-
- pointer_t( node_t * ptr_, unsigned int tag_)
- : ptr( ptr_), tag( tag_)
- {}
-
- pointer_t( pointer_t const& rhs)
- : ptr( 0), tag( 0)
- {
- detail::atomic_write_ptr( & ptr, rhs.ptr);
- detail::atomic_write_32( & tag, rhs.tag);
- }
-
- bool operator==( pointer_t const& rhs)
- { return ptr == rhs.ptr && tag == rhs.tag; }
-
- bool operator!=( pointer_t const& rhs)
- { return ptr != rhs.ptr || tag != rhs.tag; }
- };
-
- struct node_t
- {
- boost::optional< item > itm;
- pointer_t next;
- pointer_t prev;
-
- node_t()
- : itm( boost::none), next(), prev()
- {}
-
- node_t( item const& itm_)
- : itm( itm_), next(), prev()
- {}
- };
-
- channel_state state_;
- unsigned int size_;
- pointer_t tail_;
- pointer_t head_;
-
- bool compare_exchange_( pointer_t & dest, pointer_t & cmp, pointer_t & value)
- {
- if ( detail::atomic_compare_exchange_ptr( & dest.ptr, cmp.ptr, value.ptr) )
- {
- detail::atomic_write_32( & dest.tag, value.tag);
- return true;
- }
-
- return false;
- }
-
- void fix_list_( pointer_t const& tail, pointer_t const& head)
- {
- pointer_t cur( tail), next, prev;
- while ( head_ == head && cur != head)
- {
- next = cur.ptr->next;
- if ( next.tag != cur.tag) return;
- prev = next.ptr->prev;
- pointer_t ncur( cur.ptr, cur.tag - 1);
- if ( prev != ncur)
- next.ptr->prev = ncur;
- cur = pointer_t( next.ptr, cur.tag - 1);
- }
- }
-
- void increment_size_()
- { detail::atomic_inc_32( & size_); }
-
- void decrement_size_()
- { detail::atomic_dec_32( & size_); }
-
-public:
- lockfree_channel()
- : state_( channel_active), size_( 0), tail_(), head_()
- { tail_.ptr = head_.ptr = new node_t; }
-
- ~lockfree_channel()
- { clear(); }
-
- void put( item const& itm)
- {
- pointer_t tail;
- node_t * node( new node_t( itm) );
- while ( active() )
- for(;;)
- {
- if ( ! active() )
- throw task_rejected("channel is not active");
-
- tail = pointer_t( tail_);
- node->next = pointer_t( tail.ptr, tail.tag + 1);
- pointer_t ntail( node, tail.tag + 1);
- if ( compare_exchange_( tail_, tail, ntail) )
- {
- tail.ptr->prev = pointer_t( node, tail.tag);
- increment_size_();
- break;
- }
- }
- }
-
- bool take(
- detail::callable & ca,
- detail::interrupter & intr)
- {
- pointer_t head, tail, first;
- node_t * dummy( 0);
- while ( ! deactive_now() && ! ( deactive() && empty() ) )
- {
- head = head_;
- tail = tail_;
- first = head.ptr->prev;
- boost::optional< item > val( head.ptr->itm);
- if ( head_ == head)
- {
- if ( val)
- {
- if ( tail != head)
- {
- if ( first.tag != head.tag)
- {
- fix_list_( tail, head);
- continue;
- }
- }
- else
- {
- dummy = new node_t;
- dummy->next = pointer_t( tail.ptr, tail.tag + 1);
- pointer_t ntail( dummy, tail.tag + 1);
- if ( compare_exchange_( tail_, tail, ntail) )
- head.ptr->prev = pointer_t( dummy, tail.tag);
- else
- {
- delete dummy;
- dummy = 0;
- }
- continue;
- }
- pointer_t nhead( first.ptr, head.tag + 1);
- if ( compare_exchange_( head_, head, nhead) )
- {
- ca = val->ca();
- intr = val->intr();
- delete head.ptr;
- head.ptr = 0;
- decrement_size_();
- return true;
- }
- }
- else
- {
- if ( tail.ptr == head.ptr)
- return false;
- else
- {
- if ( first.tag != head.tag)
- {
- fix_list_( tail, head);
- continue;
- }
- pointer_t nhead( first.ptr, head.tag + 1);
- compare_exchange_( head_, head, nhead);
- }
- }
- }
- }
- return false;
- }
-
- template< typename Duration >
- bool take(
- detail::callable & ca,
- detail::interrupter & intr,
- Duration const& rel_time)
- { return take( ca, intr); }
-
- bool try_take(
- detail::callable & ca,
- detail::interrupter & intr)
- { return take( ca, intr); }
-
- bool active() const
- { return state_ == channel_active; }
-
- bool deactive() const
- { return state_ == channel_deactive; }
-
- bool deactive_now() const
- { return state_ == channel_deactive_now; }
-
- void activate()
- { detail::atomic_write_32( ( unsigned int *) & state_, channel_active); }
-
- void deactivate()
- {
- if ( active() )
- detail::atomic_write_32( ( unsigned int *) & state_, channel_deactive);
-
- BOOST_ASSERT( deactive() );
- }
-
- void deactivate_now()
- {
- if ( active() )
- detail::atomic_write_32( ( unsigned int *) & state_, channel_deactive_now);
-
- BOOST_ASSERT( deactive_now() );
- }
-
- const std::vector< detail::callable > drain()
- {
- BOOST_ASSERT( deactive_now() );
- std::vector< detail::callable > unprocessed;
- unprocessed.reserve( size() );
- pointer_t head( head_);
- while ( head.ptr)
- {
- if ( head.ptr->itm)
- unprocessed.push_back( head.ptr->itm->ca() );
- head = head.ptr->prev;
- }
- clear();
- BOOST_ASSERT( empty() );
- return unprocessed;
- }
-
- void clear()
- {
- while ( head_.ptr)
- {
- pointer_t tmp( head_);
- head_ = tmp.ptr->prev;
- if ( tmp.ptr->itm)
- decrement_size_();
- delete tmp.ptr;
- tmp.ptr = 0;
- }
- }
-
- bool empty()
- { return size_ <= 0; }
-
- bool full()
- { return false; }
-
- std::size_t size()
- { return size_; }
-};
-
-}}
-
-#endif // BOOST_TP_LOCKFREE_CHANNEL_H
Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
@@ -32,7 +32,7 @@
#include <boost/tp/detail/callable.hpp>
#include <boost/tp/detail/guard.hpp>
#include <boost/tp/detail/interrupter.hpp>
-#ifdef BOOST_BIND_WORKER_TO_PROCESSORS
+#ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
#include <boost/tp/detail/bind_processor.hpp>
#endif
#include <boost/tp/detail/worker.hpp>
@@ -42,39 +42,7 @@
#include <boost/tp/task.hpp>
#include <boost/tp/watermark.hpp>
-namespace boost {
-namespace this_task
-{
-template< typename Pred >
-void reschedule_until( Pred const& pred)
-{
- tp::detail::worker * w( tp::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- w->reschedule_until( pred);
-}
-
-template< typename Pool >
-Pool & get_thread_pool()
-{
- tp::detail::worker * w( tp::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- return w->get_thread_pool< Pool >();
-}
-
-inline
-bool is_worker()
-{ return tp::detail::worker::tss_get() != 0; }
-
-inline
-thread::id worker_id()
-{
- tp::detail::worker * w( tp::detail::worker::tss_get() );
- BOOST_ASSERT( w);
- return w->get_id();
-}
-}
-
-namespace tp
+namespace boost { namespace tp
{
template< typename Channel >
class pool : private noncopyable
@@ -269,7 +237,7 @@
this) ) );
}
-#ifdef BOOST_BIND_WORKER_TO_PROCESSORS
+#ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
void entry_( std::size_t n)
{
this_thread::bind_to_processor( n);
@@ -372,7 +340,7 @@
lk.unlock();
}
-#ifdef BOOST_BIND_WORKER_TO_PROCESSORS
+#ifdef BOOST_TP_BIND_WORKER_TO_PROCESSOR
explicit pool(
posix_time::time_duration const& asleep = posix_time::microseconds( 10),
scanns const& scns = scanns( 20) )
Modified: sandbox/threadpool/boost/tp/task.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/task.hpp (original)
+++ sandbox/threadpool/boost/tp/task.hpp 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
@@ -5,17 +5,43 @@
#ifndef BOOST_TP_TASK_H
#define BOOST_TP_TASK_H
+#include <boost/assert.hpp>
#include <boost/future.hpp>
+#include <boost/next_prior.hpp>
+#include <boost/thread.hpp>
#include <boost/thread/thread_time.hpp>
#include <boost/tp/detail/interrupter.hpp>
+#include <boost/tp/detail/worker.hpp>
-namespace boost { namespace tp
+namespace boost {
+namespace tp
{
template< typename R >
class task
{
private:
+ template< typename Iterator >
+ friend void waitfor_all( Iterator begin, Iterator end);
+ template< typename T1, typename T2 >
+ friend void waitfor_all( T1 & t1, T2 & t2);
+ template< typename T1, typename T2, typename T3 >
+ friend void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3);
+ template< typename T1, typename T2, typename T3, typename T4 >
+ friend void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4);
+ template< typename T1, typename T2, typename T3, typename T4, typename T5 >
+ friend void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5);
+ template< typename Iterator >
+ friend Iterator waitfor_any( Iterator begin, Iterator end);
+ template< typename T1, typename T2 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2);
+ template< typename T1, typename T2, typename T3 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3);
+ template< typename T1, typename T2, typename T3, typename T4 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4);
+ template< typename T1, typename T2, typename T3, typename T4, typename T5 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5);
+
shared_future< R > fut_;
detail::interrupter intr_;
@@ -69,12 +95,39 @@
bool timed_wait_until( system_time const& abs_time) const
{ return fut_.timed_wait_until( abs_time); }
+
+ void swap( task< R > & other)
+ {
+ fut_.swap( other.fut_);
+ intr_.swap( other.intr_);
+ }
};
template<>
class task< void >
{
private:
+ template< typename Iterator >
+ friend void waitfor_all( Iterator begin, Iterator end);
+ template< typename T1, typename T2 >
+ friend void waitfor_all( T1 & t1, T2 & t2);
+ template< typename T1, typename T2, typename T3 >
+ friend void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3);
+ template< typename T1, typename T2, typename T3, typename T4 >
+ friend void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4);
+ template< typename T1, typename T2, typename T3, typename T4, typename T5 >
+ friend void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5);
+ template< typename Iterator >
+ friend Iterator waitfor_any( Iterator begin, Iterator end);
+ template< typename T1, typename T2 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2);
+ template< typename T1, typename T2, typename T3 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3);
+ template< typename T1, typename T2, typename T3, typename T4 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4);
+ template< typename T1, typename T2, typename T3, typename T4, typename T5 >
+ friend unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5);
+
shared_future< void > fut_;
detail::interrupter intr_;
@@ -128,7 +181,136 @@
bool timed_wait_until( system_time const& abs_time) const
{ return fut_.timed_wait_until( abs_time); }
+
+ void swap( task< void > & other)
+ {
+ fut_.swap( other.fut_);
+ intr_.swap( other.intr_);
+ }
};
+
+template< typename Iterator >
+void waitfor_all( Iterator begin, Iterator end)
+{
+ for ( Iterator i = begin; i != end; ++i)
+ i->wait();
+}
+
+template< typename T1, typename T2 >
+void waitfor_all( T1 & t1, T2 & t2)
+{ wait_for_all( t1.fut_, t2.fut_); }
+
+template< typename T1, typename T2, typename T3 >
+void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3)
+{ wait_for_all( t1.fut_, t2.fut_, t3.fut_); }
+
+template< typename T1, typename T2, typename T3, typename T4 >
+void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4)
+{ wait_for_all( t1.fut_, t2.fut_, t3.fut_, t4.fut_); }
+
+template< typename T1, typename T2, typename T3, typename T4, typename T5 >
+void waitfor_all( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5)
+{ wait_for_all( t1.fut_, t2.fut_, t3.fut_, t4.fut_, t5.fut_); }
+
+template< typename Iterator >
+Iterator waitfor_any( Iterator begin, Iterator end)
+{
+ boost::detail::future_waiter waiter;
+ for ( Iterator i = begin; i != end; ++i)
+ waiter.add( i->fut_);
+ return next( begin, waiter.wait() );
+}
+
+template< typename T1, typename T2 >
+unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2)
+{ return wait_for_any( t1.fut_, t2.fut_); }
+
+template< typename T1, typename T2, typename T3 >
+unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3)
+{ return wait_for_any( t1.fut_, t2.fut_, t3.fut_); }
+
+template< typename T1, typename T2, typename T3, typename T4 >
+unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4)
+{ return wait_for_any( t1.fut_, t2.fut_, t3.fut_, t4.fut_); }
+
+template< typename T1, typename T2, typename T3, typename T4, typename T5 >
+unsigned int waitfor_any( task< T1 > & t1, task< T2 > & t2, task< T3 > & t3, task< T4 > & t4, task< T5 > & t5)
+{ return wait_for_any( t1.fut_, t2.fut_, t3.fut_, t4.fut_, t5.fut_); }
+}
+
+namespace this_task
+{
+template< typename Pred >
+void reschedule_until( Pred const& pred)
+{
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
+ BOOST_ASSERT( w);
+ w->reschedule_until( pred);
+}
+
+template< typename Pool >
+Pool & get_thread_pool()
+{
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
+ BOOST_ASSERT( w);
+ return w->get_thread_pool< Pool >();
+}
+
+inline
+bool is_worker()
+{ return tp::detail::worker::tss_get() != 0; }
+
+inline
+thread::id worker_id()
+{
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
+ BOOST_ASSERT( w);
+ return w->get_id();
+}
+
+template< typename Pool >
+void sleep_until( system_time & abs_time)
+{
+ struct time_reached
+ {
+ system_time abs_time;
+
+ time_reached( system_time & abs_time_)
+ : abs_time( abs_time_)
+ {}
+
+ bool operator()()
+ { return get_system_time() >= abs_time; }
+ };
+
+ if ( is_worker() )
+ {
+ time_reached t( abs_time);
+ get_thread_pool< Pool >()->reschedule_until( t);
+ }
+ else
+ this_thread::sleep( abs_time);
+}
+
+template< typename Pool >
+void yield()
+{
+ struct always_true
+ {
+ always_true() {}
+
+ bool operator()()
+ { return true; }
+ };
+
+ if ( is_worker() )
+ {
+ always_true t;
+ get_thread_pool< Pool >()->reschedule_until( t);
+ }
+ else
+ this_thread::yield();
+}
} }
#endif // BOOST_TP_TASK_H
Modified: sandbox/threadpool/libs/tp/doc/task.qbk
==============================================================================
--- sandbox/threadpool/libs/tp/doc/task.qbk (original)
+++ sandbox/threadpool/libs/tp/doc/task.qbk 2009-04-05 15:35:33 EDT (Sun, 05 Apr 2009)
@@ -76,10 +76,6 @@
// throws boost::thread_interrupted exception
std::cout << t.get() << std::endl;
-[heading Waiting for multiple tasks]
-It is possible to wait for multiple tasks - `boost::tp::wait_for_all(tsk1,...,tskn)` blocks until all n tasks are ready and
-`boost::tp::wait_for_any(tsk1,...,tskn)` blocks until at least one of the tasks becomes ready.
-
[heading Exceptions in tasks]
Exceptions thrown inside an __action__ are transported by the associated task object.
Exceptions rethrown by type:
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk