Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r86160 - in trunk/boost/sync/detail: . event
From: andrey.semashev_at_[hidden]
Date: 2013-10-04 15:02:11


Author: andysem
Date: 2013-10-04 15:02:11 EDT (Fri, 04 Oct 2013)
New Revision: 86160
URL: http://svn.boost.org/trac/boost/changeset/86160

Log:
Reworked futex auto-reset event to eliminate missed posts problem.

Text files modified:
   trunk/boost/sync/detail/event/event_futex.hpp | 173 ++++++++++++++++++++++++++++++---------
   trunk/boost/sync/detail/futex.hpp | 6
   2 files changed, 136 insertions(+), 43 deletions(-)

Modified: trunk/boost/sync/detail/event/event_futex.hpp
==============================================================================
--- trunk/boost/sync/detail/event/event_futex.hpp Fri Oct 4 06:46:53 2013 (r86159)
+++ trunk/boost/sync/detail/event/event_futex.hpp 2013-10-04 15:02:11 EDT (Fri, 04 Oct 2013) (r86160)
@@ -1,6 +1,7 @@
 // event.hpp, futex-based event
 //
 // Copyright (C) 2013 Tim Blechmann
+// Copyright (C) 2013 Andrey Semashev
 //
 // Distributed under the Boost Software License, Version 1.0. (See
 // accompanying file LICENSE_1_0.txt or copy at
@@ -30,47 +31,101 @@
     BOOST_DELETED_FUNCTION(auto_reset_event(auto_reset_event const&));
     BOOST_DELETED_FUNCTION(auto_reset_event& operator= (auto_reset_event const&));
 
+private:
+ // State bits are divided into post count and waiter count. Post counter is needed to wake
+ // the correct number of threads blocked on the event in case if multiple concurrent posts are made.
+ enum
+ {
+ post_count_lowest_bit = 22u,
+ post_count_one = 1u << post_count_lowest_bit,
+ post_count_mask = 0u - post_count_one,
+ wait_count_mask = (~0u) ^ post_count_mask
+ };
+
 public:
- auto_reset_event() BOOST_NOEXCEPT :
- m_state(0)
+ auto_reset_event() BOOST_NOEXCEPT : m_state(0)
     {
     }
 
     void post() BOOST_NOEXCEPT
     {
- if (m_state.exchange(1, detail::atomic_ns::memory_order_release) == 0)
+ unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
+ unsigned int waiters, posts;
+ while (true)
         {
- sync::detail::linux_::futex_signal(reinterpret_cast< int* >(&m_state));
+ waiters = old_state & wait_count_mask;
+ posts = old_state >> post_count_lowest_bit;
+ if (waiters >= posts)
+ {
+ if (m_state.compare_exchange_weak(old_state, old_state + post_count_one, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release))
+ break;
+
+ detail::pause();
+ }
+ else
+ return; // the event is already set (enough times so that all waiters are released and the event is still left signalled)
         }
+
+ if (waiters > 0)
+ sync::detail::linux_::futex_signal(reinterpret_cast< int* >(&m_state));
     }
 
     void wait() BOOST_NOEXCEPT
     {
- while (m_state.exchange(0, detail::atomic_ns::memory_order_acq_rel) == 0)
+ // Try the fast path first
+ if (this->try_wait())
+ return;
+
+ // Add one waiter
+ unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel);
+ while (true)
         {
- again:
- const int status = sync::detail::linux_::futex_wait(reinterpret_cast< int* >(&m_state), 0);
- if (status != 0)
+ unsigned int posts = old_state >> post_count_lowest_bit;
+ if (posts == 0)
             {
- const int err = errno;
- switch (err)
+ again:
+ const int status = sync::detail::linux_::futex_wait(reinterpret_cast< int* >(&m_state), old_state);
+ if (status != 0)
                 {
- case EINTR: // signal received
- goto again; // skip xchg
-
- case EWOULDBLOCK: // another thread changed the state, retry
- continue;
-
- default:
- BOOST_ASSERT(false);
+ const int err = errno;
+ switch (err)
+ {
+ case EINTR: // signal received
+ goto again;
+
+ case EWOULDBLOCK: // another thread changed the state
+ break;
+
+ default:
+ BOOST_ASSERT(false);
+ }
                 }
+
+ old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
+ posts = old_state >> post_count_lowest_bit;
+ if (posts == 0)
+ goto again;
             }
+
+ // Remove one post and one waiter from the counters
+ if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release))
+ break;
         }
     }
 
     bool try_wait() BOOST_NOEXCEPT
     {
- return m_state.exchange(0, detail::atomic_ns::memory_order_acq_rel) != 0;
+ unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
+
+ for (unsigned int posts = old_state >> post_count_lowest_bit; posts > 0; posts = old_state >> post_count_lowest_bit)
+ {
+ if (m_state.compare_exchange_weak(old_state, old_state - post_count_one, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release))
+ return true;
+
+ detail::pause();
+ }
+
+ return false;
     }
 
     template <typename Duration>
@@ -89,36 +144,74 @@
 private:
     bool do_wait_for(const struct timespec & timeout)
     {
- while (m_state.exchange(0, detail::atomic_ns::memory_order_acq_rel) == 0)
+ // Try the fast path first
+ if (this->try_wait())
+ return true;
+
+ // Add one waiter
+ unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel);
+ while (true)
         {
- again:
- const int status = sync::detail::linux_::futex_timedwait(reinterpret_cast< int* >(&m_state), 0, &timeout);
- if (status != 0)
+ unsigned int posts = old_state >> post_count_lowest_bit;
+ if (posts == 0)
             {
- const int err = errno;
- switch (err)
+ again:
+ const int status = sync::detail::linux_::futex_timedwait(reinterpret_cast< int* >(&m_state), old_state, &timeout);
+ if (status != 0)
                 {
- case ETIMEDOUT:
- return false;
-
- case EINTR: // signal received
- goto again; // skip xchg
-
- case EWOULDBLOCK: // another thread changed the state, retry
- continue;
-
- default:
- BOOST_ASSERT(false);
+ const int err = errno;
+ switch (err)
+ {
+ case ETIMEDOUT:
+ old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
+ while (true)
+ {
+ posts = old_state >> post_count_lowest_bit;
+ if (posts == 0)
+ {
+ // Remove one waiter
+ if (m_state.compare_exchange_weak(old_state, old_state - 1u, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release))
+ return false;
+ }
+ else
+ {
+ // Remove one post and one waiter from the counters
+ if (m_state.compare_exchange_weak(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release))
+ return true;
+ }
+
+ detail::pause();
+ }
+ break;
+
+ case EINTR: // signal received
+ goto again;
+
+ case EWOULDBLOCK: // another thread changed the state
+ break;
+
+ default:
+ BOOST_ASSERT(false);
+ }
                 }
+
+ old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
+ posts = old_state >> post_count_lowest_bit;
+ if (posts == 0)
+ goto again;
             }
+
+ // Remove one post and one waiter from the counters
+ if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release))
+ break;
         }
 
         return true;
     }
 
 private:
- BOOST_STATIC_ASSERT_MSG(sizeof(detail::atomic_ns::atomic<int>) == sizeof(int), "Boost.Sync: unexpected size of atomic<int>");
- detail::atomic_ns::atomic<int> m_state;
+ BOOST_STATIC_ASSERT_MSG(sizeof(detail::atomic_ns::atomic< unsigned int >) == sizeof(int), "Boost.Sync: unexpected size of atomic< unsigned int >");
+ detail::atomic_ns::atomic< unsigned int > m_state;
 };
 
 class manual_reset_event
@@ -209,8 +302,8 @@
     }
 
 private:
- BOOST_STATIC_ASSERT_MSG(sizeof(detail::atomic_ns::atomic<int>) == sizeof(int), "Boost.Sync: unexpected size of atomic<int>");
- detail::atomic_ns::atomic<int> m_state;
+ BOOST_STATIC_ASSERT_MSG(sizeof(detail::atomic_ns::atomic< int >) == sizeof(int), "Boost.Sync: unexpected size of atomic< int >");
+ detail::atomic_ns::atomic< int > m_state;
 };
 
 }

Modified: trunk/boost/sync/detail/futex.hpp
==============================================================================
--- trunk/boost/sync/detail/futex.hpp Fri Oct 4 06:46:53 2013 (r86159)
+++ trunk/boost/sync/detail/futex.hpp 2013-10-04 15:02:11 EDT (Fri, 04 Oct 2013) (r86160)
@@ -91,8 +91,8 @@
     );
 }
 
-//! Wakes one thread waiting on the futex
-BOOST_FORCEINLINE int futex_signal(int* pval) BOOST_NOEXCEPT
+//! Wakes the specified number of threads waiting on the futex
+BOOST_FORCEINLINE int futex_signal(int* pval, int count = 1) BOOST_NOEXCEPT
 {
     return futex_invoke
     (
@@ -102,7 +102,7 @@
 #else
         FUTEX_WAKE,
 #endif
- 1
+ count
     );
 }
 


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk