Boost-Commit :
Subject: [Boost-commit] svn:boost r84607 - in trunk: boost/thread libs/thread/test/sync/futures/async
From: vicente.botet_at_[hidden]
Date: 2013-06-02 01:56:14
Author: viboes
Date: 2013-06-02 01:56:13 EDT (Sun, 02 Jun 2013)
New Revision: 84607
URL: http://svn.boost.org/trac/boost/changeset/84607
Log:
Thread: fix shared_future::get when the shared state is async; refactor the shared_state classes.
Text files modified:
trunk/boost/thread/future.hpp | 272 +++++++--------------------------------
trunk/libs/thread/test/sync/futures/async/async_pass.cpp | 22 +++
2 files changed, 70 insertions(+), 224 deletions(-)
Modified: trunk/boost/thread/future.hpp
==============================================================================
--- trunk/boost/thread/future.hpp (original)
+++ trunk/boost/thread/future.hpp 2013-06-02 01:56:13 EDT (Sun, 02 Jun 2013)
@@ -203,6 +203,7 @@
#if defined BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
typedef shared_ptr<shared_state_base> continuation_ptr_type;
#else
+ // This type shouldn't be included, but is included to maintain the same layout.
typedef shared_ptr<void> continuation_ptr_type;
#endif
@@ -311,22 +312,22 @@
}
}
- void wait_internal(boost::unique_lock<boost::mutex> &lock, bool rethrow=true)
+ void wait_internal(boost::unique_lock<boost::mutex> &lk, bool rethrow=true)
{
- do_callback(lock);
+ do_callback(lk);
//if (!done) // fixme why this doesn't work?
{
if (is_deferred_)
{
is_deferred_=false;
- execute(lock);
- //lock.unlock();
+ execute(lk);
+ //lk.unlock();
}
else
{
while(!done)
{
- waiters.wait(lock);
+ waiters.wait(lk);
}
#if defined BOOST_THREAD_PROVIDES_INTERRUPTIONS
if(rethrow && thread_was_interrupted)
@@ -342,7 +343,7 @@
}
}
- void wait(bool rethrow=true)
+ virtual void wait(bool rethrow=true)
{
boost::unique_lock<boost::mutex> lock(mutex);
wait_internal(lock, rethrow);
@@ -644,7 +645,7 @@
#endif
}
- virtual move_dest_type get()
+ move_dest_type get()
{
wait();
return boost::move(*result);
@@ -794,40 +795,51 @@
};
/////////////////////////
- /// future_async_shared_state
+ /// future_async_shared_state_base
/////////////////////////
- template<typename Rp, typename Fp>
- struct future_async_shared_state: shared_state<Rp>
+ template<typename Rp>
+ struct future_async_shared_state_base: shared_state<Rp>
{
typedef shared_state<Rp> base_type;
- typedef typename base_type::move_dest_type move_dest_type;
-
boost::thread thr_;
public:
- explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
- thr_(&future_async_shared_state::run, this, boost::forward<Fp>(f))
+ future_async_shared_state_base()
+ {
+ this->set_async();
+ }
+ explicit future_async_shared_state_base(BOOST_THREAD_RV_REF(boost::thread) th) :
+ thr_(boost::move(th))
{
this->set_async();
}
- ~future_async_shared_state()
+ ~future_async_shared_state_base()
{
if (thr_.joinable()) thr_.join();
}
- move_dest_type get()
+ virtual void wait(bool rethrow)
{
if (thr_.joinable()) thr_.join();
- // fixme Is the lock needed during the whole scope?
- //this->wait();
- boost::unique_lock<boost::mutex> lock(this->mutex);
- this->wait_internal(lock);
-
- //return static_cast<move_dest_type>(*(this->result));
- //return boost::move<Rp>(*(this->result));
- return boost::move(*(this->result));
+ this->base_type::wait(rethrow);
+ }
+ };
+
+ /////////////////////////
+ /// future_async_shared_state
+ /////////////////////////
+ template<typename Rp, typename Fp>
+ struct future_async_shared_state: future_async_shared_state_base<Rp>
+ {
+ typedef future_async_shared_state_base<Rp> base_type;
+
+ public:
+ explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
+ base_type(thread(&future_async_shared_state::run, this, boost::forward<Fp>(f)))
+ {
}
+
static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f)
{
try
@@ -848,21 +860,14 @@
};
template<typename Fp>
- struct future_async_shared_state<void, Fp>: public shared_state<void>
+ struct future_async_shared_state<void, Fp>: public future_async_shared_state_base<void>
{
- typedef shared_state<void> base_type;
- boost::thread thr_;
+ typedef future_async_shared_state_base<void> base_type;
public:
explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
- thr_(&future_async_shared_state::run, this, boost::forward<Fp>(f))
- {
- this->set_async();
- }
-
- ~future_async_shared_state()
+ base_type(thread(&future_async_shared_state::run, this, boost::forward<Fp>(f)))
{
- if (thr_.joinable()) thr_.join();
}
static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f)
@@ -886,36 +891,16 @@
};
template<typename Rp, typename Fp>
- struct future_async_shared_state<Rp&, Fp>: shared_state<Rp&>
+ struct future_async_shared_state<Rp&, Fp>: future_async_shared_state_base<Rp&>
{
- typedef shared_state<Rp&> base_type;
- typedef typename base_type::move_dest_type move_dest_type;
-
- boost::thread thr_;
+ typedef future_async_shared_state_base<Rp&> base_type;
public:
explicit future_async_shared_state(BOOST_THREAD_FWD_REF(Fp) f) :
- thr_(&future_async_shared_state::run, this, boost::forward<Fp>(f))
+ base_type(thread(&future_async_shared_state::run, this, boost::forward<Fp>(f)))
{
- this->set_async();
}
- ~future_async_shared_state()
- {
- if (thr_.joinable()) thr_.join();
- }
-
- move_dest_type get()
- {
- if (thr_.joinable()) thr_.join();
- // fixme Is the lock needed during the whole scope?
- //this->wait();
- boost::unique_lock<boost::mutex> lock(this->mutex);
- this->wait_internal(lock);
-
- //return static_cast<move_dest_type>(*(this->result));
- return boost::move(*(this->result));
- }
static void run(future_async_shared_state* that, BOOST_THREAD_FWD_REF(Fp) f)
{
try
@@ -2722,10 +2707,8 @@
}
}
};
-
}
-
#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK
#if defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
template<typename R, typename ...ArgTypes>
@@ -2989,7 +2972,6 @@
// result retrieval
BOOST_THREAD_FUTURE<R> get_future()
{
-
if(!task)
{
boost::throw_exception(task_moved());
@@ -3004,10 +2986,8 @@
boost::throw_exception(future_already_retrieved());
}
//return BOOST_THREAD_FUTURE<R>();
-
}
-
// execution
#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
void operator()(BOOST_THREAD_RV_REF(ArgTypes)... args)
@@ -3055,7 +3035,6 @@
{
task->set_wait_callback(f,this);
}
-
};
#if defined BOOST_THREAD_PROVIDES_FUTURE_CTOR_ALLOCATORS
@@ -3219,11 +3198,7 @@
BOOST_THREAD_FUTURE<R> ret = pt.get_future();
ret.set_async();
-//#if defined BOOST_THREAD_PROVIDES_SIGNATURE_PACKAGED_TASK && defined(BOOST_THREAD_PROVIDES_VARIADIC_THREAD)
-// boost::thread( boost::move(pt), boost::forward<ArgTypes>(args)... ).detach(); // todo forward
-//#else
boost::thread( boost::move(pt) ).detach();
-//#endif
return ::boost::move(ret);
#endif
}
@@ -3434,57 +3409,26 @@
/////////////////////////
template<typename F, typename Rp, typename Fp>
- struct future_async_continuation_shared_state: shared_state<Rp>
+ struct future_async_continuation_shared_state: future_async_shared_state_base<Rp>
{
- typedef shared_state<Rp> base_type;
- typedef typename base_type::move_dest_type move_dest_type;
- typedef weak_ptr<shared_state_base> parent_ptr_type;
-
F parent;
Fp continuation;
- boost::thread thr_;
public:
explicit future_async_continuation_shared_state(
F& f, BOOST_THREAD_FWD_REF(Fp) c
) :
parent(f.future_),
- //continuation(boost::forward<Fp>(c)
- continuation(c),
- thr_()
+ continuation(c)
{
- this->set_async();
- }
-
- ~future_async_continuation_shared_state()
- {
- if (thr_.get_id()==thread::id())
- {
- //BOOST_THREAD_LOG << "ERRORRRRRRRRR ~future_async_continuation_shared_state " << this << " " << thr_.get_id() << BOOST_THREAD_END_LOG;
- return;
- }
- if (thr_.joinable()) {
- thr_.join();
- }
}
void launch_continuation(boost::unique_lock<boost::mutex>& lock)
{
lock.unlock();
- thr_ = thread(&future_async_continuation_shared_state::run, this);
+ this->thr_ = thread(&future_async_continuation_shared_state::run, this);
}
- move_dest_type get()
- {
- if (thr_.joinable()) thr_.join();
- // fixme Is the lock needed during the whole scope?
- //this->wait();
- boost::unique_lock<boost::mutex> lock(this->mutex);
- this->wait_internal(lock);
-
- // fixme use boost::move
- return static_cast<move_dest_type>(*(this->result));
- }
static void run(future_async_continuation_shared_state* that)
{
try
@@ -3505,12 +3449,10 @@
};
template<typename F, typename Fp>
- struct future_async_continuation_shared_state<F, void, Fp>: public shared_state<void>
+ struct future_async_continuation_shared_state<F, void, Fp>: public future_async_shared_state_base<void>
{
- typedef shared_state<void> base_type;
F& parent;
Fp continuation;
- boost::thread thr_;
public:
explicit future_async_continuation_shared_state(
@@ -3518,25 +3460,14 @@
) :
parent(f),
//continuation(boost::forward<Fp>(c)
- continuation(boost::move(c)),
- thr_()
+ continuation(boost::move(c))
{
- this->set_async();
- }
-
- ~future_async_continuation_shared_state()
- {
- if (thr_.get_id()==thread::id())
- {
- return;
- }
- if (thr_.joinable()) thr_.join();
}
void launch_continuation(boost::unique_lock<boost::mutex>& lk)
{
lk.unlock();
- thr_ = thread(&future_async_continuation_shared_state::run, this);
+ this->thr_ = thread(&future_async_continuation_shared_state::run, this);
}
static void run(future_async_continuation_shared_state* that)
@@ -3566,7 +3497,6 @@
template<typename F, typename Rp, typename Fp>
struct future_deferred_continuation_shared_state: shared_state<Rp>
{
- typedef shared_state<Rp> base_type;
F& parent;
Fp continuation;
@@ -3601,7 +3531,6 @@
template<typename F, typename Fp>
struct future_deferred_continuation_shared_state<F,void,Fp>: shared_state<void>
{
- typedef shared_state<void> base_type;
F& parent;
Fp continuation;
@@ -3610,7 +3539,6 @@
F& f, BOOST_THREAD_FWD_REF(Fp) c
):
parent(f),
- //continuation(boost::forward<Fp>(c)
continuation(boost::move(c))
{
this->set_deferred();
@@ -3665,110 +3593,6 @@
return BOOST_THREAD_FUTURE<Rp>(h);
}
-// template <typename F, typename R, typename C>
-// struct future_continuation : shared_state<R>
-// {
-// F& parent;
-// C continuation;
-// launch policy_;
-//
-// future_continuation(boost::unique_lock<boost::mutex>& lk, F& f, BOOST_THREAD_FWD_REF(C) c) :
-// parent(f),
-// continuation(boost::forward<C>(c)),
-// policy_(f.launch_policy())
-// {
-// init_continuation(lk);
-// }
-// future_continuation(boost::unique_lock<boost::mutex>& lk, F& f, BOOST_THREAD_FWD_REF(C) c, launch policy) :
-// parent(f),
-// continuation(boost::forward<C>(c)),
-// policy_(policy)
-// {
-// init_continuation(lk);
-// }
-// ~future_continuation()
-// {}
-//
-// void init_continuation(boost::unique_lock<boost::mutex>& lk)
-// {
-// try
-// {
-// lk.unlock();
-// // fixme what to do depending on inherits_launch_policy_ and policy_?
-// if (int(policy_) & int(launch::deferred))
-// {
-// R val = continuation(parent);
-// next.set_value(boost::move(val));
-// }
-// else
-// {
-// BOOST_THREAD_FUTURE<R> f = async(policy_, continuation, boost::ref(parent));
-// R val = f.get();
-// next.set_value(boost::move(val));
-// }
-// }
-// catch (...)
-// {
-// next.set_exception(boost::current_exception());
-// }
-// }
-// private:
-//
-// future_continuation(future_continuation const&);
-// future_continuation& operator=(future_continuation const&);
-// };
-//#if defined(BOOST_THREAD_RVALUE_REFERENCES_DONT_MATCH_FUNTION_PTR)
-// template <typename F, typename R, typename CR>
-// struct future_continuation<F,R,CR(*)(F&)> : shared_state<R>
-// {
-// F& parent;
-// CR(*continuation)(F&) ;
-// launch policy_;
-//
-// future_continuation(F& f, CR(*c)(F&)) :
-// parent(f),
-// continuation(c),
-// policy_(f.launch_policy()),
-// next()
-// {}
-// future_continuation(F& f, CR(*c)(F&), launch policy) :
-// parent(f),
-// continuation(c),
-// policy_(policy),
-// next()
-// {}
-// ~future_continuation()
-// {}
-//
-// void start_continuation(boost::unique_lock<boost::mutex>& lk)
-// {
-// try
-// {
-// lk.unlock();
-// // fixme what to do depending on inherits_launch_policy_ and policy_?
-// if (int(policy_) & int(launch::deferred))
-// {
-// R val = continuation(parent);
-// next.set_value(boost::move(val));
-// }
-// else
-// {
-// BOOST_THREAD_FUTURE<R> f = async(policy_, continuation, boost::ref(parent));
-// R val = f.get();
-// next.set_value(boost::move(val));
-// }
-// }
-// catch (...)
-// {
-// next.set_exception(boost::current_exception());
-// }
-// }
-// private:
-//
-// future_continuation(future_continuation const&);
-// future_continuation& operator=(future_continuation const&);
-// };
-//#endif
}
////////////////////////////////
Modified: trunk/libs/thread/test/sync/futures/async/async_pass.cpp
==============================================================================
--- trunk/libs/thread/test/sync/futures/async/async_pass.cpp (original)
+++ trunk/libs/thread/test/sync/futures/async/async_pass.cpp 2013-06-02 01:56:13 EDT (Sun, 02 Jun 2013)
@@ -169,6 +169,28 @@
{
try
{
+ boost::shared_future<int> f = boost::async(f0).share();
+ boost::this_thread::sleep_for(ms(300));
+ Clock::time_point t0 = Clock::now();
+ BOOST_TEST(f.get() == 3);
+ Clock::time_point t1 = Clock::now();
+ BOOST_TEST(t1 - t0 < ms(300));
+ }
+ catch (std::exception& ex)
+ {
+ std::cout << __FILE__ << "[" << __LINE__ << "]" << ex.what() << std::endl;
+ BOOST_TEST(false && "exception thrown");
+ }
+ catch (...)
+ {
+ BOOST_TEST(false && "exception thrown");
+ }
+
+ }
+ std::cout << __FILE__ << "[" << __LINE__ << "]" << std::endl;
+ {
+ try
+ {
boost::future<int> f = boost::async(boost::launch::async, f0);
boost::this_thread::sleep_for(ms(300));
Clock::time_point t0 = Clock::now();
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk