|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r79523 - in trunk/boost/asio/detail: . impl
From: chris_at_[hidden]
Date: 2012-07-15 00:30:52
Author: chris_kohlhoff
Date: 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
New Revision: 79523
URL: http://svn.boost.org/trac/boost/changeset/79523
Log:
Avoid calling work_finished() if a completion handler creates more work.
Text files modified:
trunk/boost/asio/detail/atomic_count.hpp | 3
trunk/boost/asio/detail/impl/task_io_service.ipp | 133 +++++++++++++++++++++------------------
trunk/boost/asio/detail/task_io_service.hpp | 6 -
3 files changed, 77 insertions(+), 65 deletions(-)
Modified: trunk/boost/asio/detail/atomic_count.hpp
==============================================================================
--- trunk/boost/asio/detail/atomic_count.hpp (original)
+++ trunk/boost/asio/detail/atomic_count.hpp 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
@@ -31,10 +31,13 @@
#if !defined(BOOST_HAS_THREADS) || defined(BOOST_ASIO_DISABLE_THREADS)
typedef long atomic_count;
+inline void increment(atomic_count& a, long b) { a += b; }
#elif defined(BOOST_ASIO_HAS_STD_ATOMIC)
typedef std::atomic<long> atomic_count;
+inline void increment(atomic_count& a, long b) { a += b; }
#else // defined(BOOST_ASIO_HAS_STD_ATOMIC)
typedef boost::detail::atomic_count atomic_count;
+inline void increment(atomic_count& a, long b) { while (b > 0) ++a, --b; }
#endif // defined(BOOST_ASIO_HAS_STD_ATOMIC)
} // namespace detail
Modified: trunk/boost/asio/detail/impl/task_io_service.ipp
==============================================================================
--- trunk/boost/asio/detail/impl/task_io_service.ipp (original)
+++ trunk/boost/asio/detail/impl/task_io_service.ipp 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
@@ -30,48 +30,67 @@
namespace asio {
namespace detail {
+struct task_io_service::thread_info
+{
+ event* wakeup_event;
+ op_queue<operation> private_op_queue;
+ long private_outstanding_work;
+ thread_info* next;
+};
+
struct task_io_service::task_cleanup
{
~task_cleanup()
{
+ if (this_thread_->private_outstanding_work > 0)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work);
+ }
+ this_thread_->private_outstanding_work = 0;
+
// Enqueue the completed operations and reinsert the task at the end of
// the operation queue.
lock_->lock();
task_io_service_->task_interrupted_ = true;
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
}
task_io_service* task_io_service_;
mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
+ thread_info* this_thread_;
};
struct task_io_service::work_cleanup
{
~work_cleanup()
{
- task_io_service_->work_finished();
+ if (this_thread_->private_outstanding_work > 1)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work - 1);
+ }
+ else if (this_thread_->private_outstanding_work < 1)
+ {
+ task_io_service_->work_finished();
+ }
+ this_thread_->private_outstanding_work = 0;
#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- if (!ops_->empty())
+ if (!this_thread_->private_op_queue.empty())
{
lock_->lock();
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
}
task_io_service* task_io_service_;
mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
-};
-
-struct task_io_service::thread_info
-{
- event* wakeup_event;
- op_queue<operation>* private_op_queue;
- thread_info* next;
+ thread_info* this_thread_;
};
task_io_service::task_io_service(
@@ -131,19 +150,14 @@
thread_info this_thread;
event wakeup_event;
this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = &private_op_queue;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
std::size_t n = 0;
- for (; do_run_one(lock, this_thread, private_op_queue, ec); lock.lock())
+ for (; do_run_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
@@ -161,14 +175,13 @@
thread_info this_thread;
event wakeup_event;
this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
mutex::scoped_lock lock(mutex_);
- return do_run_one(lock, this_thread, private_op_queue, ec);
+ return do_run_one(lock, this_thread, ec);
}
std::size_t task_io_service::poll(boost::system::error_code& ec)
@@ -182,12 +195,7 @@
thread_info this_thread;
this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = &private_op_queue;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
@@ -199,12 +207,11 @@
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
std::size_t n = 0;
- for (; do_poll_one(lock, private_op_queue, ec); lock.lock())
+ for (; do_poll_one(lock, this_thread, ec); lock.lock())
if (n != (std::numeric_limits<std::size_t>::max)())
++n;
return n;
@@ -221,8 +228,7 @@
thread_info this_thread;
this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
this_thread.next = 0;
thread_call_stack::context ctx(this, this_thread);
@@ -234,11 +240,10 @@
// queue now.
if (one_thread_)
if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- return do_poll_one(lock, private_op_queue, ec);
+ return do_poll_one(lock, this_thread, ec);
}
void task_io_service::stop()
@@ -261,8 +266,22 @@
void task_io_service::post_immediate_completion(task_io_service::operation* op)
{
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (one_thread_)
+ {
+ if (thread_info* this_thread = thread_call_stack::contains(this))
+ {
+ ++this_thread->private_outstanding_work;
+ this_thread->private_op_queue.push(op);
+ return;
+ }
+ }
+#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+
work_started();
- post_deferred_completion(op);
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
}
void task_io_service::post_deferred_completion(task_io_service::operation* op)
@@ -272,11 +291,8 @@
{
if (thread_info* this_thread = thread_call_stack::contains(this))
{
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(op);
- return;
- }
+ this_thread->private_op_queue.push(op);
+ return;
}
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -296,11 +312,8 @@
{
if (thread_info* this_thread = thread_call_stack::contains(this))
{
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(ops);
- return;
- }
+ this_thread->private_op_queue.push(ops);
+ return;
}
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -324,11 +337,8 @@
#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
if (thread_info* this_thread = thread_call_stack::contains(this))
{
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(op);
- return;
- }
+ this_thread->private_op_queue.push(op);
+ return;
}
#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -361,7 +371,7 @@
std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
task_io_service::thread_info& this_thread,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ const boost::system::error_code& ec)
{
while (!stopped_)
{
@@ -384,13 +394,13 @@
else
lock.unlock();
- task_cleanup on_exit = { this, &lock, &private_op_queue };
+ task_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
- task_->run(!more_handlers, private_op_queue);
+ task_->run(!more_handlers, this_thread.private_op_queue);
}
else
{
@@ -402,7 +412,7 @@
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
@@ -425,7 +435,8 @@
}
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ task_io_service::thread_info& this_thread,
+ const boost::system::error_code& ec)
{
if (stopped_)
return 0;
@@ -437,13 +448,13 @@
lock.unlock();
{
- task_cleanup c = { this, &lock, &private_op_queue };
+ task_cleanup c = { this, &lock, &this_thread };
(void)c;
// Run the task. May throw an exception. Only block if the operation
// queue is empty and we're not polling, otherwise we want to return
// as soon as possible.
- task_->run(false, private_op_queue);
+ task_->run(false, this_thread.private_op_queue);
}
o = op_queue_.front();
@@ -465,7 +476,7 @@
lock.unlock();
// Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
(void)on_exit;
// Complete the operation. May throw an exception. Deletes the object.
Modified: trunk/boost/asio/detail/task_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/task_io_service.hpp (original)
+++ trunk/boost/asio/detail/task_io_service.hpp 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
@@ -142,13 +142,11 @@
// Run at most one operation. May block.
BOOST_ASIO_DECL std::size_t do_run_one(mutex::scoped_lock& lock,
- thread_info& this_thread, op_queue<operation>& private_op_queue,
- const boost::system::error_code& ec);
+ thread_info& this_thread, const boost::system::error_code& ec);
// Poll for at most one operation.
BOOST_ASIO_DECL std::size_t do_poll_one(mutex::scoped_lock& lock,
- op_queue<operation>& private_op_queue,
- const boost::system::error_code& ec);
+ thread_info& this_thread, const boost::system::error_code& ec);
// Stop the task and all idle threads.
BOOST_ASIO_DECL void stop_all_threads(mutex::scoped_lock& lock);
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk