Subject: [Boost-commit] svn:boost r53149 - in sandbox/task: boost/task libs/task/doc libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-05-21 02:57:04
Author: olli
Date: 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
New Revision: 53149
URL: http://svn.boost.org/trac/boost/changeset/53149
Log:
* thread::join() may throw -> catch exceptions
Text files modified:
sandbox/task/boost/task/async.hpp | 5 ++
sandbox/task/libs/task/doc/handle.qbk | 8 ++--
sandbox/task/libs/task/doc/introduction.qbk | 13 ++++++--
sandbox/task/libs/task/doc/own_thread.qbk | 4 +-
sandbox/task/libs/task/doc/pool.qbk | 47 +++++++++++++++++---------------
sandbox/task/libs/task/doc/task.qbk | 58 +++++++++++++++++++++++----------------
sandbox/task/libs/task/src/worker_group.cpp | 7 ++++
7 files changed, 85 insertions(+), 57 deletions(-)
Modified: sandbox/task/boost/task/async.hpp
==============================================================================
--- sandbox/task/boost/task/async.hpp (original)
+++ sandbox/task/boost/task/async.hpp 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
@@ -31,7 +31,10 @@
{
void operator()( thread * thrd)
{
- thrd->join();
+ try
+ { thrd->join(); }
+ catch (...)
+ {}
delete thrd;
}
};
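The hunk above makes the joining deleter swallow whatever `join()` throws, so that `delete thrd` always runs. A minimal sketch of the same pattern, assuming `std::thread` as a stand-in for `boost::thread` (the names `joiner` and `join_twice_is_safe` are hypothetical and not part of the commit):

```cpp
#include <cassert>
#include <memory>
#include <thread>

// Deleter that joins the thread before deleting it. Any exception raised by
// join() is swallowed so that the delete below is always reached.
struct joiner
{
    void operator()( std::thread * thrd) const noexcept
    {
        try
        { thrd->join(); }   // std::thread throws std::system_error if not joinable
        catch (...)
        {}
        delete thrd;
    }
};

// Hypothetical helper: joins a thread explicitly, then lets the deleter join
// it a second time. The second join() throws, and the deleter swallows it.
bool join_twice_is_safe()
{
    int value = 0;
    {
        std::unique_ptr< std::thread, joiner > t(
            new std::thread( [&value] { value = 42; } ) );
        t->join();                      // first join succeeds
        assert( ! t->joinable() );
    }                                   // deleter runs here without propagating
    return value == 42;
}
```

Without the `try`/`catch`, the exception would escape the deleter and the thread object would leak (or, with a `noexcept` deleter as sketched here, the program would terminate).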
Modified: sandbox/task/libs/task/doc/handle.qbk
==============================================================================
--- sandbox/task/libs/task/doc/handle.qbk (original)
+++ sandbox/task/libs/task/doc/handle.qbk 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
@@ -31,7 +31,7 @@
* __fn_wait_until__: wait until result is set or time-point has elapsed
-
+``
long fibonacci( long n)
{
boost::this_thread::interruption_point(); // interruption point
@@ -69,7 +69,7 @@
std::cout << "has exception == " << std::boolalpha << h.has_exception() << "\n";
std::cout << "fibonacci(10) == " << h.get() << std::endl;
}
-
+``
[heading:task_interruption Task interruption]
@@ -83,7 +83,7 @@
* __fn_interruption_requested__: return bool if interruption was requested
-
+``
long cooperative( long n)
{
boost::this_thread::interruption_point(); // interruption point
@@ -121,6 +121,6 @@
std::cout << "has exception == " << std::boolalpha << h.has_exception() << "\n";
std::cout << h.get() << std::endl; // throws boost::task::task_interrupted
}
-
+``
[endsect]
Modified: sandbox/task/libs/task/doc/introduction.qbk
==============================================================================
--- sandbox/task/libs/task/doc/introduction.qbk (original)
+++ sandbox/task/libs/task/doc/introduction.qbk 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
@@ -24,6 +24,7 @@
The framework provides some link_ae[__aes__], like __new_thread__, in order to execute the task asynchronously in another execution context (__boost_task__
uses preemptible threads for this purpose).
+``
void print( std::string const& msg)
{ printf("%s\n", msg.c_str() ); }
@@ -35,10 +36,12 @@
print,
"Hello World!") ) );
}
+``
In order to manage the task __assync__ returns a handle __handle__ (associated with the submitted task). It functions as a __act__ - that means it transfers
the result of the execution back to the caller thread.
+``
std::string echo( std::string const& msg)
{ return msg; }
@@ -52,9 +55,11 @@
"Hello World!") ) );
std::cout << h.get() << std::endl; // wait until task is finished and return the result
}
+``
The task can also be interrupted via __handle__ if it is a so-called link_coop_task[__coop_task__].
+``
void long_running( boost::posix_time::time_duration const& rel_time)
{ boost::this_thread::sleep( rel_time);
@@ -69,11 +74,12 @@
h.interrupt(); // interrupt execution of task
std::cout << h.get() << std::endl; // wait until task is finished, will throw an exception
}
-
+``
Besides __new_thread__ (which creates a new thread for each submitted task) __boost_task__ provides link_pool[__thread_pools__] to avoid the overhead of thread creation
and destruction for each task. __default_pool__ submits the tasks to the default __thread_pool__ which contains a fixed number of pre-spawned __worker_threads__ (custom __thread_pools__ are supported too).
+``
long serial_fib( long n)
{
if( n < 2) return n;
@@ -115,19 +121,20 @@
5) ) );
std::cout << h.get() << std::endl;
}
-
+``
With function __fn_runs_in_pool__ a task can detect if it is executed inside a __thread_pool__.
__sub_tasks__ are created by __as_sub_task__. The __sub_task__ is executed by a new thread if the parent task
is not executed inside a __thread_pool__. Otherwise the __sub_task__ is put into the local __worker_queue__,
which enables link_work_stealing[__work_stealing__] and link_forkjoin[inline execution] of tasks.
+``
boost::task::async(
boost::task::as_sub_task(), // sub-task executed in a new thread or inside
boost::task::make_task( // the thread-pool, depending on whether the parent
parallel_fib, // task is executed inside a thread-pool or not
10,
5) ) );
-
+``
[endsect]
Modified: sandbox/task/libs/task/doc/own_thread.qbk
==============================================================================
--- sandbox/task/libs/task/doc/own_thread.qbk (original)
+++ sandbox/task/libs/task/doc/own_thread.qbk 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
@@ -10,7 +10,7 @@
__own_thread__ executes the task in the current thread (synchronous execution - concerns of [@http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2008/n2802.html N2802] do not apply).
-
+``
long fibonacci( long n)
{
if ( n == 0) return 0;
@@ -35,7 +35,7 @@
10) ) );
std::cout << "fibonacci(10) == " << h.get() << std::endl;
}
-
+``
[endsect]
Modified: sandbox/task/libs/task/doc/pool.qbk
==============================================================================
--- sandbox/task/libs/task/doc/pool.qbk (original)
+++ sandbox/task/libs/task/doc/pool.qbk 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
@@ -7,56 +7,51 @@
[section:pool Pool]
-It is fairly common for one work item to need to wait for the execution of another work item to complete. If the second work item has not yet begun executing, it will be sitting in the queue waiting for a worker thread to pick it up. It is possible that there are no available worker threads - maybe they're all waiting for other work items to complete! This can cause deadlock in the worst case, and very slow execution in the best, as the thread pool may be slow to add more worker threads to pick up these work items. Task.Wait, on the other hand, knows it's waiting for another task, and is tightly integrated with the thread pool such that it is able to determine whether the task has started executing, and if not it executes it immediately, in-line on the current thread. This greatly improves performance and eliminates the possibility of deadlock in this situation.
+A __thread_pool__ maintains a queue (or queues) of work to be done, and a collection of __worker_threads__ which execute work from the queue(s).
-The CLR thread pool has the correct internal logic and a simple interface (QueueUserWorkItem) for scheduling work items to be executed by the threads it manages. This appears to be the answer to achieving fine grained parallelism.
+Since a __sub_task__ is just a piece of a larger __task__, we don't need to worry about execution order. We just need to execute these things quickly. One well-known strategy for fast execution of unordered work items is __work_stealing__.
-A thread pool basically has two functions: It maintains a queue (or queues) of work to be done, and a collection of threads which execute work from the queue(s). So designing a thread pool really comes down to a) finding ways to enqueue and dequeue work items very quickly (to keep the overhead of using the thread pool to a minimum) and b) developing an algorithm for choosing an optimal number of threads to service the queues.
-ThreadPool.QueueUserWorkItem conveys basically zero information about each work item, aside from that it exists. This places some important constraints on the execution of these items. For example, the thread pool does not know whether individual work items are related or not, so it has to assume they are all completely independent, implying that we cannot reorder work to optimize its execution, as independent work items typically must be executed in FIFO order to ensure fairness.
-
-In prior versions of the CLR, this queue was a simple linked list, protected by a Monitor lock. This incurs some overhead: we must allocate nodes for the list (and pay the cost of the GC having to traverse the list each time a GC occurs), and we must pay the cost of acquiring that lock every time we enqueue or dequeue a work item.
-
-We can improve this situation in a couple of ways: we can implement a more efficient FIFO queue, and we can enhance the API to allow the user to give us more information, allowing us to turn to even more efficient queuing strategies.
-
-Recall that the overhead of the existing FIFO queue comes from the expense of allocating and traversing the data structure, and the cost of acquiring the lock on each enqueue and dequeue operation. For 4.0, we are switching to a lock-free data structure with much lower synchronization overhead.
-
-However, we are still restricted in what we can do here - we still have very little information about the work we're executing, and so we still need to use the same basic strategy to execute it. We can trim overhead here and there, but QUWI will probably never be a great way to execute very fine-grained workloads. We need a new API.
+[heading Work-Stealing]
-Since a child task is just a piece of a larger task, we don't need to worry about execution order. We just need to execute these things quickly. One well-known strategy for fast execution of unordered work items is "work stealing."
+The most important aspect of work-stealing is that it enables very fast enqueue and dequeue in the typical case, often requiring no synchronization at all. This virtually eliminates a large part of the overhead of QUWI, when working with child tasks. We still do need to allocate memory for the Task itself, and for the work-stealing queue, but like the improvements to the FIFO queue these data structures have been optimized for good GC performance. Parent tasks are fast; child tasks are much faster.
-The pool manages internaly __worker_threads__ and submitted __actions__ are stored in a __channel__ (__global_queue__) for processing by the __worker_threads__ (using a __work_stealing__ algorithm). Each submitted __action__ gets associated with a __task__ object that will be returned. The __task__ object acts as a proxy for a result that is initially not known and gets evaluated later by a __worker_thread__.
-[heading Work-Stealing]
-The most important aspect of work-stealing is that it enables very fast enqueue and dequeue in the typical case, often requiring no synchronization at all. This virtually eliminates a large part of the overhead of QUWI, when working with child tasks. We still do need to allocate memory for the Task itself, and for the work-stealing queue, but like the improvements to the FIFO queue these data structures have been optimized for good GC performance. Parent tasks are fast; child tasks are much faster.
+Traditional thread pools do not scale because they use a single global queue protected by a global lock. The frequency at which __worker_threads__ acquire the global lock becomes a limiting factor for the throughput if:
-Traditional thread poola do not scale because they use a single global queue protected by a global lock. The frequency at which __worker_threads__ aquire the global lock becomes a limiting factor for the throughput if:
+* the __tasks__ become smaller
-* the __actions__ become smaller
* more processors are added
+
A work-stealing algorithm can be used to solve this problem. It uses a special kind of queue which has two ends, and allows lock-free pushes and pops from the ['private end] (accessed by the __worker_thread__ owning the queue), but requires synchronization from the ['public end] (accessed by the other __worker_threads__). Synchronization is necessary when the queue is sufficiently small that private and public operations could conflict.
The pool contains one global queue (__bounded_channel__ or __unbounded_channel__) protected by a global lock and each __worker_thread__ has its own private worker queue. If work is enqueued by a __worker_thread__ the __action__ is stored in the worker queue. If the work is enqueued by an application thread it goes into the global queue. When __worker_threads__ are looking for work, they use the following search order:
+
* look into the private worker queue - __actions__ can be dequeued without locks
+
* look in the global queue - locks are used for synchronization
+
* check other worker queues ('stealing' __actions__ from private worker queues of other __worker_threads__) - requires locks
-For a lot of recursively queued __actions__, the use of a worker queue per thread substantially reduces the synchronization necessary to complete the work. There are also fewer cache effects due to sharing of the global queue information.
+
+For a lot of recursively queued __tasks__, the use of a worker queue per thread substantially reduces the synchronization necessary to complete the work. There are also fewer cache effects due to sharing of the global queue information.
Operations on the private worker queue are executed in LIFO order and operations on worker queues of other __worker_threads__ in FIFO order (steals).
+
* Chances are that the memory is still hot in the cache if the __actions__ are pushed in LIFO order into the private worker queue.
-* If a __worker_thread__ steals work in FIFO order, increases the chances that a larger 'chunk' of work will be stolen (the need for other steals will be possibly reduced). Because the __actions__ are stored in LIFO order, the oldest items are closer to the ['public end] of the queue (forming a tree). Stealing such an older __action__ also steals a (probably) larger subtree of __actions__ unfolded if the stolen work item get executed.
-[note __Actions__ submitted by a __worker_thread__ are stored into its private worker queue in LIFO order, thatswhy priorities and timeouts specified at the submit-function get ignored.]
+* If a __worker_thread__ steals work in FIFO order, the chances increase that a larger 'chunk' of work will be stolen (possibly reducing the need for further steals). Because the __tasks__ are stored in LIFO order, the oldest items are closer to the ['public end] of the queue (forming a tree). Stealing such an older __task__ also steals a (probably) larger subtree of __tasks__, which unfolds when the stolen work item gets executed.
+
-[important Because of the work-stealing algorithm the execution order of __actions__ may be not strict as in the global queue.]
+[important Because of the work-stealing algorithm the execution order of __tasks__ may not be as strict as in the global queue.]
[heading Creation]
+
The first template argument specifies the channel type and the scheduling policy.
boost::tp::pool<
@@ -79,6 +74,7 @@
[heading Shutdown]
+
If `boost::tp::pool< Channel >::shutdown()` is called, the pool is set to closed and all __worker_threads__ are joined once all pending __actions__ have been processed. No further __actions__ can be submitted by application threads.
[note The destructor calls `boost::tp::pool< Channel >::shutdown()` if the pool has not been shut down yet.]
@@ -110,14 +106,19 @@
[heading Default pool]
+
The free function `boost::tp::get_default_pool()` returns a reference to the default __threadpool__ instance. The default __threadpool__ is
of type `boost::tp::pool< boost::tp::unbounded_channel< boost::tp::fifo > >` and will contain as many __worker_threads__ as
`boost::thread::hardware_concurrency()` returns.
+
[heading Launch in pool]
+
The free function `boost::tp::launch_in_pool( Act const& act)` submits the __action__ to the default pool and returns a task object.
+
[heading Meta functions]
+
If the __threadpool__ supports priorities `boost::tp::has_priority< pool_type >` evaluates to `true`. The priority type is determined by `boost::tp::priority_type< pool_type >`.
typedef boost::tp::pool<
@@ -131,7 +132,9 @@
std::cout << std::boolalpha << boost::tp::has_fibers< pool_type >::value << std::endl;
+
[heading Processor binding]
+
For some applications it is convenient to bind the worker threads of the pool to processors of the system. For this purpose BOOST_BIND_WORKER_TO_PROCESSORS must be defined. If no pool size is passed to the constructor, the __threadpool__ will contain as many
__worker_threads__ as processors (== __hardware_concurrency__) are available and each __worker_thread__ is bound to one processor.
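The search order described in pool.qbk above (private worker queue in LIFO order, then the global queue, then stealing from other workers in FIFO order) can be sketched as follows. This is only an illustration under stated assumptions: `work_queue`, `find_work` and `demo_search_order` are hypothetical names, work items are plain `int`s, and every operation takes a mutex, whereas the documentation describes the private end as lock-free.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <vector>

// Simplified double-ended queue. Every operation locks a mutex; a real
// work-stealing queue would avoid the lock on the owner's (private) end.
class work_queue
{
public:
    void push( int item)
    {
        std::lock_guard< std::mutex > lk( mtx_);
        items_.push_back( item);
    }

    // Private end: the newest item is taken first (LIFO).
    bool pop_newest( int & item)
    {
        std::lock_guard< std::mutex > lk( mtx_);
        if ( items_.empty() ) return false;
        item = items_.back();
        items_.pop_back();
        return true;
    }

    // Public end: the oldest item is taken first (FIFO).
    bool pop_oldest( int & item)
    {
        std::lock_guard< std::mutex > lk( mtx_);
        if ( items_.empty() ) return false;
        item = items_.front();
        items_.pop_front();
        return true;
    }

private:
    std::mutex mtx_;
    std::deque< int > items_;
};

// Search order for worker `self`: own queue, global queue, other workers.
bool find_work( std::size_t self, std::vector< work_queue > & workers,
                work_queue & global, int & item)
{
    assert( self < workers.size() );
    if ( workers[self].pop_newest( item) ) return true;     // 1. private queue
    if ( global.pop_oldest( item) ) return true;            // 2. global queue
    for ( std::size_t i = 0; i < workers.size(); ++i)       // 3. steal
    {
        if ( i != self && workers[i].pop_oldest( item) ) return true;
    }
    return false;
}

// Drains all queues from the point of view of worker 0 and records the order.
std::vector< int > demo_search_order()
{
    std::vector< work_queue > workers( 2);
    work_queue global;
    workers[0].push( 1);  workers[0].push( 2);
    global.push( 10);
    workers[1].push( 20); workers[1].push( 21);

    std::vector< int > order;
    int item = 0;
    while ( find_work( 0, workers, global, item) ) order.push_back( item);
    return order;
}
```

Worker 0 first empties its own queue newest-first, then takes the item from the global queue, and only then steals the oldest items from worker 1.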
Modified: sandbox/task/libs/task/doc/task.qbk
==============================================================================
--- sandbox/task/libs/task/doc/task.qbk (original)
+++ sandbox/task/libs/task/doc/task.qbk 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
@@ -21,35 +21,40 @@
* create task from free-function with arguments:
- boost::task::task< int > t(
- boost::task::make_task(
- parallel_fib,
- 10) );
+``
+ boost::task::task< int > t(
+ boost::task::make_task(
+ parallel_fib,
+ 10) );
+``
* create task from member-function with arguments:
- struct X
- {
- void f( int i);
- };
-
- X x;
- boost::task::task< int > t(
- boost::task::make_task(
- & X::f,
- x,
- 10) );
-
+``
+ struct X
+ {
+ void f( int i);
+ };
+
+ X x;
+ boost::task::task< int > t(
+ boost::task::make_task(
+ & X::f,
+ x,
+ 10) );
+``
+
It is possible to create __task__ from a __callable__ too:
- struct Y
- {
- std::string operator()();
- };
-
- Y y;
- boost::task::task< std::string > t( y);
+``
+ struct Y
+ {
+ std::string operator()();
+ };
+ Y y;
+ boost::task::task< std::string > t( y);
+``
[heading:cooperative Cooperative task and interruption]
@@ -73,6 +78,7 @@
A __interruption__point__ throws __task_interrupted__ if an interruption was requested.
+``
long cooperative( long n)
{
boost::this_thread::interruption_point(); // interruption point
@@ -105,12 +111,13 @@
10) ) );
std::cout << h.get() << std::endl; // throws boost::task::task_interrupted
}
-
+``
[heading:exceptions Exceptions]
Exceptions thrown by __task__ are transported by the __act__.
+``
void throwing()
{
...
@@ -127,6 +134,7 @@
throwing) ) );
std::cout << h.wait() << std::endl; // throws std::domain_error
}
+``
Exceptions rethrown by type are:
@@ -158,6 +166,7 @@
Top-level tasks have no parent. A parent task can create child tasks when it creates another task by using __as_sub_task__ as __ae__. These children are implicitly treated as __sub_tasks__ of the larger task. It is assumed that __sub_tasks__ can be executed in any order because only overall operation
speed matters (enabling strategies for fast execution of unordered work-items such as link_work_stealing[__work_stealing__]).
+``
long serial_fib( long n)
{
if( n < 2) return n;
@@ -199,5 +208,6 @@
5) ) );
std::cout << h.get() << std::endl;
}
+``
[endsect]
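The task.qbk text above states that exceptions thrown by a task are transported to the caller by the handle. The same idea can be sketched with the standard library, assuming `std::async` and `std::future` as stand-ins for the Boost.Task `async` function and handle (this is not the Boost.Task API; `run_and_report` is a hypothetical helper):

```cpp
#include <cassert>
#include <future>
#include <stdexcept>
#include <string>

// Task body that always fails.
void throwing()
{ throw std::domain_error( "domain error"); }

// Runs throwing() asynchronously and reports what the caller observes.
std::string run_and_report()
{
    std::future< void > h = std::async( std::launch::async, throwing);
    assert( h.valid() );
    try
    {
        h.get();                    // rethrows the exception stored by the task
        return "no exception";
    }
    catch ( std::domain_error const& e)
    { return e.what(); }            // exception type and message survive the transfer
}
```

The exception is raised in the thread running the task but only surfaces in the caller when the result is requested.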
Modified: sandbox/task/libs/task/src/worker_group.cpp
==============================================================================
--- sandbox/task/libs/task/src/worker_group.cpp (original)
+++ sandbox/task/libs/task/src/worker_group.cpp 2009-05-21 02:57:02 EDT (Thu, 21 May 2009)
@@ -52,7 +52,12 @@
worker_group::join_all()
{
BOOST_FOREACH( worker w, cont_)
- { w.join(); }
+ {
+ try
+ { w.join(); }
+ catch (...)
+ {}
+ }
}
void
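The worker_group.cpp hunk above wraps each `w.join()` separately, so one failing join no longer prevents the remaining workers from being joined. A minimal sketch of that loop, assuming `std::thread` in place of the library's `worker` type; returning a count is an addition for illustration only (the original function returns `void`), and `demo_join_all` is a hypothetical helper:

```cpp
#include <cassert>
#include <cstddef>
#include <thread>
#include <vector>

// Joins every thread and returns how many joins completed without throwing.
// A failing join is swallowed so the loop continues with the next worker.
std::size_t join_all( std::vector< std::thread > & workers)
{
    std::size_t joined = 0;
    for ( std::thread & w : workers)
    {
        try
        {
            w.join();
            ++joined;
        }
        catch (...)
        {}
    }
    return joined;
}

// Three workers, one of which was already joined before join_all() runs.
std::size_t demo_join_all()
{
    std::vector< std::thread > workers;
    for ( int i = 0; i < 3; ++i) workers.emplace_back( [] {} );
    workers[1].join();                  // the later join() on this one throws
    assert( ! workers[1].joinable() );
    return join_all( workers);
}
```

If the `try` block enclosed the whole loop instead, the exception from the second worker would leave the third one unjoined.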