Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r79523 - in trunk/boost/asio/detail: . impl
From: chris_at_[hidden]
Date: 2012-07-15 00:30:52


Author: chris_kohlhoff
Date: 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
New Revision: 79523
URL: http://svn.boost.org/trac/boost/changeset/79523

Log:
Avoid calling work_finished() if a completion handler creates more work.
Text files modified:
   trunk/boost/asio/detail/atomic_count.hpp | 3
   trunk/boost/asio/detail/impl/task_io_service.ipp | 133 +++++++++++++++++++++------------------
   trunk/boost/asio/detail/task_io_service.hpp | 6 -
   3 files changed, 77 insertions(+), 65 deletions(-)

Modified: trunk/boost/asio/detail/atomic_count.hpp
==============================================================================
--- trunk/boost/asio/detail/atomic_count.hpp (original)
+++ trunk/boost/asio/detail/atomic_count.hpp 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
@@ -31,10 +31,13 @@
 
 #if !defined(BOOST_HAS_THREADS) || defined(BOOST_ASIO_DISABLE_THREADS)
 typedef long atomic_count;
+inline void increment(atomic_count& a, long b) { a += b; }
 #elif defined(BOOST_ASIO_HAS_STD_ATOMIC)
 typedef std::atomic<long> atomic_count;
+inline void increment(atomic_count& a, long b) { a += b; }
 #else // defined(BOOST_ASIO_HAS_STD_ATOMIC)
 typedef boost::detail::atomic_count atomic_count;
+inline void increment(atomic_count& a, long b) { while (b > 0) ++a, --b; }
 #endif // defined(BOOST_ASIO_HAS_STD_ATOMIC)
 
 } // namespace detail

Modified: trunk/boost/asio/detail/impl/task_io_service.ipp
==============================================================================
--- trunk/boost/asio/detail/impl/task_io_service.ipp (original)
+++ trunk/boost/asio/detail/impl/task_io_service.ipp 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
@@ -30,48 +30,67 @@
 namespace asio {
 namespace detail {
 
+struct task_io_service::thread_info
+{
+ event* wakeup_event;
+ op_queue<operation> private_op_queue;
+ long private_outstanding_work;
+ thread_info* next;
+};
+
 struct task_io_service::task_cleanup
 {
   ~task_cleanup()
   {
+ if (this_thread_->private_outstanding_work > 0)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work);
+ }
+ this_thread_->private_outstanding_work = 0;
+
     // Enqueue the completed operations and reinsert the task at the end of
     // the operation queue.
     lock_->lock();
     task_io_service_->task_interrupted_ = true;
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
     task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
   }
 
   task_io_service* task_io_service_;
   mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
+ thread_info* this_thread_;
 };
 
 struct task_io_service::work_cleanup
 {
   ~work_cleanup()
   {
- task_io_service_->work_finished();
+ if (this_thread_->private_outstanding_work > 1)
+ {
+ boost::asio::detail::increment(
+ task_io_service_->outstanding_work_,
+ this_thread_->private_outstanding_work - 1);
+ }
+ else if (this_thread_->private_outstanding_work < 1)
+ {
+ task_io_service_->work_finished();
+ }
+ this_thread_->private_outstanding_work = 0;
 
 #if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- if (!ops_->empty())
+ if (!this_thread_->private_op_queue.empty())
     {
       lock_->lock();
- task_io_service_->op_queue_.push(*ops_);
+ task_io_service_->op_queue_.push(this_thread_->private_op_queue);
     }
 #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
   }
 
   task_io_service* task_io_service_;
   mutex::scoped_lock* lock_;
- op_queue<operation>* ops_;
-};
-
-struct task_io_service::thread_info
-{
- event* wakeup_event;
- op_queue<operation>* private_op_queue;
- thread_info* next;
+ thread_info* this_thread_;
 };
 
 task_io_service::task_io_service(
@@ -131,19 +150,14 @@
   thread_info this_thread;
   event wakeup_event;
   this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = &private_op_queue;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
   this_thread.next = 0;
   thread_call_stack::context ctx(this, this_thread);
 
   mutex::scoped_lock lock(mutex_);
 
   std::size_t n = 0;
- for (; do_run_one(lock, this_thread, private_op_queue, ec); lock.lock())
+ for (; do_run_one(lock, this_thread, ec); lock.lock())
     if (n != (std::numeric_limits<std::size_t>::max)())
       ++n;
   return n;
@@ -161,14 +175,13 @@
   thread_info this_thread;
   event wakeup_event;
   this_thread.wakeup_event = &wakeup_event;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
   this_thread.next = 0;
   thread_call_stack::context ctx(this, this_thread);
 
   mutex::scoped_lock lock(mutex_);
 
- return do_run_one(lock, this_thread, private_op_queue, ec);
+ return do_run_one(lock, this_thread, ec);
 }
 
 std::size_t task_io_service::poll(boost::system::error_code& ec)
@@ -182,12 +195,7 @@
 
   thread_info this_thread;
   this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
-#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = &private_op_queue;
-#else // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
- this_thread.private_op_queue = 0;
-#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ this_thread.private_outstanding_work = 0;
   this_thread.next = 0;
   thread_call_stack::context ctx(this, this_thread);
 
@@ -199,12 +207,11 @@
   // queue now.
   if (one_thread_)
     if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
 #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
 
   std::size_t n = 0;
- for (; do_poll_one(lock, private_op_queue, ec); lock.lock())
+ for (; do_poll_one(lock, this_thread, ec); lock.lock())
     if (n != (std::numeric_limits<std::size_t>::max)())
       ++n;
   return n;
@@ -221,8 +228,7 @@
 
   thread_info this_thread;
   this_thread.wakeup_event = 0;
- op_queue<operation> private_op_queue;
- this_thread.private_op_queue = 0;
+ this_thread.private_outstanding_work = 0;
   this_thread.next = 0;
   thread_call_stack::context ctx(this, this_thread);
 
@@ -234,11 +240,10 @@
   // queue now.
   if (one_thread_)
     if (thread_info* outer_thread_info = ctx.next_by_key())
- if (outer_thread_info->private_op_queue)
- op_queue_.push(*outer_thread_info->private_op_queue);
+ op_queue_.push(outer_thread_info->private_op_queue);
 #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
 
- return do_poll_one(lock, private_op_queue, ec);
+ return do_poll_one(lock, this_thread, ec);
 }
 
 void task_io_service::stop()
@@ -261,8 +266,22 @@
 
 void task_io_service::post_immediate_completion(task_io_service::operation* op)
 {
+#if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+ if (one_thread_)
+ {
+ if (thread_info* this_thread = thread_call_stack::contains(this))
+ {
+ ++this_thread->private_outstanding_work;
+ this_thread->private_op_queue.push(op);
+ return;
+ }
+ }
+#endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
+
   work_started();
- post_deferred_completion(op);
+ mutex::scoped_lock lock(mutex_);
+ op_queue_.push(op);
+ wake_one_thread_and_unlock(lock);
 }
 
 void task_io_service::post_deferred_completion(task_io_service::operation* op)
@@ -272,11 +291,8 @@
   {
     if (thread_info* this_thread = thread_call_stack::contains(this))
     {
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(op);
- return;
- }
+ this_thread->private_op_queue.push(op);
+ return;
     }
   }
 #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -296,11 +312,8 @@
     {
       if (thread_info* this_thread = thread_call_stack::contains(this))
       {
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(ops);
- return;
- }
+ this_thread->private_op_queue.push(ops);
+ return;
       }
     }
 #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
@@ -324,11 +337,8 @@
 #if defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
   if (thread_info* this_thread = thread_call_stack::contains(this))
   {
- if (this_thread->private_op_queue)
- {
- this_thread->private_op_queue->push(op);
- return;
- }
+ this_thread->private_op_queue.push(op);
+ return;
   }
 #endif // defined(BOOST_HAS_THREADS) && !defined(BOOST_ASIO_DISABLE_THREADS)
 
@@ -361,7 +371,7 @@
 
 std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
     task_io_service::thread_info& this_thread,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ const boost::system::error_code& ec)
 {
   while (!stopped_)
   {
@@ -384,13 +394,13 @@
         else
           lock.unlock();
 
- task_cleanup on_exit = { this, &lock, &private_op_queue };
+ task_cleanup on_exit = { this, &lock, &this_thread };
         (void)on_exit;
 
         // Run the task. May throw an exception. Only block if the operation
         // queue is empty and we're not polling, otherwise we want to return
         // as soon as possible.
- task_->run(!more_handlers, private_op_queue);
+ task_->run(!more_handlers, this_thread.private_op_queue);
       }
       else
       {
@@ -402,7 +412,7 @@
           lock.unlock();
 
         // Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
         (void)on_exit;
 
         // Complete the operation. May throw an exception. Deletes the object.
@@ -425,7 +435,8 @@
 }
 
 std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
- op_queue<operation>& private_op_queue, const boost::system::error_code& ec)
+ task_io_service::thread_info& this_thread,
+ const boost::system::error_code& ec)
 {
   if (stopped_)
     return 0;
@@ -437,13 +448,13 @@
     lock.unlock();
 
     {
- task_cleanup c = { this, &lock, &private_op_queue };
+ task_cleanup c = { this, &lock, &this_thread };
       (void)c;
 
       // Run the task. May throw an exception. Only block if the operation
       // queue is empty and we're not polling, otherwise we want to return
       // as soon as possible.
- task_->run(false, private_op_queue);
+ task_->run(false, this_thread.private_op_queue);
     }
 
     o = op_queue_.front();
@@ -465,7 +476,7 @@
     lock.unlock();
 
   // Ensure the count of outstanding work is decremented on block exit.
- work_cleanup on_exit = { this, &lock, &private_op_queue };
+ work_cleanup on_exit = { this, &lock, &this_thread };
   (void)on_exit;
 
   // Complete the operation. May throw an exception. Deletes the object.

Modified: trunk/boost/asio/detail/task_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/task_io_service.hpp (original)
+++ trunk/boost/asio/detail/task_io_service.hpp 2012-07-15 00:30:50 EDT (Sun, 15 Jul 2012)
@@ -142,13 +142,11 @@
 
   // Run at most one operation. May block.
   BOOST_ASIO_DECL std::size_t do_run_one(mutex::scoped_lock& lock,
- thread_info& this_thread, op_queue<operation>& private_op_queue,
- const boost::system::error_code& ec);
+ thread_info& this_thread, const boost::system::error_code& ec);
 
   // Poll for at most one operation.
   BOOST_ASIO_DECL std::size_t do_poll_one(mutex::scoped_lock& lock,
- op_queue<operation>& private_op_queue,
- const boost::system::error_code& ec);
+ thread_info& this_thread, const boost::system::error_code& ec);
 
   // Stop the task and all idle threads.
   BOOST_ASIO_DECL void stop_all_threads(mutex::scoped_lock& lock);


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk