Boost logo

Boost-Commit:

Subject: [Boost-commit] svn:boost r84607 - in trunk: boost/thread libs/thread/test/sync/futures/async
From: vicente.botet_at_[hidden]
Date: 2013-06-02 01:56:14


Author: viboes
Date: 2013-06-02 01:56:13 EDT (Sun, 02 Jun 2013)
New Revision: 84607
URL: http://svn.boost.org/trac/boost/changeset/84607

Log:
Thread: fix shared_future::get when the shared state is async; refactor the shared_state classes.
Text files modified:
   trunk/boost/thread/future.hpp | 272 +++++++--------------------------------
   trunk/libs/thread/test/sync/futures/async/async_pass.cpp | 22 +++
   2 files changed, 70 insertions(+), 224 deletions(-)

Modified: trunk/boost/thread/future.hpp
==============================================================================
--- trunk/boost/thread/future.hpp (original)
+++ trunk/boost/thread/future.hpp 2013-06-02 01:56:13 EDT (Sun, 02 Jun 2013)
@@ -203,6 +203,7 @@
 #if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
             typedef shared_ptr<shared_state_base> continuation_ptr_type;
 #else
+ // This type shouldn't be included, but is included to maintain the same layout.
             typedef shared_ptr<void> continuation_ptr_type;
 #endif
 
@@ -311,22 +312,22 @@
                 }
             }
 
- void wait_internal(boost::unique_lock<boost::mutex> &lock, bool rethrow=true)
+ void wait_internal(boost::unique_lock<boost::mutex> &lk, bool rethrow=true)
             {
- do_callback(lock);
+ do_callback(lk);
               //if (!done) // fixme why this doesn't work?
               {
                 if (is_deferred_)
                 {
                   is_deferred_=false;
- execute(lock);
- //lock.unlock();
+ execute(lk);
+ //lk.unlock();
                 }
                 else
                 {
                   while(!done)
                   {
- waiters.wait(lock);
+ waiters.wait(lk);
                   }
 #if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
                   if(rethrow && thread_was_interrupted)
@@ -342,7 +343,7 @@
               }
             }
 
- void wait(bool rethrow=true)
+ virtual void wait(bool rethrow=true)
             {
                 boost::unique_lock<boost::mutex> lock(mutex);
                 wait_internal(lock, rethrow);
@@ -644,7 +645,7 @@
 #endif
             }
 
- virtual move_dest_type get()
+ move_dest_type get()
             {
                 wait();
                 return boost::move(*result);
@@ -794,40 +795,51 @@
         };
 
         /////////////////////////
- /// future_async_shared_state
+ /// future_async_shared_state_base
         /////////////////////////
- template<typename Rp, typename Fp>
- struct future_async_shared_state: shared_state<Rp>
+ template<typename Rp>
+ struct future_async_shared_state_base: shared_state<Rp>
         {
           typedef shared_state<Rp> base_type;
- typedef typename base_type::move_dest_type move_dest_type;
-
           boost::thread thr_;
 
         public:
- explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
- thr_(&future_async_shared_state::run, this, boost::forward<Fp>(f))
+ future_async_shared_state_base()
+ {
+ this->set_async();
+ }
+ explicit future_async_shared_state_base(BOOST_THREAD_RV_REF(boost::thread) th) :
+ thr_(boost::move(th))
           {
             this->set_async();
           }
 
- ~future_async_shared_state()
+ ~future_async_shared_state_base()
           {
             if (thr_.joinable()) thr_.join();
           }
 
- move_dest_type get()
+ virtual void wait(bool rethrow)
           {
               if (thr_.joinable()) thr_.join();
- // fixme Is the lock needed during the whole scope?
- //this->wait();
- boost::unique_lock<boost::mutex> lock(this->mutex);
- this->wait_internal(lock);
-
- //return static_cast<move_dest_type>(*(this->result));
- //return boost::move<Rp>(*(this->result));
- return boost::move(*(this->result));
+ this->base_type::wait(rethrow);
+ }
+ };
+
+ /////////////////////////
+ /// future_async_shared_state
+ /////////////////////////
+ template<typename Rp, typename Fp>
+ struct future_async_shared_state: future_async_shared_state_base<Rp>
+ {
+ typedef future_async_shared_state_base<Rp> base_type;
+
+ public:
+ explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
+ base_type(thread(&future_async_shared_state::run, this, boost::forward<Fp>(f)))
+ {
           }
+
           static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f)
           {
             try
@@ -848,21 +860,14 @@
         };
 
         template<typename Fp>
- struct future_async_shared_state<void, Fp>: public shared_state<void>
+ struct future_async_shared_state<void, Fp>: public future_async_shared_state_base<void>
         {
- typedef shared_state<void> base_type;
- boost::thread thr_;
+ typedef future_async_shared_state_base<void> base_type;
 
         public:
           explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
- thr_(&future_async_shared_state::run, this, boost::forward<Fp>(f))
- {
- this->set_async();
- }
-
- ~future_async_shared_state()
+ base_type(thread(&future_async_shared_state::run, this, boost::forward<Fp>(f)))
           {
- if (thr_.joinable()) thr_.join();
           }
 
           static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f)
@@ -886,36 +891,16 @@
         };
 
         template<typename Rp, typename Fp>
- struct future_async_shared_state<Rp&, Fp>: shared_state<Rp&>
+ struct future_async_shared_state<Rp&, Fp>: future_async_shared_state_base<Rp&>
         {
- typedef shared_state<Rp&> base_type;
- typedef typename base_type::move_dest_type move_dest_type;
-
- boost::thread thr_;
+ typedef future_async_shared_state_base<Rp&> base_type;
 
         public:
           explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
- thr_(&future_async_shared_state::run, this, boost::forward<Fp>(f))
+ base_type(thread(&future_async_shared_state::run, this, boost::forward<Fp>(f)))
           {
- this->set_async();
           }
 
- ~future_async_shared_state()
- {
- if (thr_.joinable()) thr_.join();
- }
-
- move_dest_type get()
- {
- if (thr_.joinable()) thr_.join();
- // fixme Is the lock needed during the whole scope?
- //this->wait();
- boost::unique_lock<boost::mutex> lock(this->mutex);
- this->wait_internal(lock);
-
- //return static_cast<move_dest_type>(*(this->result));
- return boost::move(*(this->result));
- }
           static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f)
           {
             try
@@ -2722,10 +2707,8 @@
                 }
             }
         };
-
     }
 
-
 #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
   #if defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
     template<typename R, typename ...ArgTypes>
@@ -2989,7 +2972,6 @@
         // result retrieval
         BOOST_THREAD_FUTURE<R> get_future()
         {
-
             if(!task)
             {
                 boost::throw_exception(task_moved());
@@ -3004,10 +2986,8 @@
                 boost::throw_exception(future_already_retrieved());
             }
             //return BOOST_THREAD_FUTURE<R>();
-
         }
 
-
         // execution
 #if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
         void operator()(BOOST_THREAD_RV_REF(ArgTypes)... args)
@@ -3055,7 +3035,6 @@
         {
             task->set_wait_callback(f,this);
         }
-
     };
 
 #if defined BOOST_THREAD_PROVIDES_FUTURE_CTOR_ALLOCATORS
@@ -3219,11 +3198,7 @@
 
           BOOST_THREAD_FUTURE<R> ret = pt.get_future();
           ret.set_async();
-//#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
-// boost::thread( boost::move(pt), boost::forward<ArgTypes>(args)... ).detach(); // todo forward
-//#else
           boost::thread( boost::move(pt) ).detach();
-//#endif
           return ::boost::move(ret);
 #endif
         }
@@ -3434,57 +3409,26 @@
     /////////////////////////
 
     template<typename F, typename Rp, typename Fp>
- struct future_async_continuation_shared_state: shared_state<Rp>
+ struct future_async_continuation_shared_state: future_async_shared_state_base<Rp>
     {
- typedef shared_state<Rp> base_type;
- typedef typename base_type::move_dest_type move_dest_type;
- typedef weak_ptr<shared_state_base> parent_ptr_type;
-
       F parent;
       Fp continuation;
- boost::thread thr_;
 
     public:
       explicit future_async_continuation_shared_state(
           F& f, BOOST_THREAD_FWD_REF(Fp) c
           ) :
       parent(f.future_),
- //continuation(boost::forward<Fp>(c)
- continuation(c),
- thr_()
+ continuation(c)
       {
- this->set_async();
- }
-
- ~future_async_continuation_shared_state()
- {
- if (thr_.get_id()==thread::id())
- {
- //BOOST_THREAD_LOG << "ERRORRRRRRRRR ~future_async_continuation_shared_state " << this << " " << thr_.get_id() << BOOST_THREAD_END_LOG;
- return;
- }
- if (thr_.joinable()) {
- thr_.join();
- }
       }
 
       void launch_continuation(boost::unique_lock<boost::mutex>& lock)
       {
         lock.unlock();
- thr_ = thread(&future_async_continuation_shared_state::run, this);
+ this->thr_ = thread(&future_async_continuation_shared_state::run, this);
       }
 
- move_dest_type get()
- {
- if (thr_.joinable()) thr_.join();
- // fixme Is the lock needed during the whole scope?
- //this->wait();
- boost::unique_lock<boost::mutex> lock(this->mutex);
- this->wait_internal(lock);
-
- // fixme use boost::move
- return static_cast<move_dest_type>(*(this->result));
- }
       static void run(future_async_continuation_shared_state* that)
       {
         try
@@ -3505,12 +3449,10 @@
     };
 
     template<typename F, typename Fp>
- struct future_async_continuation_shared_state<F, void, Fp>: public shared_state<void>
+ struct future_async_continuation_shared_state<F, void, Fp>: public future_async_shared_state_base<void>
     {
- typedef shared_state<void> base_type;
       F& parent;
       Fp continuation;
- boost::thread thr_;
 
     public:
       explicit future_async_continuation_shared_state(
@@ -3518,25 +3460,14 @@
           ) :
       parent(f),
       //continuation(boost::forward<Fp>(c)
- continuation(boost::move(c)),
- thr_()
+ continuation(boost::move(c))
       {
- this->set_async();
- }
-
- ~future_async_continuation_shared_state()
- {
- if (thr_.get_id()==thread::id())
- {
- return;
- }
- if (thr_.joinable()) thr_.join();
       }
 
       void launch_continuation(boost::unique_lock<boost::mutex>& lk)
       {
         lk.unlock();
- thr_ = thread(&future_async_continuation_shared_state::run, this);
+ this->thr_ = thread(&future_async_continuation_shared_state::run, this);
       }
 
       static void run(future_async_continuation_shared_state* that)
@@ -3566,7 +3497,6 @@
     template<typename F, typename Rp, typename Fp>
     struct future_deferred_continuation_shared_state: shared_state<Rp>
     {
- typedef shared_state<Rp> base_type;
       F& parent;
       Fp continuation;
 
@@ -3601,7 +3531,6 @@
     template<typename F, typename Fp>
     struct future_deferred_continuation_shared_state<F,void,Fp>: shared_state<void>
     {
- typedef shared_state<void> base_type;
       F& parent;
       Fp continuation;
 
@@ -3610,7 +3539,6 @@
           F& f, BOOST_THREAD_FWD_REF(Fp) c
           ):
           parent(f),
- //continuation(boost::forward<Fp>(c)
           continuation(boost::move(c))
       {
         this->set_deferred();
@@ -3665,110 +3593,6 @@
       return BOOST_THREAD_FUTURE<Rp>(h);
     }
 
-// template <typename F, typename R, typename C>
-// struct future_continuation : shared_state<R>
-// {
-// F& parent;
-// C continuation;
-// launch policy_;
-//
-// future_continuation(boost::unique_lock<boost::mutex>& lk, F& f, BOOST_THREAD_FWD_REF(C) c) :
-// parent(f),
-// continuation(boost::forward<C>(c)),
-// policy_(f.launch_policy())
-// {
-// init_continuation(lk);
-// }
-// future_continuation(boost::unique_lock<boost::mutex>& lk, F& f, BOOST_THREAD_FWD_REF(C) c, launch policy) :
-// parent(f),
-// continuation(boost::forward<C>(c)),
-// policy_(policy)
-// {
-// init_continuation(lk);
-// }
-// ~future_continuation()
-// {}
-//
-// void init_continuation(boost::unique_lock<boost::mutex>& lk)
-// {
-// try
-// {
-// lk.unlock();
-// // fixme what to do depending on inherits_launch_policy_ and policy_?
-// if (int(policy_) & int(launch::deferred))
-// {
-// R val = continuation(parent);
-// next.set_value(boost::move(val));
-// }
-// else
-// {
-// BOOST_THREAD_FUTURE<R> f = async(policy_, continuation, boost::ref(parent));
-// R val = f.get();
-// next.set_value(boost::move(val));
-// }
-// }
-// catch (...)
-// {
-// next.set_exception(boost::current_exception());
-// }
-// }
-// private:
-//
-// future_continuation(future_continuation const&);
-// future_continuation& operator=(future_continuation const&);
-// };
-//#if defined(BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR)
-// template <typename F, typename R, typename CR>
-// struct future_continuation<F,R,CR(*)(F&)> : shared_state<R>
-// {
-// F& parent;
-// CR(*continuation)(F&) ;
-// launch policy_;
-//
-// future_continuation(F& f, CR(*c)(F&)) :
-// parent(f),
-// continuation(c),
-// policy_(f.launch_policy()),
-// next()
-// {}
-// future_continuation(F& f, CR(*c)(F&), launch policy) :
-// parent(f),
-// continuation(c),
-// policy_(policy),
-// next()
-// {}
-// ~future_continuation()
-// {}
-//
-// void start_continuation(boost::unique_lock<boost::mutex>& lk)
-// {
-// try
-// {
-// lk.unlock();
-// // fixme what to do depending on inherits_launch_policy_ and policy_?
-// if (int(policy_) & int(launch::deferred))
-// {
-// R val = continuation(parent);
-// next.set_value(boost::move(val));
-// }
-// else
-// {
-// BOOST_THREAD_FUTURE<R> f = async(policy_, continuation, boost::ref(parent));
-// R val = f.get();
-// next.set_value(boost::move(val));
-// }
-// }
-// catch (...)
-// {
-// next.set_exception(boost::current_exception());
-// }
-// }
-// private:
-//
-// future_continuation(future_continuation const&);
-// future_continuation& operator=(future_continuation const&);
-// };
-//#endif
   }
 
   ////////////////////////////////

Modified: trunk/libs/thread/test/sync/futures/async/async_pass.cpp
==============================================================================
--- trunk/libs/thread/test/sync/futures/async/async_pass.cpp (original)
+++ trunk/libs/thread/test/sync/futures/async/async_pass.cpp 2013-06-02 01:56:13 EDT (Sun, 02 Jun 2013)
@@ -169,6 +169,28 @@
   {
     try
     {
+ boost::shared_future<int> f = boost::async(f0).share();
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ BOOST_TEST(f.get() == 3);
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 < ms(300));
+ }
+ catch (std::exception& ex)
+ {
+ std::cout << __FILE__ << "[" << __LINE__ << "]" << ex.what() << std::endl;
+ BOOST_TEST(false && "exception thrown");
+ }
+ catch (...)
+ {
+ BOOST_TEST(false && "exception thrown");
+ }
+
+ }
+ std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
+ {
+ try
+ {
       boost::future<int> f = boost::async(boost::launch::async, f0);
       boost::this_thread::sleep_for(ms(300));
       Clock::time_point t0 = Clock::now();


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk