Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r52318 - in sandbox/threadpool: boost/tp libs/tp/examples
From: oliver.kowalke_at_[hidden]
Date: 2009-04-11 03:12:10


Author: olli
Date: 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
New Revision: 52318
URL: http://svn.boost.org/trac/boost/changeset/52318

Log:
* this_task::interrupt()
* examples

Added:
   sandbox/threadpool/libs/tp/examples/bind_to_processors.cpp (contents, props changed)
   sandbox/threadpool/libs/tp/examples/sleep.cpp (contents, props changed)
   sandbox/threadpool/libs/tp/examples/yield.cpp (contents, props changed)
Removed:
   sandbox/threadpool/libs/tp/examples/spread_over_hardware.cpp
Text files modified:
   sandbox/threadpool/boost/tp/pool.hpp | 4
   sandbox/threadpool/boost/tp/task.hpp | 84 +++++++++++++++++++++++++++------------
   sandbox/threadpool/libs/tp/examples/Jamfile.v2 | 12 +++-
   sandbox/threadpool/libs/tp/examples/fork_join.cpp | 77 ++++++++++++++++++------------------
   sandbox/threadpool/libs/tp/examples/interrupt.cpp | 2
   sandbox/threadpool/libs/tp/examples/pending.cpp | 2
   sandbox/threadpool/libs/tp/examples/reschedule_until.cpp | 7 --
   sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp | 2
   sandbox/threadpool/libs/tp/examples/submit.cpp | 2
   9 files changed, 111 insertions(+), 81 deletions(-)

Modified: sandbox/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/threadpool/boost/tp/pool.hpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -191,7 +191,7 @@
         {
                 if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
- std::size_t psize( thread::hardware_concurrency() );
+ poolsize psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
                 unique_lock< shared_mutex > lk( mtx_wg_);
@@ -217,7 +217,7 @@
         {
                 if ( asleep.is_special() || asleep.is_negative() )
                         throw invalid_timeduration("argument asleep is not valid");
- std::size_t psize( thread::hardware_concurrency() );
+ poolsize psize( thread::hardware_concurrency() );
                 BOOST_ASSERT( psize > 0);
                 channel_.activate();
                 unique_lock< shared_mutex > lk( mtx_wg_);

Modified: sandbox/threadpool/boost/tp/task.hpp
==============================================================================
--- sandbox/threadpool/boost/tp/task.hpp (original)
+++ sandbox/threadpool/boost/tp/task.hpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -240,6 +240,43 @@
 
 namespace this_task
 {
+namespace detail
+{
+struct time_reached // nullary predicate: true once get_system_time() has reached abs_time
+{
+ system_time abs_time; // deadline, copied from the constructor argument
+
+ time_reached( system_time & abs_time_) // takes a non-const reference but only copies from it
+ : abs_time( abs_time_)
+ {}
+
+ bool operator()() // re-reads the clock on every call and compares it with the stored deadline
+ { return get_system_time() >= abs_time; }
+};
+
+class always_true // nullary predicate; despite the name it is false on the first call and true afterwards
+{
+private:
+ bool result_; // false until operator() has been invoked once
+
+public:
+ always_true()
+ : result_( false)
+ {}
+
+ bool operator()()
+ {
+ if ( ! result_)
+ {
+ result_ = true; // first call: record that it happened, but still report false
+ return false;
+ }
+ else
+ return true; // every later call
+ }
+};
+}
+
 template< typename Pred >
 void reschedule_until( Pred const& pred)
 {
@@ -268,49 +305,42 @@
         return w->get_id();
 }
 
-template< typename Pool >
-void sleep_until( system_time & abs_time)
+inline
+void sleep( system_time abs_time)
 {
- struct time_reached
- {
- system_time abs_time;
-
- time_reached( system_time & abs_time_)
- : abs_time( abs_time_)
- {}
-
- bool operator()()
- { return get_system_time() >= abs_time; }
- };
-
         if ( is_worker() )
         {
- time_reached t( abs_time);
- get_thread_pool< Pool >()->reschedule_until( t);
+ detail::time_reached t( abs_time);
+ reschedule_until( t);
         }
         else
                 this_thread::sleep( abs_time);
 }
 
-template< typename Pool >
+template< typename Duration > // relative-time overload of sleep()
+void sleep( Duration const& rel_time) // rel_time is added to the current system time to form the deadline
+{ sleep( get_system_time() + rel_time); } // forwards the computed deadline to another sleep() call
+
+inline
 void yield()
 {
- struct always_true
- {
- always_true() {}
-
- bool operator()()
- { return true; }
- };
-
         if ( is_worker() )
         {
- always_true t;
- get_thread_pool< Pool >()->reschedule_until( t);
+ detail::always_true t;
+ reschedule_until( t);
         }
         else
                 this_thread::yield();
 }
+
+inline
+void interrupt() // interrupts the worker returned by worker::tss_get(), then calls an interruption point
+{
+ tp::detail::worker * w( tp::detail::worker::tss_get() );
+ BOOST_ASSERT( w); // no fallback for a null worker here, unlike the is_worker() checks in sleep()/yield()
+ w->interrupt();
+ this_thread::interruption_point(); // NOTE(review): presumably where the interruption requested above is delivered - confirm
+}
 } }
 
 #endif // BOOST_TP_TASK_H

Modified: sandbox/threadpool/libs/tp/examples/Jamfile.v2
==============================================================================
--- sandbox/threadpool/libs/tp/examples/Jamfile.v2 (original)
+++ sandbox/threadpool/libs/tp/examples/Jamfile.v2 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -15,12 +15,16 @@
         <threading>multi
     ;
 
-exe chained_submit : chained_submit.cpp ;
+exe bind_to_processor : bind_to_processor.cpp ;
+exe fork_join : for_join.cpp ;
 exe interrupt : interrupt.cpp ;
-exe lazy_submit : lazy_submit.cpp ;
+exe launch : launch.cpp ;
+exe parrallel_sort : parralele_sort.cpp ;
 exe pending : pending.cpp ;
 exe priority : priority.cpp ;
-exe fork_join : fork_join.cpp ;
+exe reschedule_until : reschedule_until.cpp ;
 exe shutdonw_now : shutdonw_now.cpp ;
+exe sleep : sleep.cpp ;
 exe smart : smart.cpp ;
-exe submit : submit.cpp ;
\ No newline at end of file
+exe submit : submit.cpp ;
+exe yield : yield.cpp ;

Added: sandbox/threadpool/libs/tp/examples/bind_to_processors.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/examples/bind_to_processors.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -0,0 +1,102 @@
+#include <iostream>
+#include <cstdlib>
+#include <stdexcept>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "boost/tp.hpp"
+
+namespace pt = boost::posix_time;
+namespace tp = boost::tp;
+
+typedef tp::pool< tp::unbounded_channel< tp::fifo > > pool_type;
+
+long serial_fib( long n) // plain doubly-recursive Fibonacci, no tasks involved
+{
+ if( n < 2)
+ return n; // base case: any n below 2 is returned unchanged
+ else
+ return serial_fib( n - 1) + serial_fib( n - 2);
+}
+
+class fib_task // Fibonacci that forks two pool tasks per level once n reaches the cutoff
+{
+private:
+ long cutof_; // values of n below this are computed with serial_fib()
+
+public:
+ fib_task( long cutof)
+ : cutof_( cutof)
+ {}
+
+ long execute( long n) // returns the Fibonacci number for n
+ {
+ if ( n < cutof_) return serial_fib( n);
+ else
+ {
+ tp::task< long > t1( // subtask for n - 1
+ boost::this_task::get_thread_pool< pool_type >().submit(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this), // bound by reference (boost::ref), not by copy
+ n - 1) ) );
+ tp::task< long > t2( // subtask for n - 2
+ boost::this_task::get_thread_pool< pool_type >().submit(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 2) ) );
+ return t1.get() + t2.get(); // sum of the two subtask results
+ }
+ }
+};
+
+
+long parallel_fib( long n) // entry point submitted by main(): Fibonacci of n via fib_task
+{
+ fib_task a( 5); // cutoff 5: n below 5 is handled by serial_fib()
+ return a.execute( n);
+}
+
+int main( int argc, char *argv[]) // submits parallel_fib(0..25), prints each result and the elapsed time
+{
+ try
+ {
+ pool_type pool; // NOTE(review): never referenced below; submissions go to tp::get_default_pool() - confirm intent
+
+ std::vector< tp::task< long > > results;
+ results.reserve( 20); // NOTE(review): the loop below pushes 26 tasks, so the vector grows past this reservation
+
+ pt::ptime start( pt::microsec_clock::universal_time() );
+
+ for ( int i = 0; i < 26; ++i)
+ results.push_back(
+ tp::get_default_pool().submit(
+ boost::bind(
+ & parallel_fib,
+ i) ) );
+
+ tp::waitfor_all( results.begin(), results.end() );
+
+ int k = 0; // index printed next to each result; advances in step with the iterator
+ std::vector< tp::task< long > >::iterator e( results.end() );
+ for (
+ std::vector< tp::task< long > >::iterator i( results.begin() );
+ i != e;
+ ++i)
+ std::cout << "fibonacci " << k++ << " == " << i->get() << std::endl;
+
+ pt::ptime stop( pt::microsec_clock::universal_time() );
+ std::cout << ( stop - start).total_milliseconds() << " milli seconds" << std::endl;
+
+ return EXIT_SUCCESS;
+ }
+ catch ( std::exception const& e)
+ { std::cerr << "exception: " << e.what() << std::endl; }
+ catch ( ... )
+ { std::cerr << "unhandled" << std::endl; }
+
+ return EXIT_FAILURE; // reached only after one of the handlers above has run
+}

Modified: sandbox/threadpool/libs/tp/examples/fork_join.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/fork_join.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/fork_join.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -6,83 +6,82 @@
 #include <boost/bind.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 
-#include "boost/tp/fifo.hpp"
-#include "boost/tp/pool.hpp"
-#include "boost/tp/poolsize.hpp"
-#include "boost/tp/unbounded_channel.hpp"
+#include "boost/tp.hpp"
 
 namespace pt = boost::posix_time;
 namespace tp = boost::tp;
 
-typedef tp::pool< tp::unbounded_channel< tp::fifo > > pool_type;
+typedef tp::default_pool pool_type;
 
-class fibo
+long serial_fib( long n)
+{
+ if( n < 2)
+ return n;
+ else
+ return serial_fib( n - 1) + serial_fib( n - 2);
+}
+
+class fib_task
 {
 private:
- int offset_;
+ long cutof_;
 
- int seq_( int n)
- {
- if ( n <= 1) return n;
- else return seq_( n - 2) + seq_( n - 1);
- }
-
- int par_( int n)
+public:
+ fib_task( long cutof)
+ : cutof_( cutof)
+ {}
+
+ long execute( long n)
         {
- if ( n <= offset_) return seq_( n);
+ if ( n < cutof_) return serial_fib( n);
                 else
                 {
- tp::task< int > t1(
+ tp::task< long > t1(
                                 boost::this_task::get_thread_pool< pool_type >().submit(
                                         boost::bind(
- & fibo::par_,
+ & fib_task::execute,
                                                 boost::ref( * this),
                                                 n - 1) ) );
- tp::task< int > t2(
+ tp::task< long > t2(
                                 boost::this_task::get_thread_pool< pool_type >().submit(
                                         boost::bind(
- & fibo::par_,
+ & fib_task::execute,
                                                 boost::ref( * this),
                                                 n - 2) ) );
                         return t1.get() + t2.get();
                 }
         }
+};
 
-public:
- fibo( int offset)
- : offset_( offset)
- {}
 
- int execute( int n)
- {
- int result( par_( n) );
- return result;
- }
-};
+long parallel_fib( long n)
+{
+ fib_task a( 5);
+ return a.execute( n);
+}
 
 int main( int argc, char *argv[])
 {
         try
         {
- pool_type pool( tp::poolsize( 3) );
- fibo fib( 5);
- std::vector< tp::task< int > > results;
- results.reserve( 40);
+ std::vector< tp::task< long > > results;
+ results.reserve( 20);
 
                 pt::ptime start( pt::microsec_clock::universal_time() );
 
- for ( int i = 0; i < 32; ++i)
+ for ( int i = 0; i < 26; ++i)
                         results.push_back(
- pool.submit(
+ tp::get_default_pool().submit(
                                         boost::bind(
- & fibo::execute,
- boost::ref( fib),
+ & parallel_fib,
                                                 i) ) );
 
+ tp::waitfor_all( results.begin(), results.end() );
+
                 int k = 0;
- std::vector< tp::task< int > >::iterator e( results.end() );
+ std::vector< tp::task< long > >::iterator e( results.end() );
                 for (
- std::vector< tp::task< int > >::iterator i( results.begin() );
+ std::vector< tp::task< long > >::iterator i( results.begin() );
                         i != e;
                         ++i)
                         std::cout << "fibonacci " << k++ << " == " << i->get() << std::endl;

Modified: sandbox/threadpool/libs/tp/examples/interrupt.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/interrupt.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/interrupt.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -54,7 +54,7 @@
                                         fibonacci_fn,
                                         10) ) );
                 t.interrupt();
- std::cout << t.get() << std::endl;
+ std::cout << t.result().get() << std::endl;
 
                 return EXIT_SUCCESS;
         }

Modified: sandbox/threadpool/libs/tp/examples/pending.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/pending.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/pending.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -52,7 +52,7 @@
                                         fibonacci_fn,
                                         10) ) );
                 std::cout << "pending tasks == " << pool.pending() << std::endl;
- std::cout << t.get() << std::endl;
+ std::cout << t.result().get() << std::endl;
 
                 return EXIT_SUCCESS;
         }

Modified: sandbox/threadpool/libs/tp/examples/reschedule_until.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/reschedule_until.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/reschedule_until.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -70,7 +70,7 @@
                                 boost::this_task::get_thread_pool< pool_type >().submit(
                                         fn2) );
 
- return t1.get() + t2.get();
+ return t1.result().get() + t2.result().get();
                 }
         }
 
@@ -111,16 +111,13 @@
                                                 boost::ref( fib),
                                                 i) ) );
 
- ::sleep( 1);
- tsk();
-
                 int k = 0;
                 std::vector< tp::task< int > >::iterator e( results.end() );
                 for (
                         std::vector< tp::task< int > >::iterator i( results.begin() );
                         i != e;
                         ++i)
- std::cout << "fibonacci " << k++ << " == " << i->get() << std::endl;
+ std::cout << "fibonacci " << k++ << " == " << i->result().get() << std::endl;
 
                 pt::ptime stop( pt::microsec_clock::universal_time() );
                 std::cout << ( stop - start).total_milliseconds() << " milli seconds" << std::endl;

Modified: sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/shutdonw_now.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -47,7 +47,7 @@
                                         10) ) );
                 boost::this_thread::sleep( pt::milliseconds( 250) );
                 pool.shutdown_now();
- std::cout << t.get() << std::endl;
+ std::cout << t.result().get() << std::endl;
 
                 return EXIT_SUCCESS;
         }

Added: sandbox/threadpool/libs/tp/examples/sleep.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/examples/sleep.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -0,0 +1,89 @@
+#include <iostream>
+#include <cstdlib>
+#include <stdexcept>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "boost/tp.hpp"
+
+namespace pt = boost::posix_time;
+namespace tp = boost::tp;
+
+typedef tp::default_pool pool_type;
+
+long serial_fib( long n) // plain doubly-recursive Fibonacci, no tasks involved
+{
+ if( n < 2)
+ return n; // base case: any n below 2 is returned unchanged
+ else
+ return serial_fib( n - 1) + serial_fib( n - 2);
+}
+
+class fib_task // Fibonacci that forks two pool tasks per level once n reaches the cutoff
+{
+private:
+ long cutof_; // values of n below this are computed with serial_fib()
+
+public:
+ fib_task( long cutof)
+ : cutof_( cutof)
+ {}
+
+ long execute( long n) // returns the Fibonacci number for n
+ {
+ if ( n < cutof_)
+ {
+ if ( n == 0)
+ boost::this_task::sleep( pt::seconds( 2) ); // only the n == 0 leaf sleeps, for two seconds
+ return serial_fib( n);
+ }
+ else
+ {
+ tp::task< long > t1( // subtask for n - 1
+ boost::this_task::get_thread_pool< pool_type >().submit(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this), // bound by reference (boost::ref), not by copy
+ n - 1) ) );
+ tp::task< long > t2( // subtask for n - 2
+ boost::this_task::get_thread_pool< pool_type >().submit(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 2) ) );
+ return t1.get() + t2.get(); // sum of the two subtask results
+ }
+ }
+};
+
+
+void parallel_fib( long n) // computes Fibonacci of n via fib_task (cutoff 5) and prints "n == <n>, fibonnaci == <result>"
+{ // NOTE(review): printf needs <cstdio>, which this file does not include directly
+ fib_task a( 5);
+ long result = a.execute( n);
+ printf("n == %ld, fibonnaci == %ld\n", n, result); // %ld, not %d: both arguments are long
+}
+
+int main( int argc, char *argv[]) // submits parallel_fib(0..9) to a pool built with poolsize 1, then shuts it down
+{
+ try
+ {
+ pool_type pool( tp::poolsize( 1) );
+ for ( int i = 0; i < 10; ++i)
+ pool.submit( // the value returned by submit() is discarded; parallel_fib prints its own result
+ boost::bind(
+ & parallel_fib,
+ i) );
+ pool.shutdown(); // NOTE(review): presumably lets the queued tasks finish before returning - confirm
+
+ return EXIT_SUCCESS;
+ }
+ catch ( std::exception const& e)
+ { std::cerr << "exception: " << e.what() << std::endl; }
+ catch ( ... )
+ { std::cerr << "unhandled" << std::endl; }
+
+ return EXIT_FAILURE; // reached only after one of the handlers above has run
+}

Deleted: sandbox/threadpool/libs/tp/examples/spread_over_hardware.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/spread_over_hardware.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
+++ (empty file)
@@ -1,102 +0,0 @@
-#include <iostream>
-#include <cstdlib>
-#include <stdexcept>
-#include <vector>
-
-#include <boost/bind.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-
-#include "boost/tp/fifo.hpp"
-#include "boost/tp/pool.hpp"
-#include "boost/tp/poolsize.hpp"
-#include "boost/tp/unbounded_channel.hpp"
-
-namespace pt = boost::posix_time;
-namespace tp = boost::tp;
-
-typedef tp::pool< tp::unbounded_channel< tp::fifo > > pool_type;
-
-class fibo
-{
-private:
- int offset_;
-
- int seq_( int n)
- {
- if ( n <= 1) return n;
- else return seq_( n - 2) + seq_( n - 1);
- }
-
- int par_( int n)
- {
- if ( n <= offset_) return seq_( n);
- else
- {
- tp::task< int > t1(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fibo::par_,
- boost::ref( * this),
- n - 1) ) );
- tp::task< int > t2(
- boost::this_task::get_thread_pool< pool_type >().submit(
- boost::bind(
- & fibo::par_,
- boost::ref( * this),
- n - 2) ) );
- return t1.get() + t2.get();
- }
- }
-
-public:
- fibo( int offset)
- : offset_( offset)
- {}
-
- int execute( int n)
- {
- int result( par_( n) );
- return result;
- }
-};
-
-int main( int argc, char *argv[])
-{
- try
- {
- // ! BOOST_BIND_WORKER_TO_PROCESSORS must be defined !
- pool_type pool;
- fibo fib( 5);
- std::vector< tp::task< int > > results;
- results.reserve( 40);
-
- pt::ptime start( pt::microsec_clock::universal_time() );
-
- for ( int i = 0; i < 32; ++i)
- results.push_back(
- pool.submit(
- boost::bind(
- & fibo::execute,
- boost::ref( fib),
- i) ) );
-
- int k = 0;
- std::vector< tp::task< int > >::iterator e( results.end() );
- for (
- std::vector< tp::task< int > >::iterator i( results.begin() );
- i != e;
- ++i)
- std::cout << "fibonacci " << k++ << " == " << i->get() << std::endl;
-
- pt::ptime stop( pt::microsec_clock::universal_time() );
- std::cout << ( stop - start).total_milliseconds() << " milli seconds" << std::endl;
-
- return EXIT_SUCCESS;
- }
- catch ( std::exception const& e)
- { std::cerr << "exception: " << e.what() << std::endl; }
- catch ( ... )
- { std::cerr << "unhandled" << std::endl; }
-
- return EXIT_FAILURE;
-}

Modified: sandbox/threadpool/libs/tp/examples/submit.cpp
==============================================================================
--- sandbox/threadpool/libs/tp/examples/submit.cpp (original)
+++ sandbox/threadpool/libs/tp/examples/submit.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -40,7 +40,7 @@
                                 boost::bind(
                                         fibonacci_fn,
                                         10) ) );
- std::cout << t.get() << std::endl;
+ std::cout << t.result().get() << std::endl;
 
                 return EXIT_SUCCESS;
         }

Added: sandbox/threadpool/libs/tp/examples/yield.cpp
==============================================================================
--- (empty file)
+++ sandbox/threadpool/libs/tp/examples/yield.cpp 2009-04-11 03:12:09 EDT (Sat, 11 Apr 2009)
@@ -0,0 +1,88 @@
+#include <iostream>
+#include <cstdlib>
+#include <stdexcept>
+#include <vector>
+
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "boost/tp.hpp"
+
+namespace pt = boost::posix_time;
+namespace tp = boost::tp;
+
+typedef tp::default_pool pool_type;
+
+long serial_fib( long n) // plain doubly-recursive Fibonacci, no tasks involved
+{
+ if( n < 2)
+ return n; // base case: any n below 2 is returned unchanged
+ else
+ return serial_fib( n - 1) + serial_fib( n - 2);
+}
+
+class fib_task // Fibonacci that forks two pool tasks per level once n reaches the cutoff
+{
+private:
+ long cutof_; // values of n below this are computed with serial_fib()
+
+public:
+ fib_task( long cutof)
+ : cutof_( cutof)
+ {}
+
+ long execute( long n) // returns the Fibonacci number for n
+ {
+ if ( n == 7)
+ boost::this_task::yield(); // only calls with n == 7 yield, before doing any work
+
+ if ( n < cutof_)
+ return serial_fib( n);
+ else
+ {
+ tp::task< long > t1( // subtask for n - 1
+ boost::this_task::get_thread_pool< pool_type >().submit(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this), // bound by reference (boost::ref), not by copy
+ n - 1) ) );
+ tp::task< long > t2( // subtask for n - 2
+ boost::this_task::get_thread_pool< pool_type >().submit(
+ boost::bind(
+ & fib_task::execute,
+ boost::ref( * this),
+ n - 2) ) );
+ return t1.get() + t2.get(); // sum of the two subtask results
+ }
+ }
+};
+
+
+void parallel_fib( long n) // computes Fibonacci of n via fib_task (cutoff 5) and prints "n == <n>, fibonnaci == <result>"
+{ // NOTE(review): printf needs <cstdio>, which this file does not include directly
+ fib_task a( 5);
+ long result = a.execute( n);
+ printf("n == %ld, fibonnaci == %ld\n", n, result); // %ld, not %d: both arguments are long
+}
+
+int main( int argc, char *argv[]) // submits parallel_fib(0..9) to a pool built with poolsize 1, then shuts it down
+{
+ try
+ {
+ pool_type pool( tp::poolsize( 1) );
+ for ( int i = 0; i < 10; ++i)
+ pool.submit( // the value returned by submit() is discarded; parallel_fib prints its own result
+ boost::bind(
+ & parallel_fib,
+ i) );
+ pool.shutdown(); // NOTE(review): presumably lets the queued tasks finish before returning - confirm
+
+ return EXIT_SUCCESS;
+ }
+ catch ( std::exception const& e)
+ { std::cerr << "exception: " << e.what() << std::endl; }
+ catch ( ... )
+ { std::cerr << "unhandled" << std::endl; }
+
+ return EXIT_FAILURE; // reached only after one of the handlers above has run
+}


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk