|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r81152 - in trunk: boost/thread boost/thread/detail boost/thread/pthread libs/thread/test/sync/futures/async
From: vicente.botet_at_[hidden]
Date: 2012-11-02 17:30:37
Author: viboes
Date: 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
New Revision: 81152
URL: http://svn.boost.org/trac/boost/changeset/81152
Log:
Thread: Added asynch deferred on a specifc configuration+ prepare the work for async futures joining the producer thread
Text files modified:
trunk/boost/thread/detail/move.hpp | 11 +
trunk/boost/thread/future.hpp | 312 +++++++++++++++++++++++++++++++++++++--
trunk/boost/thread/pthread/condition_variable.hpp | 5
trunk/boost/thread/pthread/condition_variable_fwd.hpp | 5
trunk/libs/thread/test/sync/futures/async/async_pass.cpp | 59 ++++---
5 files changed, 341 insertions(+), 51 deletions(-)
Modified: trunk/boost/thread/detail/move.hpp
==============================================================================
--- trunk/boost/thread/detail/move.hpp (original)
+++ trunk/boost/thread/detail/move.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -230,19 +230,26 @@
-#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
namespace boost
{ namespace thread_detail
{
+#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
template <class T>
typename decay<T>::type
decay_copy(T&& t)
{
return boost::forward<T>(t);
}
+#else
+ template <class T>
+ typename decay<T>::type
+ decay_copy(BOOST_THREAD_FWD_REF(T) t)
+ {
+ return boost::forward<T>(t);
}
-}
#endif
+ }
+}
#include <boost/config/abi_suffix.hpp>
Modified: trunk/boost/thread/future.hpp
==============================================================================
--- trunk/boost/thread/future.hpp (original)
+++ trunk/boost/thread/future.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -18,6 +18,7 @@
#include <boost/detail/scoped_enum_emulation.hpp>
#include <stdexcept>
#include <boost/thread/detail/move.hpp>
+#include <boost/thread/detail/async_func.hpp>
#include <boost/thread/thread_time.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
@@ -42,6 +43,7 @@
#include <list>
#include <boost/next_prior.hpp>
#include <vector>
+
#include <boost/system/error_code.hpp>
#ifdef BOOST_THREAD_USES_CHRONO
#include <boost/chrono/system_clocks.hpp>
@@ -206,7 +208,7 @@
namespace future_state
{
- enum state { uninitialized, waiting, ready, moved };
+ enum state { uninitialized, waiting, ready, moved, deferred };
}
namespace detail
@@ -259,6 +261,7 @@
{
boost::exception_ptr exception;
bool done;
+ bool is_deferred;
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
bool thread_was_interrupted;
#endif
@@ -273,8 +276,9 @@
future_object_base():
done(false),
+ is_deferred(false)
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
- thread_was_interrupted(false)
+ , thread_was_interrupted(false)
#endif
#if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
, continuation_ptr()
@@ -283,6 +287,8 @@
virtual ~future_object_base()
{}
+ void set_deferred() {is_deferred = true;}
+
waiter_list::iterator register_external_waiter(boost::condition_variable_any& cv)
{
boost::unique_lock<boost::mutex> lock(mutex);
@@ -342,22 +348,33 @@
void wait_internal(boost::unique_lock<boost::mutex> &lock, bool rethrow=true)
{
- do_callback(lock);
- while(!done)
+ do_callback(lock);
+ //if (!done)
+ {
+ if (is_deferred)
{
- waiters.wait(lock);
+ is_deferred=false;
+ execute(lock);
+ //lock.unlock();
}
-#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
- if(rethrow && thread_was_interrupted)
+ else
{
- throw boost::thread_interrupted();
- }
+ while(!done)
+ {
+ waiters.wait(lock);
+ }
+#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
+ if(rethrow && thread_was_interrupted)
+ {
+ throw boost::thread_interrupted();
+ }
#endif
- if(rethrow && exception)
- {
- boost::rethrow_exception(exception);
+ if(rethrow && exception)
+ {
+ boost::rethrow_exception(exception);
+ }
}
-
+ }
}
void wait(bool rethrow=true)
{
@@ -369,6 +386,9 @@
bool timed_wait_until(boost::system_time const& target_time)
{
boost::unique_lock<boost::mutex> lock(mutex);
+ if (is_deferred)
+ return false;
+
do_callback(lock);
while(!done)
{
@@ -388,6 +408,8 @@
wait_until(const chrono::time_point<Clock, Duration>& abs_time)
{
boost::unique_lock<boost::mutex> lock(mutex);
+ if (is_deferred)
+ return future_status::deferred;
do_callback(lock);
while(!done)
{
@@ -434,7 +456,7 @@
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
|| thread_was_interrupted
#endif
-);
+ );
}
template<typename F,typename U>
@@ -442,6 +464,7 @@
{
callback=boost::bind(f,boost::ref(*u));
}
+ virtual void execute(boost::unique_lock<boost::mutex>&) {}
private:
future_object_base(future_object_base const&);
@@ -649,6 +672,145 @@
future_object& operator=(future_object const&);
};
+ /// future_async_object
+ template<typename Rp, typename Fp>
+ struct future_async_object: future_object<Rp>
+ {
+ typedef future_object<Rp> base_type;
+ Fp func_;
+ boost::thread thr_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_async_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_async_object(Fp f)
+ : func_(f)
+ #endif
+ , thr_(&future_async_object::execute, *this)
+ {
+ }
+
+ ~future_async_object()
+ {
+ thr_.join();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lock) {
+ try
+ {
+ this->mark_finished_with_result_internal(func_(), lock);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lock);
+ }
+
+ }
+
+ };
+
+ template<typename Fp>
+ struct future_async_object<void, Fp>: public future_object<void>
+ {
+ typedef future_object<void> base_type;
+ Fp func_;
+ boost::thread thr_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_async_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_async_object(Fp f)
+ : func_(f)
+ #endif
+ , thr_(&future_async_object::execute, *this)
+ {
+ }
+
+ ~future_async_object()
+ {
+ thr_.join();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lock) {
+ try
+ {
+ func_();
+ this->mark_finished_with_result_internal(lock);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lock);
+ }
+
+ }
+
+ };
+ /// future_deferred_object
+ template<typename Rp, typename Fp>
+ struct future_deferred_object: future_object<Rp>
+ {
+ typedef future_object<Rp> base_type;
+ Fp func_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_deferred_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_deferred_object(Fp f)
+ : func_(f)
+ #endif
+ {
+ this->set_deferred();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lck) {
+ try
+ {
+ this->mark_finished_with_result_internal(func_(), lck);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lck);
+ }
+ }
+ };
+
+ template<typename Fp>
+ struct future_deferred_object<void,Fp>: future_object<void>
+ {
+ typedef future_object<void> base_type;
+ Fp func_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_deferred_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_deferred_object(Fp f)
+ : func_(f)
+ #endif
+ {
+ this->set_deferred();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lck) {
+ try
+ {
+ func_();
+ this->mark_finished_with_result_internal(lck);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lck);
+ }
+ }
+ };
+
// template<typename T, typename Allocator>
// struct future_object_alloc: public future_object<T>
// {
@@ -1019,6 +1181,25 @@
} // detail
BOOST_THREAD_DCL_MOVABLE_BEG(R) detail::basic_future<R> BOOST_THREAD_DCL_MOVABLE_END
+ namespace detail
+ {
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_async_object(Fp&& f);
+ #else
+ make_future_async_object(Fp f);
+ #endif
+
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_deferred_object(Fp&& f);
+ #else
+ make_future_deferred_object(Fp f);
+ #endif
+ }
+
template <typename R>
class BOOST_THREAD_FUTURE : public detail::basic_future<R>
{
@@ -1039,6 +1220,22 @@
#endif
friend class detail::future_waiter;
+ template <class Rp, class Fp>
+ friend BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ detail::make_future_async_object(Fp&& f);
+ #else
+ detail::make_future_async_object(Fp f);
+ #endif
+
+ template <class Rp, class Fp>
+ friend BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ detail::make_future_deferred_object(Fp&& f);
+ #else
+ detail::make_future_deferred_object(Fp f);
+ #endif
+
typedef typename detail::future_traits<R>::move_dest_type move_dest_type;
BOOST_THREAD_FUTURE(future_ptr a_future):
@@ -2094,6 +2291,47 @@
BOOST_THREAD_DCL_MOVABLE_BEG(T) packaged_task<T> BOOST_THREAD_DCL_MOVABLE_END
+ namespace detail
+ {
+ ////////////////////////////////
+ // make_future_deferred_object
+ ////////////////////////////////
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_deferred_object(Fp&& f)
+ #else
+ make_future_deferred_object(Fp f)
+ #endif
+ {
+ shared_ptr<future_deferred_object<Rp, Fp> >
+ h(new future_deferred_object<Rp, Fp>(boost::forward<Fp>(f)));
+ return BOOST_THREAD_FUTURE<Rp>(h);
+ }
+
+ ////////////////////////////////
+ // make_future_async_object
+ ////////////////////////////////
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_async_object(Fp&& f)
+ #else
+ make_future_async_object(Fp f)
+ #endif
+ {
+ shared_ptr<future_async_object<Rp, Fp> >
+ h(new future_async_object<Rp, Fp>(boost::forward<Fp>(f)));
+ return BOOST_THREAD_FUTURE<Rp>(h);
+ }
+
+ }
+
+ ////////////////////////////////
+ // template <class F, class... ArgTypes>
+ // future<R> async(F&&, ArgTypes&&...);
+ ////////////////////////////////
+
#if defined BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR
#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
@@ -2164,6 +2402,10 @@
typename decay<ArgTypes>::type...
)>::type R;
typedef packaged_task<R(ArgTypes...)> packaged_task_type;
+
+ typedef detail::async_func<typename decay<F>::type, typename decay<ArgTypes>::type...> BF;
+ typedef typename BF::result_type Rp;
+
#else
template <class F>
BOOST_THREAD_FUTURE<typename boost::result_of<typename decay<F>::type()>::type>
@@ -2171,6 +2413,10 @@
{
typedef typename boost::result_of<typename decay<F>::type()>::type R;
typedef packaged_task<R()> packaged_task_type;
+
+ typedef detail::async_func<typename decay<F>::type> BF;
+ typedef typename BF::result_type Rp;
+
#endif
#else
template <class F>
@@ -2179,6 +2425,9 @@
{
typedef typename boost::result_of<typename decay<F>::type()>::type R;
typedef packaged_task<R> packaged_task_type;
+
+ typedef detail::async_func<typename decay<F>::type> BF;
+ typedef typename BF::result_type Rp;
#endif
if (int(policy) & int(launch::async))
@@ -2195,10 +2444,23 @@
}
else if (int(policy) & int(launch::deferred))
{
- packaged_task_type pt( boost::forward<F>(f) );
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ return boost::detail::make_future_deferred_object<Rp>(
+ BF(
+ thread_detail::decay_copy(boost::forward<F>(f))
+ , thread_detail::decay_copy(boost::forward<ArgTypes>(args))...
+ )
+ );
+#else
+ BOOST_THREAD_FUTURE<R> ret;
+ return ::boost::move(ret);
+// return boost::detail::make_future_deferred_object<Rp>(
+// BF(
+// thread_detail::decay_copy(boost::forward<F>(f))
+// )
+// );
+#endif
- BOOST_THREAD_FUTURE<R> ret = pt.get_future();
- return ::boost::move(ret);
} else {
BOOST_THREAD_FUTURE<R> ret;
return ::boost::move(ret);
@@ -2211,6 +2473,10 @@
return async(launch::any, boost::forward<F>(f));
}
+
+ ////////////////////////////////
+ // make_future
+ ////////////////////////////////
template <typename T>
BOOST_THREAD_FUTURE<typename decay<T>::type> make_future(BOOST_THREAD_FWD_REF(T) value)
{
@@ -2228,6 +2494,9 @@
}
+ ////////////////////////////////
+ // make_shared_future
+ ////////////////////////////////
template <typename T>
shared_future<typename decay<T>::type> make_shared_future(BOOST_THREAD_FWD_REF(T) value)
{
@@ -2245,6 +2514,9 @@
}
+ ////////////////////////////////
+ // detail::future_continuation
+ ////////////////////////////////
#if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
namespace detail
{
@@ -2318,8 +2590,10 @@
#endif
}
-// template<typename F>
-// auto then(F&& func) -> BOOST_THREAD_FUTURE<decltype(func(*this))>;
+ ////////////////////////////////
+ // template<typename F>
+ // auto future<R>::then(F&& func) -> BOOST_THREAD_FUTURE<decltype(func(*this))>;
+ ////////////////////////////////
template <typename R>
template <typename F>
Modified: trunk/boost/thread/pthread/condition_variable.hpp
==============================================================================
--- trunk/boost/thread/pthread/condition_variable.hpp (original)
+++ trunk/boost/thread/pthread/condition_variable.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -289,7 +289,7 @@
}
template <class lock_type>
- inline void wait_until(
+ cv_status wait_until(
lock_type& lk,
chrono::time_point<chrono::system_clock, chrono::nanoseconds> tp)
{
@@ -299,7 +299,8 @@
seconds s = duration_cast<seconds>(d);
ts.tv_sec = static_cast<long>(s.count());
ts.tv_nsec = static_cast<long>((d - s).count());
- do_timed_wait(lk, ts);
+ if (do_timed_wait(lk, ts)) return cv_status::no_timeout;
+ else return cv_status::timeout;
}
#endif
Modified: trunk/boost/thread/pthread/condition_variable_fwd.hpp
==============================================================================
--- trunk/boost/thread/pthread/condition_variable_fwd.hpp (original)
+++ trunk/boost/thread/pthread/condition_variable_fwd.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -212,7 +212,7 @@
void notify_all() BOOST_NOEXCEPT;
#ifdef BOOST_THREAD_USES_CHRONO
- inline void wait_until(
+ inline cv_status wait_until(
unique_lock<mutex>& lk,
chrono::time_point<chrono::system_clock, chrono::nanoseconds> tp)
{
@@ -222,7 +222,8 @@
seconds s = duration_cast<seconds>(d);
ts.tv_sec = static_cast<long>(s.count());
ts.tv_nsec = static_cast<long>((d - s).count());
- do_timed_wait(lk, ts);
+ if (do_timed_wait(lk, ts)) return cv_status::no_timeout;
+ else return cv_status::timeout;
}
#endif
//private: // used by boost::thread::try_join_until
Modified: trunk/libs/thread/test/sync/futures/async/async_pass.cpp
==============================================================================
--- trunk/libs/thread/test/sync/futures/async/async_pass.cpp (original)
+++ trunk/libs/thread/test/sync/futures/async/async_pass.cpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -145,15 +145,16 @@
Clock::time_point t1 = Clock::now();
BOOST_TEST(t1 - t0 < ms(100));
}
-// {
-// boost::future<int> f = boost::async(boost::launch::deferred, f0);
-// boost::this_thread::sleep_for(ms(300));
-// Clock::time_point t0 = Clock::now();
-// BOOST_TEST(f.get() == 3);
-// Clock::time_point t1 = Clock::now();
-// BOOST_TEST(t1 - t0 > ms(100));
-// }
-//
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ {
+ boost::future<int> f = boost::async(boost::launch::deferred, f0);
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ BOOST_TEST(f.get() == 3);
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 > ms(100));
+ }
+#endif
{
boost::future<int&> f = boost::async(f1);
boost::this_thread::sleep_for(ms(300));
@@ -178,15 +179,16 @@
Clock::time_point t1 = Clock::now();
BOOST_TEST(t1 - t0 < ms(100));
}
-// {
-// boost::future<int&> f = boost::async(boost::launch::deferred, f1);
-// boost::this_thread::sleep_for(ms(300));
-// Clock::time_point t0 = Clock::now();
-// BOOST_TEST(&f.get() == &i);
-// Clock::time_point t1 = Clock::now();
-// BOOST_TEST(t1 - t0 > ms(100));
-// }
-//
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ {
+ boost::future<int&> f = boost::async(boost::launch::deferred, f1);
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ BOOST_TEST(&f.get() == &i);
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 > ms(100));
+ }
+#endif
{
boost::future<void> f = boost::async(f2);
boost::this_thread::sleep_for(ms(300));
@@ -211,14 +213,18 @@
Clock::time_point t1 = Clock::now();
BOOST_TEST(t1 - t0 < ms(100));
}
-// {
-// boost::future<void> f = boost::async(boost::launch::deferred, f2);
-// boost::this_thread::sleep_for(ms(300));
-// Clock::time_point t0 = Clock::now();
-// f.get();
-// Clock::time_point t1 = Clock::now();
-// BOOST_TEST(t1 - t0 > ms(100));
-// }
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ {
+ boost::future<void> f = boost::async(boost::launch::deferred, f2);
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ f.get();
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 > ms(100));
+ }
+#endif
+
+ // todo fixme
#if 0 && defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
{
boost::future<boost::interprocess::unique_ptr<int, boost::default_delete<int> > > f = boost::async(boost::launch::async, &f3, 3);
@@ -230,6 +236,7 @@
}
#endif
+ // todo fixme
#if 0 && defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
{
boost::future<boost::interprocess::unique_ptr<int, boost::default_delete<int> > > f = boost::async(&f4, boost::interprocess::unique_ptr<int, boost::default_delete<int> >(new int(3)));
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk