|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r51111 - in sandbox/interthreads/libs/interthreads/example: . detail
From: vicente.botet_at_[hidden]
Date: 2009-02-08 16:10:44
Author: viboes
Date: 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
New Revision: 51111
URL: http://svn.boost.org/trac/boost/changeset/51111
Log:
interthreads version 0.4
* New free functions for all the AsynchronousCompletionToken operations, providing a higher degree of freedom.
* Missing have_all_values(), have_all_exception() and are_all_ready() functions on AsynchronousCompletionToken fusion tuples.
* get_all: getting all the values from a tuple of AsynchronousCompletionToken works now.
* fork_after overloaded for a single dependency
* wait_all overloaded for a single ACT.
* wait_for_all evaluate one of its elements on the current thread
* No need to use wait_and_get() on thread_specific_shared_ptr<> to synchronize with the decoration if the thread is created using a AsynchronousExecutor decorator. In this case the synchro is done before returning the AsynchronousCompletionToken. See the tutorial and the mono_thread_id example.
Text files modified:
sandbox/interthreads/libs/interthreads/example/async_ostream.cpp | 80 +++++++++++++++++++++++++++++----------
sandbox/interthreads/libs/interthreads/example/async_ostream.hpp | 4 +
sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp | 66 +++++++++++++++++---------------
sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp | 3 +
sandbox/interthreads/libs/interthreads/example/hello_world.cpp | 2
sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp | 18 +++++----
sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp | 21 +++++-----
sandbox/interthreads/libs/interthreads/example/timestamped.hpp | 6 +-
8 files changed, 124 insertions(+), 76 deletions(-)
Modified: sandbox/interthreads/libs/interthreads/example/async_ostream.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/async_ostream.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/async_ostream.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,6 +1,7 @@
//////////////////////////////////////////////////////////////////////////////
+//////////////////////////////////////////////////////////////////////////////
//
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. Distributed under the Boost
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
@@ -21,7 +22,7 @@
#include <iostream> // std::ostream
#include <sstream> // std::stringstream
-boost::mutex out_global_mutex;
+//boost::mutex out_global_mutex;
namespace boost {
namespace interthreads {
@@ -37,6 +38,10 @@
: current_(new element_type())
, seq_(0)
{}
+
+ ~async_ostream_thread_ctx() {
+ delete current_;
+ }
boost::mutex& get_mutex() {return mutex_;}
std::stringstream& buffer() {return current_->value_;}
@@ -70,7 +75,6 @@
}
void print_stats() {
boost::thread::id id= boost::this_thread::get_id();
- boost::lock_guard<boost::mutex> lock(out_global_mutex);
std::cout << "TID=" << id <<" nb_push " << nb_push << std::endl;
std::cout << "TID=" << id <<" nb_push_gt_1 " << nb_push << std::endl;
std::cout << "TID=" << id <<" nb_get " << nb_push << std::endl;
@@ -78,6 +82,7 @@
std::cout << "TID=" << id <<" inc " << inc_ << std::endl;
};
bool empty() {
+ boost::lock_guard<boost::mutex> lock(mutex_);
return queue_.empty();
}
void inc() {
@@ -97,7 +102,10 @@
return e;
}
element_type *current_;
+#if 1
queue_type queue_;
+#else
+#endif
unsigned seq_;
boost::mutex mutex_;
unsigned nb_push_gt_1;
@@ -111,29 +119,34 @@
//==========================================================================================
#ifdef CONCENTRATOR
struct async_ostream_concentrator {
- static void loop(async_ostream_sink::impl* that);
- async_ostream_concentrator(async_ostream_sink::impl* impl_ptr)
+ static void loop(async_ostream_sink::impl* that);
+ async_ostream_concentrator(async_ostream_sink::impl* impl_ptr)
: thread_(boost::bind(loop, impl_ptr)) {}
- ~async_ostream_concentrator() {}
+ ~async_ostream_concentrator() {}
- private:
- boost::thread thread_;
- };
+ private:
+ boost::thread thread_;
+ };
#endif
-
+
//==========================================================================================
typedef thread_specific_shared_ptr<async_ostream_thread_ctx> tsss_type;
struct async_ostream_sink::impl {
impl(std::ostream& os)
- : os_(os)
+ : end_(false)
+ ,os_(os)
, tsss_(terminate)
#ifndef CONCENTRATOR
, thread_(boost::bind(loop, this))
#endif
{}
-
+
+ ~impl() {
+ }
+
+ bool end_;
std::ostream& os_;
tsss_type tsss_;
priority_queue_type queue_;
@@ -141,14 +154,13 @@
boost::once_flag concentrator_flag_;
async_ostream_concentrator* concentrator_;
#else
- boost::thread thread_;
+ boost::thread thread_;
#endif
static void terminate(shared_ptr<async_ostream_thread_ctx> that) {
while (!that->empty()) {
that->inc();
}
- //that->print_stats();
}
#ifdef CONCENTRATOR
@@ -160,7 +172,7 @@
boost::bind(create_concentrator_once, this));
}
#else
- static void loop(impl* that);
+ static void loop(impl* that);
#endif
std::streamsize write(const char* s, std::streamsize n) {
@@ -171,6 +183,14 @@
tsss_->flush();
}
+ void close() {
+ end_=true;
+#ifdef CONCENTRATOR
+ concentrator_.close();
+#else
+ thread_.join();
+#endif
+ }
};
@@ -179,6 +199,10 @@
async_ostream_sink::async_ostream_sink(std::ostream& os)
: impl_(new async_ostream_sink::impl(os)) {}
+ async_ostream_sink::~async_ostream_sink()
+ {
+ }
+
std::streamsize async_ostream_sink::write(const char* s, std::streamsize n) {
return impl_->write(s,n);
}
@@ -209,14 +233,16 @@
//it->second->print_stats();
}
if (that->queue_.empty()) {
+ if (that->end_) break;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
} else {
element_type* e = that->queue_.top();
that->queue_.pop();
+
#ifdef XTIME
- os_ << e->seq_ << "["<<e->date_.sec<<":"<<e->date_.nsec<<"]| " << e->value_.str();
+ os_ << e->seq_ << "["<<e->date_.sec<<":"<<e->date_.nsec<<"]| " << e->value_.str().length() << " | " << e->value_.str();
#else
- os_ << e->seq_ << "| " << e->value_.str();
+ os_ << e->seq_ << " | " << e->value_.str().length() << " | " << e->value_.str();
#endif
delete e;
}
@@ -233,14 +259,21 @@
: base_type(os)
{}
+ async_ostream::~async_ostream()
+ {
+ cout_->impl_->close();
+
+ }
+
void async_ostream::flush() {
- this->base_type::flush();
+ //this->base_type::flush();
async_ostream& d = *this;
d->flush();
}
//==========================================================================================
+ // WARNING: static_variable
async_ostream cout_(std::cout);
void async_ostream::thread_specific_setup() {
@@ -249,9 +282,14 @@
cout_->impl_->create_concentrator();
#endif
}
-
- namespace detail {
- thread_decoration async_ostream_decoration(boost::interthreads::async_ostream::thread_specific_setup);
+
+#if 0
+ void close() {
+ //cout_->impl_->close();
+ }
+#endif
+ namespace detail {
+ thread_decoration async_ostream_decoration(boost::interthreads::async_ostream::thread_specific_setup);
}
}
Modified: sandbox/interthreads/libs/interthreads/example/async_ostream.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/async_ostream.hpp (original)
+++ sandbox/interthreads/libs/interthreads/example/async_ostream.hpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
//////////////////////////////////////////////////////////////////////////////
//
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or
// copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -30,9 +30,11 @@
typedef iostreams::sink_tag category;
async_ostream(std::ostream& os);
+ ~async_ostream();
void flush();
static void thread_specific_setup();
+ static void close();
};
extern async_ostream cout_;
Modified: sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/basic_keep_alive.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
@@ -12,26 +12,25 @@
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
-
+#define COUT_
+#ifdef COUT_
+#else
boost::mutex out_global_mutex2;
-
+#endif
void sleep(int sec)
{
- boost::xtime t;
- boost::xtime_get(&t,1);
+ boost::xtime t;
+ boost::xtime_get(&t,1);
t.sec += sec;
boost::thread::sleep(t);
}
-// #include <boost/interthreads/threader.hpp>
-// #include <boost/interthreads/joiner_tuple.hpp>
#include <boost/interthreads/thread_decorator.hpp>
#include <boost/interthreads/thread_keep_alive.hpp>
#include <boost/interthreads/typeof/threader_decorator.hpp>
-#include <boost/interthreads/wait_for_all.hpp>
-//#include <boost/type_traits.hpp>
+#include <boost/interthreads/algorithm.hpp>
#include "./async_ostream.hpp"
#include <boost/thread/thread.hpp>
@@ -40,19 +39,39 @@
#include <sstream>
namespace bith = boost::interthreads;
+void my_thread1_on_dead_thread(boost::thread::id id, boost::thread*) {
+ std::cout << "my_thread1 " << id << " do not responds to keep-alive" << std::endl;
+}
-int my_thread1() {
- sleep(2);
+
+int my_thread11() {
+ bith::this_thread::enable_keep_alive enabler;
+ bith::this_thread::set_on_dead_thread(my_thread1_on_dead_thread);
+ boost::this_thread::sleep(boost::posix_time::milliseconds(2000));
+ bith::this_thread::keep_alive_point();
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+ bith::this_thread::keep_alive_point();
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
+ bith::this_thread::keep_alive_point();
+
+#ifdef COUT_
+ bith::cout_ << "my_thread1" << std::endl;
+ bith::cout_.flush();
+#else
+ {
+ boost::lock_guard<boost::mutex> lock(out_global_mutex2);
+ std::cout << "my_thread1" << std::endl;
+ }
+#endif
return 0;
}
int my_thread() {
bith::this_thread::enable_keep_alive enabler;
- for (int i=0; i<10; i++) {
+ for (unsigned i=0; i<10; i++) {
bith::this_thread::keep_alive_point();
boost::thread::id id= boost::this_thread::get_id();
std::stringstream sss;
-//#define COUT_
#ifdef COUT_
bith::cout_ << "TID=" << i << " " << id << std::endl;
bith::cout_.flush();
@@ -62,29 +81,14 @@
std::cout << "TID=" << i << " " << id << std::endl;
}
#endif
- boost::this_thread::sleep(boost::posix_time::milliseconds(10));
+ boost::this_thread::sleep(boost::posix_time::milliseconds(500));
}
return 0;
}
int main() {
- {
+
bith::shared_threader_decorator ae;
- bith::wait_for_all(ae, my_thread, my_thread);
- }
- #if 0
- boost::thread th1(bith::make_decorator(my_thread));
- boost::thread th2(bith::make_decorator(my_thread));
- boost::thread th3(bith::make_decorator(my_thread));
- boost::thread th4(bith::make_decorator(my_thread));
- boost::thread th5(bith::make_decorator(my_thread));
- th1.join();
- th2.join();
- th3.join();
- th4.join();
- th5.join();
- #endif
-
-
+ bith::wait_for_all(ae, my_thread, my_thread, my_thread11);
return 0;
}
Modified: sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp (original)
+++ sandbox/interthreads/libs/interthreads/example/detail/async_ostream_sink.hpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
//////////////////////////////////////////////////////////////////////////////
//
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or
// copy at http://www.boost.org/LICENSE_1_0.txt)
@@ -28,6 +28,7 @@
typedef boost::iostreams::sink_tag category;
async_ostream_sink(std::ostream& os);
+ ~async_ostream_sink();
std::streamsize write(const char* s, std::streamsize n);
void flush();
Modified: sandbox/interthreads/libs/interthreads/example/hello_world.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/hello_world.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/hello_world.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
Modified: sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/mono_thread_id.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
@@ -16,8 +16,8 @@
void sleep(int sec)
{
- boost::xtime t;
- boost::xtime_get(&t,1);
+ boost::xtime t;
+ boost::xtime_get(&t,1);
t.sec += sec;
boost::thread::sleep(t);
}
@@ -76,7 +76,7 @@
<< std::endl;
}
};
-
+#if 0
struct mono_thread_id_wait_and_get {
template<typename T>
void operator()(T& t) const {
@@ -87,7 +87,7 @@
}
}
};
-
+#endif
bith::thread_decoration mono_thread_id::decoration_(mono_thread_id::setup);
mono_thread_id::tssp_type mono_thread_id::current_;
unsigned mono_thread_id::counter_=0;
@@ -107,8 +107,9 @@
void doit() {
bith::shared_threader_decorator ae;
- bith::result_of::fork_all<bith::shared_threader_decorator, boost::fusion::tuple<void(*)(), void(*)()> >::type handles =
- bith::fork_all(ae, my_thread, my_thread);
+ BOOST_AUTO(handles, bith::fork_all(ae, my_thread, my_thread));
+#if 0
+ //bith::result_of::fork_all<bith::shared_threader_decorator, boost::fusion::tuple<void(*)(), void(*)()> >::type handles =
{
const boost::shared_ptr<unsigned> shp1 = mono_thread_id::current_.wait_and_get(boost::fusion::at_c<0>(handles).get_id());
if (shp1.get()==0) {
@@ -124,7 +125,8 @@
}
}
//sleep(1);
- boost::fusion::for_each(handles, mono_thread_id_wait_and_get());
+ //boost::fusion::for_each(handles, mono_thread_id_wait_and_get());
+#endif
boost::fusion::for_each(handles, mono_thread_id_out());
bith::join_all(handles);
}
Modified: sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp (original)
+++ sandbox/interthreads/libs/interthreads/example/multiple_algorithms.cpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -1,7 +1,7 @@
//////////////////////////////////////////////////////////////////////////////
//
// (C) Copyright Roland Schwarz 2006.
-// (C) Copyright Vicente J. Botet Escriba 2008-20009.
+// (C) Copyright Vicente J. Botet Escriba 2008-2009.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
@@ -16,8 +16,8 @@
void sleep(int sec)
{
- boost::xtime t;
- boost::xtime_get(&t,1);
+ boost::xtime t;
+ boost::xtime_get(&t,1);
t.sec += sec;
boost::thread::sleep(t);
}
@@ -35,39 +35,42 @@
namespace bith = boost::interthreads;
int my_thread1() {
+ sleep(3);
{
boost::lock_guard<boost::mutex> lock(out_global_mutex);
std::cout << "1 thread_id=" << boost::this_thread::get_id() << std::endl;
}
- sleep(3);
return 0;
}
int my_thread2() {
+ sleep(1);
{
boost::lock_guard<boost::mutex> lock(out_global_mutex);
std::cout << "2 thread_id=" << boost::this_thread::get_id() << std::endl;
}
- sleep(1);
return 0;
}
int my_thread3() {
+ sleep(2);
{
boost::lock_guard<boost::mutex> lock(out_global_mutex);
std::cout << "3 thread_id=" << boost::this_thread::get_id() << std::endl;
}
- sleep(2);
return 0;
}
int main() {
bith::shared_threader ae;
- BOOST_AUTO(handles,bith::fork_all(ae, my_thread1, my_thread2, my_thread3));
BOOST_AUTO(result,bith::wait_for_any(ae, my_thread1, my_thread2, my_thread3));
- std::cout << "Algotithm " << result.first+1 << " finished the first with wait_for_any" << std::endl;
+ std::cout << "Algotithm " << result.first+1 << " finished the first with wait_for_any result=" << result.second << std::endl;
+
+#if 0
+ //BOOST_AUTO(handles,bith::fork_all(ae, my_thread1, my_thread2, my_thread3));
+
bith::thread_tuple<3> tt_0(my_thread1, my_thread2, my_thread3);
bith::thread_tuple<3> tt_1;
tt_1= tt_0.move();
@@ -80,8 +83,6 @@
bith::thread_tuple<3> kk_0= bith::make_thread_tuple(my_thread1, my_thread2, my_thread3);
kk_0.join_all();
std::cout << "All finished join_all" << std::endl;
-
-#if 0
bith::thread_group_once tgo;
boost::thread* th1 = tgo.create_thread(my_thread1);
Modified: sandbox/interthreads/libs/interthreads/example/timestamped.hpp
==============================================================================
--- sandbox/interthreads/libs/interthreads/example/timestamped.hpp (original)
+++ sandbox/interthreads/libs/interthreads/example/timestamped.hpp 2009-02-08 16:10:43 EST (Sun, 08 Feb 2009)
@@ -3,7 +3,7 @@
//////////////////////////////////////////////////////////////////////////////
//
-// (C) Copyright Vicente J. Botet Escriba 2008-20009. Distributed under the Boost
+// (C) Copyright Vicente J. Botet Escriba 2008-2009. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
@@ -23,7 +23,7 @@
#ifdef XTIME
boost::xtime date_;
void reset_date(unsigned seq) {
- boost::xtime_get(&date_,1);
+ boost::xtime_get(&date_,1);
seq_ = seq;
}
struct ref_comparator {
@@ -56,7 +56,7 @@
#else
system_time date_;
void reset_date(unsigned seq) {
- date_ = system_time();
+ date_ = system_time();
seq_ = seq;
}
struct ref_comparator {
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk