Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r51111 - in sandbox/interthreads/libs/interthreads/example: . detail
From: vicente.botet_at_[hidden]
Date: 2009-02-08 16:10:44


Author: viboes
Date: 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
New Revision: 51111
URL: http://svn.boost.org/trac/boost/changeset/51111

Log:
interthreads version 0.4
    * New free functions for all the AsynchronousCompletionToken operations, providing a higher degree of freedom.
    * Missing have_all_values(), have_all_exception() and are_all_ready() functions on AsynchronousCompletionToken fusion tuples.
    * get_all: getting all the values from a tuple of AsynchronousCompletionToken works now.
    * fork_after overloaded for a single dependency
    * wait_all overloaded for a single ACT.
    * wait_for_all evaluate one of its elements on the current thread
    * No need to use wait_and_get() on thread_specific_shared_ptr<> to synchronize with the decoration if the thread is created using a AsynchronousExecutor decorator. In this case the synchro is done before returning the AsynchronousCompletionToken. See the tutorial and the mono_thread_id example.

Text files modified:
   sandbox/interthreads/libs/interthreads/example/async_ostream.cpp | 80 +++++++++++++++++++++++++++++----------
   sandbox/interthreads/libs/interthreads/example/async_ostream.hpp | 4 +
   sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp | 66 +++++++++++++++++---------------
   sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp | 3 +
   sandbox/interthreads/libs/interthreads/example/hello_world.cpp | 2
   sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp | 18 +++++----
   sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp | 21 +++++-----
   sandbox/interthreads/libs/interthreads/example/timestamped.hpp | 6 +-
   8 files changed, 124 insertions(+), 76 deletions(-)

Modified: sandbox/interthreads/libs/interthreads/example/async_ostream.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/async_ostream.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/async_ostream.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,6 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
+//////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. Distributed under the Boost
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. Distributed under the Boost
 // Software License, Version 1.0. (See accompanying file
 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -21,7 +22,7 @@
 #include <iostream> // std::ostream
 #include <sstream> // std::stringstream
 
-boost::mutex out_global_mutex;
+//boost::mutex out_global_mutex;
 
 namespace boost {
 namespace interthreads {
@@ -37,6 +38,10 @@
         : current_(new element_type())
         , seq_(0)
         {}
+
+ ~async_ostream_thread_ctx() {
+ delete current_;
+ }
 
         boost::mutex& get_mutex() {return mutex_;}
         std::stringstream& buffer() {return current_->value_;}
@@ -70,7 +75,6 @@
         }
         void print_stats() {
             boost::thread::id id= boost::this_thread::get_id();
- boost::lock_guard<boost::mutex> lock(out_global_mutex);
             std::cout << "TID=" << id <<" nb_push " << nb_push << std::endl;
             std::cout << "TID=" << id <<" nb_push_gt_1 " << nb_push << std::endl;
             std::cout << "TID=" << id <<" nb_get " << nb_push << std::endl;
@@ -78,6 +82,7 @@
             std::cout << "TID=" << id <<" inc " << inc_ << std::endl;
         };
         bool empty() {
+ boost::lock_guard<boost::mutex> lock(mutex_);
             return queue_.empty();
         }
         void inc() {
@@ -97,7 +102,10 @@
             return e;
         }
         element_type *current_;
+#if 1
         queue_type queue_;
+#else
+#endif
         unsigned seq_;
         boost::mutex mutex_;
         unsigned nb_push_gt_1;
@@ -111,29 +119,34 @@
     //==========================================================================================
 #ifdef CONCENTRATOR
     struct async_ostream_concentrator {
- static void loop(async_ostream_sink::impl* that);
- async_ostream_concentrator(async_ostream_sink::impl* impl_ptr)
+ static void loop(async_ostream_sink::impl* that);
+ async_ostream_concentrator(async_ostream_sink::impl* impl_ptr)
             : thread_(boost::bind(loop, impl_ptr)) {}
- ~async_ostream_concentrator() {}
+ ~async_ostream_concentrator() {}
 
- private:
- boost::thread thread_;
- };
+ private:
+ boost::thread thread_;
+ };
 #endif
-
+
     //==========================================================================================
     
     typedef thread_specific_shared_ptr<async_ostream_thread_ctx> tsss_type;
     
     struct async_ostream_sink::impl {
         impl(std::ostream& os)
- : os_(os)
+ : end_(false)
+ ,os_(os)
         , tsss_(terminate)
 #ifndef CONCENTRATOR
         , thread_(boost::bind(loop, this))
 #endif
         {}
-
+
+ ~impl() {
+ }
+
+ bool end_;
         std::ostream& os_;
         tsss_type tsss_;
         priority_queue_type queue_;
@@ -141,14 +154,13 @@
         boost::once_flag concentrator_flag_;
         async_ostream_concentrator* concentrator_;
 #else
- boost::thread thread_;
+ boost::thread thread_;
 #endif
             
         static void terminate(shared_ptr<async_ostream_thread_ctx> that) {
             while (!that->empty()) {
                 that->inc();
             }
- //that->print_stats();
         }
         
 #ifdef CONCENTRATOR
@@ -160,7 +172,7 @@
                              boost::bind(create_concentrator_once, this));
         }
 #else
- static void loop(impl* that);
+ static void loop(impl* that);
 #endif
         
         std::streamsize write(const char* s, std::streamsize n) {
@@ -171,6 +183,14 @@
             tsss_->flush();
         }
         
+ void close() {
+ end_=true;
+#ifdef CONCENTRATOR
+ concentrator_.close();
+#else
+ thread_.join();
+#endif
+ }
             
     };
 
@@ -179,6 +199,10 @@
     async_ostream_sink::async_ostream_sink(std::ostream& os)
         : impl_(new async_ostream_sink::impl(os)) {}
 
+ async_ostream_sink::~async_ostream_sink()
+ {
+ }
+
     std::streamsize async_ostream_sink::write(const char* s, std::streamsize n) {
         return impl_->write(s,n);
     }
@@ -209,14 +233,16 @@
                 //it->second->print_stats();
             }
             if (that->queue_.empty()) {
+ if (that->end_) break;
                 boost::this_thread::sleep(boost::posix_time::milliseconds(10));
             } else {
                 element_type* e = that->queue_.top();
                 that->queue_.pop();
+
     #ifdef XTIME
- os_ << e->seq_ << "["<<e->date_.sec<<":"<<e->date_.nsec<<"]| " << e->value_.str();
+ os_ << e->seq_ << "["<<e->date_.sec<<":"<<e->date_.nsec<<"]| " << e->value_.str().length() << " | " << e->value_.str();
     #else
- os_ << e->seq_ << "| " << e->value_.str();
+ os_ << e->seq_ << " | " << e->value_.str().length() << " | " << e->value_.str();
     #endif
                 delete e;
             }
@@ -233,14 +259,21 @@
         : base_type(os)
         {}
 
+ async_ostream::~async_ostream()
+ {
+ cout_->impl_->close();
+
+ }
+
     void async_ostream::flush() {
- this->base_type::flush();
+ //this->base_type::flush();
         async_ostream& d = *this;
         d->flush();
     }
             
     //==========================================================================================
     
+ // WARNING: static_variable
     async_ostream cout_(std::cout);
     
     void async_ostream::thread_specific_setup() {
@@ -249,9 +282,14 @@
         cout_->impl_->create_concentrator();
 #endif
     }
-
- namespace detail {
- thread_decoration async_ostream_decoration(boost::interthreads::async_ostream::thread_specific_setup);
+
+#if 0
+ void close() {
+ //cout_->impl_->close();
+ }
+#endif
+ namespace detail {
+ thread_decoration async_ostream_decoration(boost::interthreads::async_ostream::thread_specific_setup);
     }
     
 }

Modified: sandbox/interthreads/libs/interthreads/example/async_ostream.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/async_ostream.hpp (original)
+++ sandbox/interthreads/libs/interthreads/example/async_ostream.hpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
 
 //////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
 // Distributed under the Boost Software License, Version 1.0.
 // (See accompanying file LICENSE_1_0.txt or
 // copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -30,9 +30,11 @@
         typedef iostreams::sink_tag category;
 
         async_ostream(std::ostream& os);
+ ~async_ostream();
         void flush();
 
         static void thread_specific_setup();
+ static void close();
     };
     
     extern async_ostream cout_;

Modified: sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
 // Distributed under the Boost Software License, Version 1.0.
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -12,26 +12,25 @@
 
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/thread.hpp>
-
+#define COUT_
+#ifdef COUT_
+#else
 boost::mutex out_global_mutex2;
-
+#endif
 void sleep(int sec)
 {
- boost::xtime t;
- boost::xtime_get(&t,1);
+ boost::xtime t;
+ boost::xtime_get(&t,1);
     t.sec += sec;
     boost::thread::sleep(t);
 }
 
 
-// #include <boost/interthreads/threader.hpp>
-// #include <boost/interthreads/joiner_tuple.hpp>
 #include <boost/interthreads/thread_decorator.hpp>
 #include <boost/interthreads/thread_keep_alive.hpp>
 
 #include <boost/interthreads/typeof/threader_decorator.hpp>
-#include <boost/interthreads/wait_for_all.hpp>
-//#include <boost/type_traits.hpp>
+#include <boost/interthreads/algorithm.hpp>
 
 #include "./async_ostream.hpp"
 #include <boost/thread/thread.hpp>
@@ -40,19 +39,39 @@
 #include <sstream>
         
 namespace bith = boost::interthreads;
+void my_thread1_on_dead_thread(boost::thread::id id, boost::thread*) {
+ std::cout << "my_thread1 " << id << " do not responds to keep-alive" << std::endl;
+}
 
-int my_thread1() {
- sleep(2);
+
+int my_thread11() {
+ bith::this_thread::enable_keep_alive enabler;
+ bith::this_thread::set_on_dead_thread(my_thread1_on_dead_thread);
+ boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
+ bith::this_thread::keep_alive_point();
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+ bith::this_thread::keep_alive_point();
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+ bith::this_thread::keep_alive_point();
+
+#ifdef COUT_
+ bith::cout_ << "my_thread1" << std::endl;
+ bith::cout_.flush();
+#else
+ {
+ boost::lock_guard<boost::mutex> lock(out_global_mutex2);
+ std::cout << "my_thread1" << std::endl;
+ }
+#endif
     return 0;
 }
 int my_thread() {
     bith::this_thread::enable_keep_alive enabler;
- for (int i=0; i<10; i++) {
+ for (unsigned i=0; i<10; i++) {
         bith::this_thread::keep_alive_point();
 
         boost::thread::id id= boost::this_thread::get_id();
         std::stringstream sss;
-//#define COUT_
 #ifdef COUT_
         bith::cout_ << "TID=" << i << " " << id << std::endl;
         bith::cout_.flush();
@@ -62,29 +81,14 @@
             std::cout << "TID=" << i << " " << id << std::endl;
         }
 #endif
- boost::this_thread::sleep(boost::posix_time::milliseconds(10));
+ boost::this_thread::sleep(boost::posix_time::milliseconds(500));
     }
     return 0;
 }
     
 int main() {
- {
+
     bith::shared_threader_decorator ae;
- bith::wait_for_all(ae, my_thread, my_thread);
- }
- #if 0
- boost::thread th1(bith::make_decorator(my_thread));
- boost::thread th2(bith::make_decorator(my_thread));
- boost::thread th3(bith::make_decorator(my_thread));
- boost::thread th4(bith::make_decorator(my_thread));
- boost::thread th5(bith::make_decorator(my_thread));
- th1.join();
- th2.join();
- th3.join();
- th4.join();
- th5.join();
- #endif
-
-
+ bith::wait_for_all(ae, my_thread, my_thread, my_thread11);
     return 0;
 }

Modified: sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp (original)
+++ sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
 
 //////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
 // Distributed under the Boost Software License, Version 1.0.
 // (See accompanying file LICENSE_1_0.txt or
 // copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -28,6 +28,7 @@
             typedef boost::iostreams::sink_tag category;
             
             async_ostream_sink(std::ostream& os);
+ ~async_ostream_sink();
             
             std::streamsize write(const char* s, std::streamsize n);
             void flush();

Modified: sandbox/interthreads/libs/interthreads/example/hello_world.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/hello_world.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/hello_world.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
 // Distributed under the Boost Software License, Version 1.0.
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //

Modified: sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
 // Distributed under the Boost Software License, Version 1.0.
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -16,8 +16,8 @@
 
 void sleep(int sec)
 {
- boost::xtime t;
- boost::xtime_get(&t,1);
+ boost::xtime t;
+ boost::xtime_get(&t,1);
     t.sec += sec;
     boost::thread::sleep(t);
 }
@@ -76,7 +76,7 @@
               << std::endl;
     }
 };
-
+#if 0
 struct mono_thread_id_wait_and_get {
     template<typename T>
     void operator()(T& t) const {
@@ -87,7 +87,7 @@
         }
     }
 };
-
+#endif
 bith::thread_decoration mono_thread_id::decoration_(mono_thread_id::setup);
 mono_thread_id::tssp_type mono_thread_id::current_;
 unsigned mono_thread_id::counter_=0;
@@ -107,8 +107,9 @@
 
 void doit() {
     bith::shared_threader_decorator ae;
- bith::result_of::fork_all<bith::shared_threader_decorator, boost::fusion::tuple<void(*)(), void(*)()> >::type handles =
- bith::fork_all(ae, my_thread, my_thread);
+ BOOST_AUTO(handles, bith::fork_all(ae, my_thread, my_thread));
+#if 0
+ //bith::result_of::fork_all<bith::shared_threader_decorator, boost::fusion::tuple<void(*)(), void(*)()> >::type handles =
     {
         const boost::shared_ptr<unsigned> shp1 = mono_thread_id::current_.wait_and_get(boost::fusion::at_c<0>(handles).get_id());
         if (shp1.get()==0) {
@@ -124,7 +125,8 @@
         }
     }
     //sleep(1);
- boost::fusion::for_each(handles, mono_thread_id_wait_and_get());
+ //boost::fusion::for_each(handles, mono_thread_id_wait_and_get());
+#endif
     boost::fusion::for_each(handles, mono_thread_id_out());
     bith::join_all(handles);
 }

Modified: sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
 //////////////////////////////////////////////////////////////////////////////
 //
 // (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
 // Distributed under the Boost Software License, Version 1.0.
 // (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -16,8 +16,8 @@
 
 void sleep(int sec)
 {
- boost::xtime t;
- boost::xtime_get(&t,1);
+ boost::xtime t;
+ boost::xtime_get(&t,1);
     t.sec += sec;
     boost::thread::sleep(t);
 }
@@ -35,39 +35,42 @@
 namespace bith = boost::interthreads;
 
 int my_thread1() {
+ sleep(3);
     {
         boost::lock_guard<boost::mutex> lock(out_global_mutex);
         std::cout << "1 thread_id=" << boost::this_thread::get_id() << std::endl;
     }
- sleep(3);
     return 0;
 }
     
 int my_thread2() {
+ sleep(1);
     {
         boost::lock_guard<boost::mutex> lock(out_global_mutex);
         std::cout << "2 thread_id=" << boost::this_thread::get_id() << std::endl;
     }
- sleep(1);
     return 0;
 }
 
 int my_thread3() {
+ sleep(2);
     {
         boost::lock_guard<boost::mutex> lock(out_global_mutex);
         std::cout << "3 thread_id=" << boost::this_thread::get_id() << std::endl;
     }
- sleep(2);
     return 0;
 }
 
 
 int main() {
     bith::shared_threader ae;
- BOOST_AUTO(handles,bith::fork_all(ae, my_thread1, my_thread2, my_thread3));
     BOOST_AUTO(result,bith::wait_for_any(ae, my_thread1, my_thread2, my_thread3));
- std::cout << "Algotithm " << result.first+1 << " finished the first with wait_for_any" << std::endl;
+ std::cout << "Algotithm " << result.first+1 << " finished the first with wait_for_any result=" << result.second << std::endl;
 
+
+#if 0
+ //BOOST_AUTO(handles,bith::fork_all(ae, my_thread1, my_thread2, my_thread3));
+
     bith::thread_tuple<3> tt_0(my_thread1, my_thread2, my_thread3);
     bith::thread_tuple<3> tt_1;
     tt_1= tt_0.move();
@@ -80,8 +83,6 @@
     bith::thread_tuple<3> kk_0= bith::make_thread_tuple(my_thread1, my_thread2, my_thread3);
     kk_0.join_all();
     std::cout << "All finished join_all" << std::endl;
-
-#if 0
         
     bith::thread_group_once tgo;
     boost::thread* th1 = tgo.create_thread(my_thread1);

Modified: sandbox/interthreads/libs/interthreads/example/timestamped.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/timestamped.hpp (original)
+++ sandbox/interthreads/libs/interthreads/example/timestamped.hpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
 
 //////////////////////////////////////////////////////////////////////////////
 //
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. Distributed under the Boost
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. Distributed under the Boost
 // Software License, Version 1.0. (See accompanying file
 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 //
@@ -23,7 +23,7 @@
 #ifdef XTIME
         boost::xtime date_;
         void reset_date(unsigned seq) {
- boost::xtime_get(&date_,1);
+ boost::xtime_get(&date_,1);
             seq_ = seq;
         }
         struct ref_comparator {
@@ -56,7 +56,7 @@
 #else
         system_time date_;
         void reset_date(unsigned seq) {
- date_ = system_time();
+ date_ = system_time();
             seq_ = seq;
         }
         struct ref_comparator {


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk