Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r81152 - in trunk: boost/thread boost/thread/detail boost/thread/pthread libs/thread/test/sync/futures/async
From: vicente.botet_at_[hidden]
Date: 2012-11-02 17:30:37


Author: viboes
Date: 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
New Revision: 81152
URL: http://svn.boost.org/trac/boost/changeset/81152

Log:
Thread: Added async deferred on a specific configuration + prepare the work for async futures joining the producer thread
Text files modified:
   trunk/boost/thread/detail/move.hpp | 11 +
   trunk/boost/thread/future.hpp | 312 +++++++++++++++++++++++++++++++++++++--
   trunk/boost/thread/pthread/condition_variable.hpp | 5
   trunk/boost/thread/pthread/condition_variable_fwd.hpp | 5
   trunk/libs/thread/test/sync/futures/async/async_pass.cpp | 59 ++++---
   5 files changed, 341 insertions(+), 51 deletions(-)

Modified: trunk/boost/thread/detail/move.hpp
==============================================================================
--- trunk/boost/thread/detail/move.hpp (original)
+++ trunk/boost/thread/detail/move.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -230,19 +230,26 @@
 
 
 
-#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
 namespace boost
 { namespace thread_detail
   {
+#ifndef BOOST_NO_CXX11_RVALUE_REFERENCES
       template <class T>
       typename decay<T>::type
       decay_copy(T&& t)
       {
           return boost::forward<T>(t);
       }
+#else
+ template <class T>
+ typename decay<T>::type
+ decay_copy(BOOST_THREAD_FWD_REF(T) t)
+ {
+ return boost::forward<T>(t);
   }
-}
 #endif
+ }
+}
 
 #include <boost/config/abi_suffix.hpp>
 

Modified: trunk/boost/thread/future.hpp
==============================================================================
--- trunk/boost/thread/future.hpp (original)
+++ trunk/boost/thread/future.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -18,6 +18,7 @@
 #include <boost/detail/scoped_enum_emulation.hpp>
 #include <stdexcept>
 #include <boost/thread/detail/move.hpp>
+#include <boost/thread/detail/async_func.hpp>
 #include <boost/thread/thread_time.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/condition_variable.hpp>
@@ -42,6 +43,7 @@
 #include <list>
 #include <boost/next_prior.hpp>
 #include <vector>
+
 #include <boost/system/error_code.hpp>
 #ifdef BOOST_THREAD_USES_CHRONO
 #include <boost/chrono/system_clocks.hpp>
@@ -206,7 +208,7 @@
 
     namespace future_state
     {
- enum state { uninitialized, waiting, ready, moved };
+ enum state { uninitialized, waiting, ready, moved, deferred };
     }
 
     namespace detail
@@ -259,6 +261,7 @@
         {
             boost::exception_ptr exception;
             bool done;
+ bool is_deferred;
 #if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
             bool thread_was_interrupted;
 #endif
@@ -273,8 +276,9 @@
 
             future_object_base():
                 done(false),
+ is_deferred(false)
 #if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
- thread_was_interrupted(false)
+ , thread_was_interrupted(false)
 #endif
 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
                , continuation_ptr()
@@ -283,6 +287,8 @@
             virtual ~future_object_base()
             {}
 
+ void set_deferred() {is_deferred = true;}
+
             waiter_list::iterator register_external_waiter(boost::condition_variable_any& cv)
             {
                 boost::unique_lock<boost::mutex> lock(mutex);
@@ -342,22 +348,33 @@
 
             void wait_internal(boost::unique_lock<boost::mutex> &lock, bool rethrow=true)
             {
- do_callback(lock);
- while(!done)
+ do_callback(lock);
+ //if (!done)
+ {
+ if (is_deferred)
                 {
- waiters.wait(lock);
+ is_deferred=false;
+ execute(lock);
+ //lock.unlock();
                 }
-#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
- if(rethrow && thread_was_interrupted)
+ else
                 {
- throw boost::thread_interrupted();
- }
+ while(!done)
+ {
+ waiters.wait(lock);
+ }
+#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
+ if(rethrow && thread_was_interrupted)
+ {
+ throw boost::thread_interrupted();
+ }
 #endif
- if(rethrow && exception)
- {
- boost::rethrow_exception(exception);
+ if(rethrow && exception)
+ {
+ boost::rethrow_exception(exception);
+ }
                 }
-
+ }
             }
             void wait(bool rethrow=true)
             {
@@ -369,6 +386,9 @@
             bool timed_wait_until(boost::system_time const& target_time)
             {
                 boost::unique_lock<boost::mutex> lock(mutex);
+ if (is_deferred)
+ return false;
+
                 do_callback(lock);
                 while(!done)
                 {
@@ -388,6 +408,8 @@
             wait_until(const chrono::time_point<Clock, Duration>& abs_time)
             {
               boost::unique_lock<boost::mutex> lock(mutex);
+ if (is_deferred)
+ return future_status::deferred;
               do_callback(lock);
               while(!done)
               {
@@ -434,7 +456,7 @@
 #if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
                     || thread_was_interrupted
 #endif
-);
+ );
             }
 
             template<typename F,typename U>
@@ -442,6 +464,7 @@
             {
                 callback=boost::bind(f,boost::ref(*u));
             }
+ virtual void execute(boost::unique_lock<boost::mutex>&) {}
 
         private:
             future_object_base(future_object_base const&);
@@ -649,6 +672,145 @@
             future_object& operator=(future_object const&);
         };
 
+ /// future_async_object
+ template<typename Rp, typename Fp>
+ struct future_async_object: future_object<Rp>
+ {
+ typedef future_object<Rp> base_type;
+ Fp func_;
+ boost::thread thr_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_async_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_async_object(Fp f)
+ : func_(f)
+ #endif
+ , thr_(&future_async_object::execute, *this)
+ {
+ }
+
+ ~future_async_object()
+ {
+ thr_.join();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lock) {
+ try
+ {
+ this->mark_finished_with_result_internal(func_(), lock);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lock);
+ }
+
+ }
+
+ };
+
+ template<typename Fp>
+ struct future_async_object<void, Fp>: public future_object<void>
+ {
+ typedef future_object<void> base_type;
+ Fp func_;
+ boost::thread thr_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_async_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_async_object(Fp f)
+ : func_(f)
+ #endif
+ , thr_(&future_async_object::execute, *this)
+ {
+ }
+
+ ~future_async_object()
+ {
+ thr_.join();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lock) {
+ try
+ {
+ func_();
+ this->mark_finished_with_result_internal(lock);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lock);
+ }
+
+ }
+
+ };
+ /// future_deferred_object
+ template<typename Rp, typename Fp>
+ struct future_deferred_object: future_object<Rp>
+ {
+ typedef future_object<Rp> base_type;
+ Fp func_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_deferred_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_deferred_object(Fp f)
+ : func_(f)
+ #endif
+ {
+ this->set_deferred();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lck) {
+ try
+ {
+ this->mark_finished_with_result_internal(func_(), lck);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lck);
+ }
+ }
+ };
+
+ template<typename Fp>
+ struct future_deferred_object<void,Fp>: future_object<void>
+ {
+ typedef future_object<void> base_type;
+ Fp func_;
+
+ public:
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ explicit future_deferred_object(Fp&& f)
+ : func_(boost::forward<Fp>(f))
+ #else
+ explicit future_deferred_object(Fp f)
+ : func_(f)
+ #endif
+ {
+ this->set_deferred();
+ }
+
+ virtual void execute(boost::unique_lock<boost::mutex>& lck) {
+ try
+ {
+ func_();
+ this->mark_finished_with_result_internal(lck);
+ }
+ catch (...)
+ {
+ this->mark_exceptional_finish_internal(current_exception(), lck);
+ }
+ }
+ };
+
 // template<typename T, typename Allocator>
 // struct future_object_alloc: public future_object<T>
 // {
@@ -1019,6 +1181,25 @@
     } // detail
     BOOST_THREAD_DCL_MOVABLE_BEG(R) detail::basic_future<R> BOOST_THREAD_DCL_MOVABLE_END
 
+ namespace detail
+ {
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_async_object(Fp&& f);
+ #else
+ make_future_async_object(Fp f);
+ #endif
+
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_deferred_object(Fp&& f);
+ #else
+ make_future_deferred_object(Fp f);
+ #endif
+ }
+
     template <typename R>
     class BOOST_THREAD_FUTURE : public detail::basic_future<R>
     {
@@ -1039,6 +1220,22 @@
 #endif
         friend class detail::future_waiter;
 
+ template <class Rp, class Fp>
+ friend BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ detail::make_future_async_object(Fp&& f);
+ #else
+ detail::make_future_async_object(Fp f);
+ #endif
+
+ template <class Rp, class Fp>
+ friend BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ detail::make_future_deferred_object(Fp&& f);
+ #else
+ detail::make_future_deferred_object(Fp f);
+ #endif
+
         typedef typename detail::future_traits<R>::move_dest_type move_dest_type;
 
         BOOST_THREAD_FUTURE(future_ptr a_future):
@@ -2094,6 +2291,47 @@
 
     BOOST_THREAD_DCL_MOVABLE_BEG(T) packaged_task<T> BOOST_THREAD_DCL_MOVABLE_END
 
+ namespace detail
+ {
+ ////////////////////////////////
+ // make_future_deferred_object
+ ////////////////////////////////
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_deferred_object(Fp&& f)
+ #else
+ make_future_deferred_object(Fp f)
+ #endif
+ {
+ shared_ptr<future_deferred_object<Rp, Fp> >
+ h(new future_deferred_object<Rp, Fp>(boost::forward<Fp>(f)));
+ return BOOST_THREAD_FUTURE<Rp>(h);
+ }
+
+ ////////////////////////////////
+ // make_future_async_object
+ ////////////////////////////////
+ template <class Rp, class Fp>
+ BOOST_THREAD_FUTURE<Rp>
+ #if ! defined BOOST_NO_CXX11_RVALUE_REFERENCES
+ make_future_async_object(Fp&& f)
+ #else
+ make_future_async_object(Fp f)
+ #endif
+ {
+ shared_ptr<future_async_object<Rp, Fp> >
+ h(new future_async_object<Rp, Fp>(boost::forward<Fp>(f)));
+ return BOOST_THREAD_FUTURE<Rp>(h);
+ }
+
+ }
+
+ ////////////////////////////////
+ // template <class F, class... ArgTypes>
+ // future<R> async(F&&, ArgTypes&&...);
+ ////////////////////////////////
+
 #if defined BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR
 
 #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
@@ -2164,6 +2402,10 @@
               typename decay<ArgTypes>::type...
           )>::type R;
           typedef packaged_task<R(ArgTypes...)> packaged_task_type;
+
+ typedef detail::async_func<typename decay<F>::type, typename decay<ArgTypes>::type...> BF;
+ typedef typename BF::result_type Rp;
+
   #else
         template <class F>
         BOOST_THREAD_FUTURE<typename boost::result_of<typename decay<F>::type()>::type>
@@ -2171,6 +2413,10 @@
         {
           typedef typename boost::result_of<typename decay<F>::type()>::type R;
           typedef packaged_task<R()> packaged_task_type;
+
+ typedef detail::async_func<typename decay<F>::type> BF;
+ typedef typename BF::result_type Rp;
+
   #endif
 #else
         template <class F>
@@ -2179,6 +2425,9 @@
         {
           typedef typename boost::result_of<typename decay<F>::type()>::type R;
           typedef packaged_task<R> packaged_task_type;
+
+ typedef detail::async_func<typename decay<F>::type> BF;
+ typedef typename BF::result_type Rp;
 #endif
 
         if (int(policy) & int(launch::async))
@@ -2195,10 +2444,23 @@
         }
         else if (int(policy) & int(launch::deferred))
         {
- packaged_task_type pt( boost::forward<F>(f) );
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ return boost::detail::make_future_deferred_object<Rp>(
+ BF(
+ thread_detail::decay_copy(boost::forward<F>(f))
+ , thread_detail::decay_copy(boost::forward<ArgTypes>(args))...
+ )
+ );
+#else
+ BOOST_THREAD_FUTURE<R> ret;
+ return ::boost::move(ret);
+// return boost::detail::make_future_deferred_object<Rp>(
+// BF(
+// thread_detail::decay_copy(boost::forward<F>(f))
+// )
+// );
+#endif
 
- BOOST_THREAD_FUTURE<R> ret = pt.get_future();
- return ::boost::move(ret);
         } else {
           BOOST_THREAD_FUTURE<R> ret;
           return ::boost::move(ret);
@@ -2211,6 +2473,10 @@
         return async(launch::any, boost::forward<F>(f));
     }
 
+
+ ////////////////////////////////
+ // make_future
+ ////////////////////////////////
   template <typename T>
   BOOST_THREAD_FUTURE<typename decay<T>::type> make_future(BOOST_THREAD_FWD_REF(T) value)
   {
@@ -2228,6 +2494,9 @@
 
   }
 
+ ////////////////////////////////
+ // make_shared_future
+ ////////////////////////////////
   template <typename T>
   shared_future<typename decay<T>::type> make_shared_future(BOOST_THREAD_FWD_REF(T) value)
   {
@@ -2245,6 +2514,9 @@
 
   }
 
+ ////////////////////////////////
+ // detail::future_continuation
+ ////////////////////////////////
 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
   namespace detail
   {
@@ -2318,8 +2590,10 @@
 #endif
   }
 
-// template<typename F>
-// auto then(F&& func) -> BOOST_THREAD_FUTURE<decltype(func(*this))>;
+ ////////////////////////////////
+ // template<typename F>
+ // auto future<R>::then(F&& func) -> BOOST_THREAD_FUTURE<decltype(func(*this))>;
+ ////////////////////////////////
 
   template <typename R>
   template <typename F>

Modified: trunk/boost/thread/pthread/condition_variable.hpp
==============================================================================
--- trunk/boost/thread/pthread/condition_variable.hpp (original)
+++ trunk/boost/thread/pthread/condition_variable.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -289,7 +289,7 @@
         }
 
         template <class lock_type>
- inline void wait_until(
+ cv_status wait_until(
             lock_type& lk,
             chrono::time_point<chrono::system_clock, chrono::nanoseconds> tp)
         {
@@ -299,7 +299,8 @@
             seconds s = duration_cast<seconds>(d);
             ts.tv_sec = static_cast<long>(s.count());
             ts.tv_nsec = static_cast<long>((d - s).count());
- do_timed_wait(lk, ts);
+ if (do_timed_wait(lk, ts)) return cv_status::no_timeout;
+ else return cv_status::timeout;
         }
 #endif
 

Modified: trunk/boost/thread/pthread/condition_variable_fwd.hpp
==============================================================================
--- trunk/boost/thread/pthread/condition_variable_fwd.hpp (original)
+++ trunk/boost/thread/pthread/condition_variable_fwd.hpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -212,7 +212,7 @@
         void notify_all() BOOST_NOEXCEPT;
 
 #ifdef BOOST_THREAD_USES_CHRONO
- inline void wait_until(
+ inline cv_status wait_until(
             unique_lock<mutex>& lk,
             chrono::time_point<chrono::system_clock, chrono::nanoseconds> tp)
         {
@@ -222,7 +222,8 @@
             seconds s = duration_cast<seconds>(d);
             ts.tv_sec = static_cast<long>(s.count());
             ts.tv_nsec = static_cast<long>((d - s).count());
- do_timed_wait(lk, ts);
+ if (do_timed_wait(lk, ts)) return cv_status::no_timeout;
+ else return cv_status::timeout;
         }
 #endif
         //private: // used by boost::thread::try_join_until

Modified: trunk/libs/thread/test/sync/futures/async/async_pass.cpp
==============================================================================
--- trunk/libs/thread/test/sync/futures/async/async_pass.cpp (original)
+++ trunk/libs/thread/test/sync/futures/async/async_pass.cpp 2012-11-02 17:30:36 EDT (Fri, 02 Nov 2012)
@@ -145,15 +145,16 @@
     Clock::time_point t1 = Clock::now();
     BOOST_TEST(t1 - t0 < ms(100));
   }
-// {
-// boost::future<int> f = boost::async(boost::launch::deferred, f0);
-// boost::this_thread::sleep_for(ms(300));
-// Clock::time_point t0 = Clock::now();
-// BOOST_TEST(f.get() == 3);
-// Clock::time_point t1 = Clock::now();
-// BOOST_TEST(t1 - t0 > ms(100));
-// }
-//
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ {
+ boost::future<int> f = boost::async(boost::launch::deferred, f0);
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ BOOST_TEST(f.get() == 3);
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 > ms(100));
+ }
+#endif
   {
     boost::future<int&> f = boost::async(f1);
     boost::this_thread::sleep_for(ms(300));
@@ -178,15 +179,16 @@
     Clock::time_point t1 = Clock::now();
     BOOST_TEST(t1 - t0 < ms(100));
   }
-// {
-// boost::future<int&> f = boost::async(boost::launch::deferred, f1);
-// boost::this_thread::sleep_for(ms(300));
-// Clock::time_point t0 = Clock::now();
-// BOOST_TEST(&f.get() == &i);
-// Clock::time_point t1 = Clock::now();
-// BOOST_TEST(t1 - t0 > ms(100));
-// }
-//
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ {
+ boost::future<int&> f = boost::async(boost::launch::deferred, f1);
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ BOOST_TEST(&f.get() == &i);
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 > ms(100));
+ }
+#endif
   {
     boost::future<void> f = boost::async(f2);
     boost::this_thread::sleep_for(ms(300));
@@ -211,14 +213,18 @@
     Clock::time_point t1 = Clock::now();
     BOOST_TEST(t1 - t0 < ms(100));
   }
-// {
-// boost::future<void> f = boost::async(boost::launch::deferred, f2);
-// boost::this_thread::sleep_for(ms(300));
-// Clock::time_point t0 = Clock::now();
-// f.get();
-// Clock::time_point t1 = Clock::now();
-// BOOST_TEST(t1 - t0 > ms(100));
-// }
+#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
+ {
+ boost::future<void> f = boost::async(boost::launch::deferred, f2);
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ f.get();
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 > ms(100));
+ }
+#endif
+
+ // todo fixme
 #if 0 && defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
   {
     boost::future<boost::interprocess::unique_ptr<int, boost::default_delete<int> > > f = boost::async(boost::launch::async, &f3, 3);
@@ -230,6 +236,7 @@
   }
 #endif
 
+ // todo fixme
 #if 0 && defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
   {
     boost::future<boost::interprocess::unique_ptr<int, boost::default_delete<int> > > f = boost::async(&f4, boost::interprocess::unique_ptr<int, boost::default_delete<int> >(new int(3)));


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk