Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r52810 - in sandbox/task: boost/task boost/task/detail libs/task/doc libs/task/src
From: oliver.kowalke_at_[hidden]
Date: 2009-05-06 16:38:28


Author: olli
Date: 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
New Revision: 52810
URL: http://svn.boost.org/trac/boost/changeset/52810

Log:
* as_sub_task introduced: execute child task in pool -> store in worker-local-queue

Text files modified:
   sandbox/task/boost/task/async.hpp | 30 ++++++++++++++++++++
   sandbox/task/boost/task/detail/atomic.hpp | 2 +
   sandbox/task/boost/task/handle.hpp | 8 ++--
   sandbox/task/boost/task/pool.hpp | 58 ++++++---------------------------------
   sandbox/task/libs/task/doc/introduction.qbk | 38 ++++++++++++++++---------
   sandbox/task/libs/task/doc/pool.qbk | 20 +++++++++++++
   sandbox/task/libs/task/doc/task.qbk | 12 ++++++++
   sandbox/task/libs/task/src/semaphore_posix.cpp | 25 +++++++++++++---
   sandbox/task/libs/task/src/semaphore_windows.cpp | 16 +++++++++-
   sandbox/task/libs/task/src/wsq.cpp | 4 +-
   10 files changed, 138 insertions(+), 75 deletions(-)

Modified: sandbox/task/boost/task/async.hpp
==============================================================================
--- sandbox/task/boost/task/async.hpp (original)
+++ sandbox/task/boost/task/async.hpp 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -7,12 +7,16 @@
 #ifndef BOOST_TASK_ASYNC_H
 #define BOOST_TASK_ASYNC_H
 
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread.hpp>
 
 #include <boost/task/default_pool.hpp>
 #include <boost/task/detail/interrupter.hpp>
+#include <boost/task/detail/pool_callable.hpp>
 #include <boost/task/detail/thread_callable.hpp>
+#include <boost/task/detail/worker.hpp>
 #include <boost/task/handle.hpp>
 #include <boost/task/future.hpp>
 #include <boost/task/pool.hpp>
@@ -63,6 +67,32 @@
         }
 };
 
+/// Launch policy for async(): executes a task as a sub-task of the calling worker.
+///
+/// If the calling thread is a pool worker (detail::worker::tss_get() returns a
+/// non-null pointer) the task is put into that worker's queue. Otherwise the
+/// call is forwarded to new_thread().
+struct as_sub_task
+{
+        /// @param t  task to be executed
+        /// @return   handle built from the task's id, its shared future and
+        ///           the interrupter associated with the queued callable
+        ///           (or whatever new_thread() returns on a non-worker thread)
+        template< typename R >
+        handle< R > operator()( task< R > t)
+        {
+                detail::worker * w( detail::worker::tss_get() );
+                if ( w)
+                {
+                        // The interrupter has to exist before it is handed to both
+                        // the pool_callable and the returned handle.
+                        detail::interrupter intr;
+                        shared_future< R > fut( t.get_future() );
+                        // Predicate telling the worker when the result is available.
+                        function< bool() > wcb(
+                                bind(
+                                        & shared_future< R >::is_ready,
+                                        fut) );
+                        // Waiting on the task calls the worker's reschedule_until()
+                        // with the readiness predicate; the cast selects the
+                        // overload taking a function< bool() >.
+                        t.set_wait_callback(
+                                bind(
+                                        ( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
+                                        w,
+                                        wcb) );
+                        w->put( detail::pool_callable( t, intr) );
+                        return handle< R >( t.get_id(), fut, intr);
+                }
+                else
+                        return new_thread()( t);
+        }
+};
+
 template< typename Fn, typename R >
 handle< R > async( Fn fn, task< R > t)
 { return fn( t); }

Modified: sandbox/task/boost/task/detail/atomic.hpp
==============================================================================
--- sandbox/task/boost/task/detail/atomic.hpp (original)
+++ sandbox/task/boost/task/detail/atomic.hpp 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -15,6 +15,8 @@
 {
 namespace detail
 {
+//TODO: use interlocked_exchange on Windows
+// and inline assembler on other platforms (XCHG etc.)
 inline
 void atomic_exchange( volatile uint32_t * object, uint32_t desired)
 { interprocess::detail::atomic_write32( object, desired); }

Modified: sandbox/task/boost/task/handle.hpp
==============================================================================
--- sandbox/task/boost/task/handle.hpp (original)
+++ sandbox/task/boost/task/handle.hpp 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -114,10 +114,10 @@
         { fut_.wait(); }
 
     template< typename Duration >
- bool timed_wait( Duration const& rel_time) const
+ bool wait_for( Duration const& rel_time) const
         { return fut_.timed_wait( rel_time); }
 
- bool timed_wait_until( system_time const& abs_time) const
+ bool wait_until( system_time const& abs_time) const
         { return fut_.timed_wait_until( abs_time); }
 
         void swap( handle< R > & other)
@@ -217,10 +217,10 @@
         { fut_.wait(); }
 
     template< typename Duration >
- bool timed_wait( Duration const& rel_time) const
+ bool wait_for( Duration const& rel_time) const
         { return fut_.timed_wait( rel_time); }
 
- bool timed_wait_until( system_time const& abs_time) const
+ bool wait_until( system_time const& abs_time) const
         { return fut_.timed_wait_until( abs_time); }
 
         void swap( handle< void > & other)

Modified: sandbox/task/boost/task/pool.hpp
==============================================================================
--- sandbox/task/boost/task/pool.hpp (original)
+++ sandbox/task/boost/task/pool.hpp 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -287,31 +287,12 @@
         template< typename R >
         handle< R > submit( task< R > t)
         {
- detail::interrupter intr;
- shared_future< R > fut( t.get_future() );
- detail::worker * w( detail::worker::tss_get() );
- if ( w)
- {
- function< bool() > wcb(
- bind(
- & shared_future< R >::is_ready,
- fut) );
- t.set_wait_callback(
- bind(
- ( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
- w,
- wcb) );
- w->put( detail::pool_callable( t, intr) );
- return handle< R >( t.get_id(), fut, intr);
- }
- else
- {
- if ( closed_() )
- throw task_rejected("pool is closed");
+ if ( closed_() )
+ throw task_rejected("pool is closed");
 
- channel_.put( detail::pool_callable( t, intr) );
- return handle< R >( t.get_id(), fut, intr);
- }
+ detail::interrupter intr;
+ channel_.put( detail::pool_callable( t, intr) );
+ return handle< R >( t.get_id(), t.get_future(), intr);
         }
 
         template<
@@ -320,31 +301,12 @@
>
         handle< R > submit( task< R > t, Attr const& attr)
         {
- detail::interrupter intr;
- shared_future< R > fut( t.get_future() );
- detail::worker * w( detail::worker::tss_get() );
- if ( w)
- {
- function< bool() > wcb(
- bind(
- & shared_future< R >::is_ready,
- fut) );
- t.set_wait_callback(
- bind(
- ( void ( detail::worker::*)( function< bool() > const&) ) & detail::worker::reschedule_until,
- w,
- wcb) );
- w->put( detail::pool_callable( t, intr) );
- return handle< R >( t.get_id(), fut, intr);
- }
- else
- {
- if ( closed_() )
- throw task_rejected("pool is closed");
+ if ( closed_() )
+ throw task_rejected("pool is closed");
 
- channel_.put( channel_item( detail::pool_callable( t, intr), attr) );
- return handle< R >( t.get_id(), fut, intr);
- }
+ detail::interrupter intr;
+ channel_.put( channel_item( detail::pool_callable( t, intr), attr) );
+ return handle< R >( t.get_id(), t.get_future(), intr);
         }
 };
 }}

Modified: sandbox/task/libs/task/doc/introduction.qbk
==============================================================================
--- sandbox/task/libs/task/doc/introduction.qbk (original)
+++ sandbox/task/libs/task/doc/introduction.qbk 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -1,24 +1,34 @@
 [/
- (C) Copyright 2008 Oliver Kowalke.
- Distributed under the Boost Software License, Version 1.0.
- (See accompanying file LICENSE_1_0.txt or copy at
- http://www.boost.org/LICENSE_1_0.txt).
+ Copyright Oliver Kowalke 2009.
+ Distributed under the Boost Software License, Version 1.0.
+ (See accompanying file LICENSE_1_0.txt or copy at
+ http://www.boost.org/LICENSE_1_0.txt)
 ]
 
-[section Introduction]
-__boost_threadpool__ is designed for effectively utilize the available hardware and provide a way for efficient asynchronous processing of independent time consuming operations (__actions__) in the same process.
-A __threadpool__ contains a number of __worker_threads__ to perform __actions__ in parallel, which are usually managed by a scheduler. If a __worker_thread__ completes its __action__, it will take the next __action__ from the scheduler until all __actions__ have been completed. The __worker_thread__ then sleeps until there are new __actions__ available.
-Using a pool of __worker_threads__ over creating a new thread for each __action__ may result in better performance and better system stability because the overhead for thread creation and destruction is negated.
 
-__boost_threadpool__ uses a __work_stealing__ algorithm and supports __fork_join__ parallelism (recursively splitting of __actions__ into sub-actions that are solved in parallel, waiting for them to complete, and then joining results).
+[section:motivation Motivation]
 
-In order to use the classes and functions described here, you can either include the specific headers specified by the descriptions of each class or function, or include the master library header:
+In order to take advantage of the technological evolution from single-core to many-core architectures, compute-bound work must be partitioned into smaller chunks that can run in parallel.
 
- #include <boost/tp/tp.hpp>
+The goal of parallel programming is to write once and have our code scale well as the hardware underneath it gets better, i.e. see incremental benefits when running our app on machines with more cores without changing the code.
 
-which includes all the other headers in turn.
+Had we partitioned the work into much smaller chunks, when a core is finished with the item it executes, it would be able to start working on another chunk hence achieving the desired load balancing.
 
-[note
-__boost_threadpool__ uses a modified version of the futures library, N2561 C++0x proposal, from Anthony Williams ([@http://www.justsoftwaresolutions.co.uk/threading/updated-implementation-of-c++-futures-3.html]).]
+__boost_task__ provides a framework to utilize the available hardware and provide a way for efficient asynchronous processing of time consuming operations in the same process.
+
+[note Equating multithreading to parallelism is not accurate. You can have multithreading on a single-core machine, but you can only have parallelism on a multi-core machine. Parallelism refers to the portions of your code that can truly run at the same time.]
+
+[heading Executing a task]
+
+ std::string echo( std::string const& msg)
+ { return msg; }
+
+ boost::task::handle< std::string > h(
+ boost::task::async(
+ boost::task::new_thread(),
+ boost::task::make_task(
+ echo,
+ "Hello World!") ) );
+ std::cout << h.get() << std::endl;
 
 [endsect]

Modified: sandbox/task/libs/task/doc/pool.qbk
==============================================================================
--- sandbox/task/libs/task/doc/pool.qbk (original)
+++ sandbox/task/libs/task/doc/pool.qbk 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -6,9 +6,29 @@
 ]
 
 [section:pool Pool]
+
+The CLR thread pool has the correct internal logic and a simple interface (QueueUserWorkItem) for scheduling work items to be executed by the threads it manages. This appears to be the answer to achieving fine grained parallelism.
+
+A thread pool basically has two functions: It maintains a queue (or queues) of work to be done, and a collection of threads which execute work from the queue(s). So designing a thread pool really comes down to a) finding ways to enqueue and dequeue work items very quickly (to keep the overhead of using the thread pool to a minimum) and b) developing an algorithm for choosing an optimal number of threads to service the queues.
+
+ThreadPool.QueueUserWorkItem conveys basically zero information about each work item, aside from that it exists. This places some important constraints on the execution of these items. For example, the thread pool does not know whether individual work items are related or not, so it has to assume they are all completely independent, implying that we cannot reorder work to optimize its execution, as independent work items typically must be executed in FIFO order to ensure fairness.
+
+In prior versions of the CLR, this queue was a simple linked list, protected by a Monitor lock. This incurs some overhead: we must allocate nodes for the list (and pay the cost of the GC having to traverse the list each time a GC occurs), and we must pay the cost of acquiring that lock every time we enqueue or dequeue a work item.
+
+We can improve this situation in a couple of ways: we can implement a more efficient FIFO queue, and we can enhance the API to allow the user to give us more information, allowing us to turn to even more efficient queuing strategies.
+
+Recall that the overhead of the existing FIFO queue comes from the expense of allocating and traversing the data structure, and the cost of acquiring the lock on each enqueue and dequeue operation. For 4.0, we are switching to a lock-free data structure with much lower synchronization overhead.
+
+However, we are still restricted in what we can do here – we still have very little information about the work we’re executing, and so we still need to use the same basic strategy to execute it. We can trim overhead here and there, but QUWI will probably never be a great way to execute very fine-grained workloads. We need a new API.
+
+Since a child task is just a piece of a larger task, we don’t need to worry about execution order. We just need to execute these things quickly. One well-known strategy for fast execution of unordered work items is “work stealing.”
+
 The pool internally manages __worker_threads__, and submitted __actions__ are stored in a __channel__ (__global_queue__) for processing by the __worker_threads__ (using a __work_stealing__ algorithm). Each submitted __action__ gets associated with a __task__ object that will be returned. The __task__ object acts as a proxy for a result that is initially not known and gets evaluated later by a __worker_thread__.
 
 [heading Work-Stealing]
+The most important aspect of work-stealing is that it enables very fast enqueue and dequeue in the typical case, often requiring no synchronization at all. This virtually eliminates a large part of the overhead of QUWI, when working with child tasks. We still do need to allocate memory for the Task itself, and for the work-stealing queue, but like the improvements to the FIFO queue these data structures have been optimized for good GC performance. Parent tasks are fast; child tasks are much faster.
+
+
 Traditional thread pools do not scale because they use a single global queue protected by a global lock. The frequency at which __worker_threads__ acquire the global lock becomes a limiting factor for the throughput if:
 
 * the __actions__ become smaller

Modified: sandbox/task/libs/task/doc/task.qbk
==============================================================================
--- sandbox/task/libs/task/doc/task.qbk (original)
+++ sandbox/task/libs/task/doc/task.qbk 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -6,6 +6,18 @@
 ]
 
 [section:task Task]
+
+Even without parent/child relationships, Task is a major improvement over QUWI. QUWI returns nothing of use to the caller; it simply queues a delegate, and leaves it up to the implementation of that delegate to coordinate its activities with the rest of the application. QUWI provides no means of waiting for the completion of the work item, for handling exceptions, or getting the result of a computation. Task provides all of this in a very easy-to-use form, while adding very little overhead vs. QUWI.
+
+The fact that Task has a Wait method is not just a convenience; it eliminates one of the most common problems people face when using QUWI. It is fairly common for one work item to need to wait for the execution of another work item to complete. If the second work item has not yet begun executing, it will be sitting in the queue waiting for a worker thread to pick it up. It is possible that there are no available worker threads – maybe they’re all waiting for other work items to complete! This can cause deadlock in the worst case, and very slow execution in the best, as the thread pool may be slow to add more worker threads to pick up these work items. Task.Wait, on the other hand, knows it’s waiting for another task, and is tightly integrated with the thread pool such that it is able to determine whether the task has started executing, and if not it executes it immediately, in-line on the current thread. This greatly improves performance and eliminates the possibility of deadlock in this situation.
+
+For new code, Task is now the preferred way to queue work to the thread pool.
+
+Top-level Tasks have no parent. These are Tasks created by non-thread-pool threads, or with certain options specified at Task-creation time. These tasks are queued to the same FIFO queue we use for QUWI, and thus benefit from the improvements we’ve made there – but they are also subject to the same limitations. Tasks queued in this way are simply a better QUWI – but now the fun starts: A parent task can create child tasks. This happens whenever a Task creates another Task (unless it overrides this behavior). These children are implicitly treated as sub-tasks of the larger task. We assume that sub-tasks can be executed in any order – fairness is not necessary – because all that matters is that the overall operation be completed as fast as possible. This lets us throw those FIFO restrictions out the window, and opens up the possibility for much more efficient work scheduling strategies.
+
+Since a child task is just a piece of a larger task, we don’t need to worry about execution order. We just need to execute these things quickly. One well-known strategy for fast execution of unordered work items is “work stealing.”
+
+
 For each submitted __action__ a new task object will be created and returned by the pool.
 The __task__ object acts as a proxy for a result that is initially not known and gets evaluated later by a __worker_thread__. It transports the result (value or exception) of the __action__ back to the caller.
 

Modified: sandbox/task/libs/task/src/semaphore_posix.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_posix.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_posix.cpp 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -11,6 +11,8 @@
 #include <boost/system/error_code.hpp>
 #include <boost/system/system_error.hpp>
 
+#include "boost/task/utility.hpp"
+
 namespace boost { namespace task
 {
 semaphore::semaphore( int value)
@@ -34,11 +36,24 @@
 semaphore::wait()
 {
         int ret( -1);
- do
- { ret = ::sem_wait( & handle_); }
- while ( ret == -1 && errno == EINTR);
- if ( ret == -1)
- throw system::system_error( errno, system::system_category);
+ if ( this_task::runs_in_pool() )
+ {
+ // TODO: use semaphore::try_wait(), create a fiber and do a reschedule until
+ // semaphore::try_wait() returns true
+ do
+ { ret = ::sem_wait( & handle_); }
+ while ( ret == -1 && errno == EINTR);
+ if ( ret == -1)
+ throw system::system_error( errno, system::system_category);
+ }
+ else
+ {
+ do
+ { ret = ::sem_wait( & handle_); }
+ while ( ret == -1 && errno == EINTR);
+ if ( ret == -1)
+ throw system::system_error( errno, system::system_category);
+ }
 }
 
 bool

Modified: sandbox/task/libs/task/src/semaphore_windows.cpp
==============================================================================
--- sandbox/task/libs/task/src/semaphore_windows.cpp (original)
+++ sandbox/task/libs/task/src/semaphore_windows.cpp 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -12,6 +12,8 @@
 #include <boost/system/error_code.hpp>
 #include <boost/system/system_error.hpp>
 
+#include "boost/task/utility.hpp"
+
 namespace boost { namespace task
 {
 semaphore::semaphore( int value)
@@ -34,8 +36,18 @@
 void
 semaphore::wait()
 {
- if ( ::WaitForSingleObject( handle_, INFINITE) != WAIT_OBJECT_0)
- throw system::system_error( ::GetLastError(), system::system_category);
+ if ( this_task::runs_in_pool() )
+ {
+ // TODO: use semaphore::try_wait(), create a fiber and do a reschedule until
+ // semaphore::try_wait() returns true
+ if ( ::WaitForSingleObject( handle_, INFINITE) != WAIT_OBJECT_0)
+ throw system::system_error( ::GetLastError(), system::system_category);
+ }
+ else
+ {
+ if ( ::WaitForSingleObject( handle_, INFINITE) != WAIT_OBJECT_0)
+ throw system::system_error( ::GetLastError(), system::system_category);
+ }
 }
 
 bool

Modified: sandbox/task/libs/task/src/wsq.cpp
==============================================================================
--- sandbox/task/libs/task/src/wsq.cpp (original)
+++ sandbox/task/libs/task/src/wsq.cpp 2009-05-06 16:38:26 EDT (Wed, 06 May 2009)
@@ -71,7 +71,7 @@
         if ( tail == 0)
                 return false;
         tail -= 1;
- atomic_exchange( & tail_idx_, tail); // Interlocked.Exchange( & tail_idx_, tail); -> XCHG
+ atomic_exchange( & tail_idx_, tail);
         if ( head_idx_ <= tail)
         {
                 ca = array_[tail & mask_];
@@ -100,7 +100,7 @@
         if ( lk.owns_lock() )
         {
                 uint32_t head( head_idx_);
- atomic_exchange( & head_idx_, head + 1); // Interlocked.Exchange( & head_idx_, head + 1);
+ atomic_exchange( & head_idx_, head + 1);
                 if ( head < tail_idx_)
                 {
                         ca = array_[head & mask_];


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk