Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r86316 - in trunk: boost/interprocess/ipc libs/interprocess/doc libs/interprocess/test
From: igaztanaga_at_[hidden]
Date: 2013-10-15 04:02:10


Author: igaztanaga
Date: 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013)
New Revision: 86316
URL: http://svn.boost.org/trac/boost/changeset/86316

Log:
Fixes #9221 ("message_queue deadlock on linux")

Text files modified:
   trunk/boost/interprocess/ipc/message_queue.hpp | 190 ++++++++++++++++++++++++++-------------
   trunk/libs/interprocess/doc/interprocess.qbk | 10 ++
   trunk/libs/interprocess/test/message_queue_test.cpp | 84 +++++++++++++++++
   3 files changed, 220 insertions(+), 64 deletions(-)

Modified: trunk/boost/interprocess/ipc/message_queue.hpp
==============================================================================
--- trunk/boost/interprocess/ipc/message_queue.hpp Tue Oct 15 01:23:53 2013 (r86315)
+++ trunk/boost/interprocess/ipc/message_queue.hpp 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013) (r86316)
@@ -306,6 +306,8 @@
          m_cur_num_msg(0)
          #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
          ,m_cur_first_msg(0u)
+ ,m_blocked_senders(0u)
+ ,m_blocked_receivers(0u)
          #endif
       { this->initialize_memory(); }
 
@@ -376,17 +378,17 @@
    {
       iterator it_inserted_ptr_end = this->inserted_ptr_end();
       iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
- if(where == it_inserted_ptr_end){
- ++m_cur_num_msg;
- return **it_inserted_ptr_end;
- }
- else if(where == it_inserted_ptr_beg){
+ if(where == it_inserted_ptr_beg){
          //wrap a zero index to m_max_num_msg first so the unsigned decrement never underflows
          m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
          --m_cur_first_msg;
          ++m_cur_num_msg;
          return *mp_index[m_cur_first_msg];
       }
+ else if(where == it_inserted_ptr_end){
+ ++m_cur_num_msg;
+ return **it_inserted_ptr_end;
+ }
       else{
          size_type pos = where - &mp_index[0];
          size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
@@ -452,7 +454,7 @@
       }
    }
 
- #else
+ #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
 
    typedef msg_hdr_ptr_t *iterator;
 
@@ -482,7 +484,7 @@
       return **pos;
    }
 
- #endif
+ #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
 
    //!Inserts the first free message in the priority queue
    msg_header & queue_free_msg(unsigned int priority)
@@ -507,7 +509,7 @@
             //Check where the free message should be placed
             it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
          }
-
+ assert(0);
       }
       //Insert the free message in the correct position
       return this->insert_at(it);
@@ -577,6 +579,8 @@
    #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
    //Current start offset in the circular index
    size_type m_cur_first_msg;
+ size_type m_blocked_senders;
+ size_type m_blocked_receivers;
    #endif
 };
 
@@ -714,41 +718,67 @@
       throw interprocess_exception(size_error);
    }
 
- bool was_empty = false;
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ bool notify_blocked_receivers = false;
+ #endif
    //---------------------------------------------
    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
    //---------------------------------------------
    {
       //If the queue is full execute blocking logic
       if (p_hdr->is_full()) {
- switch(block){
- case non_blocking :
- return false;
- break;
-
- case blocking :
- do{
- p_hdr->m_cond_send.wait(lock);
- }
- while (p_hdr->is_full());
- break;
-
- case timed :
- do{
- if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
- if(p_hdr->is_full())
- return false;
- break;
+ BOOST_TRY{
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ ++p_hdr->m_blocked_senders;
+ #endif
+ switch(block){
+ case non_blocking :
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ return false;
+ break;
+
+ case blocking :
+ do{
+ p_hdr->m_cond_send.wait(lock);
+ }
+ while (p_hdr->is_full());
+ break;
+
+ case timed :
+ do{
+ if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
+ if(p_hdr->is_full()){
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ return false;
+ }
+ break;
+ }
                   }
- }
- while (p_hdr->is_full());
- break;
- default:
- break;
+ while (p_hdr->is_full());
+ break;
+ default:
+ break;
+ }
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ }
+ BOOST_CATCH(...){
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ BOOST_RETHROW;
          }
+ BOOST_CATCH_END
       }
 
- was_empty = p_hdr->is_empty();
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
+ #endif
       //Insert the first free message in the priority queue
       ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
 
@@ -767,9 +797,13 @@
    //Notify outside lock to avoid contention. This might produce some
    //spurious wakeups, but it's usually far better than notifying inside.
    //If this message changes the queue empty state, notify it to receivers
- if (was_empty){
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ if (notify_blocked_receivers){
       p_hdr->m_cond_recv.notify_one();
    }
+ #else
+ p_hdr->m_cond_recv.notify_one();
+ #endif
 
    return true;
 }
@@ -811,42 +845,70 @@
       throw interprocess_exception(size_error);
    }
 
- bool was_full = false;
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ bool notify_blocked_senders = false;
+ #endif
    //---------------------------------------------
    scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
    //---------------------------------------------
    {
       //If there are no messages execute blocking logic
       if (p_hdr->is_empty()) {
- switch(block){
- case non_blocking :
- return false;
- break;
-
- case blocking :
- do{
- p_hdr->m_cond_recv.wait(lock);
- }
- while (p_hdr->is_empty());
- break;
-
- case timed :
- do{
- if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
- if(p_hdr->is_empty())
- return false;
- break;
+ BOOST_TRY{
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ ++p_hdr->m_blocked_receivers;
+ #endif
+ switch(block){
+ case non_blocking :
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ return false;
+ break;
+
+ case blocking :
+ do{
+ p_hdr->m_cond_recv.wait(lock);
+ }
+ while (p_hdr->is_empty());
+ break;
+
+ case timed :
+ do{
+ if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
+ if(p_hdr->is_empty()){
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ return false;
+ }
+ break;
+ }
                   }
- }
- while (p_hdr->is_empty());
- break;
-
- //Paranoia check
- default:
- break;
+ while (p_hdr->is_empty());
+ break;
+
+ //Paranoia check
+ default:
+ break;
+ }
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ }
+ BOOST_CATCH(...){
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ BOOST_RETHROW;
          }
+ BOOST_CATCH_END
       }
 
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
+ #endif
+
       //There is at least one message ready to pick, get the top one
       ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
 
@@ -861,8 +923,6 @@
       //Copy data to receiver's buffers
       std::memcpy(buffer, top_msg.data(), recvd_size);
 
- was_full = p_hdr->is_full();
-
       //Free top message and put it in the free message list
       p_hdr->free_top_msg();
    } //Lock end
@@ -870,9 +930,13 @@
    //Notify outside lock to avoid contention. This might produce some
    //spurious wakeups, but it's usually far better than notifying inside.
    //If this reception changes the queue full state, notify senders
- if (was_full){
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ if (notify_blocked_senders){
       p_hdr->m_cond_send.notify_one();
    }
+ #else
+ p_hdr->m_cond_send.notify_one();
+ #endif
 
    return true;
 }

Modified: trunk/libs/interprocess/doc/interprocess.qbk
==============================================================================
--- trunk/libs/interprocess/doc/interprocess.qbk Tue Oct 15 01:23:53 2013 (r86315)
+++ trunk/libs/interprocess/doc/interprocess.qbk 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013) (r86316)
@@ -6713,6 +6713,16 @@
 
 [section:release_notes Release Notes]
 
+[section:release_notes_boost_1_56_00 Boost 1.56 Release]
+* Fixed bugs:
+ * [@https://svn.boost.org/trac/boost/ticket/9221 #9221 ("message_queue deadlock on linux")].
+
+* [*ABI breaking]: [@https://svn.boost.org/trac/boost/ticket/9221 #9221] showed
+ that the `BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX` option of message queue
+ was completely broken, so an ABI break was necessary to have a working implementation.
+
+[endsect]
+
 [section:release_notes_boost_1_55_00 Boost 1.55 Release]
 
 * Fixed bugs [@https://svn.boost.org/trac/boost/ticket/7156 #7156],

Modified: trunk/libs/interprocess/test/message_queue_test.cpp
==============================================================================
--- trunk/libs/interprocess/test/message_queue_test.cpp Tue Oct 15 01:23:53 2013 (r86315)
+++ trunk/libs/interprocess/test/message_queue_test.cpp 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013) (r86316)
@@ -17,6 +17,7 @@
 #include <boost/interprocess/allocators/node_allocator.hpp>
 #include <boost/interprocess/detail/os_thread_functions.hpp>
 #include <vector>
+#include <iostream>
 #include <cstddef>
 #include <limits>
 #include <memory>
@@ -222,12 +223,12 @@
    return true;
 }
 //]
+
 static const int MsgSize = 10;
 static const int NumMsg = 1000;
 static char msgsend [10];
 static char msgrecv [10];
 
-
 static boost::interprocess::message_queue *pmessage_queue;
 
 void receiver()
@@ -267,6 +268,83 @@
    return true;
 }
 
+
+//////////////////////////////////////////////////////////////////////////////
+//
+// test_multi_sender_receiver is based on Alexander (aalutov's)
+// testcase for ticket #9221. Many thanks.
+//
+//////////////////////////////////////////////////////////////////////////////
+
+static boost::interprocess::message_queue *global_queue = 0;
+//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
+static const int MULTI_NUM_MSG_PER_SENDER = 10000;
+//Message queue message capacity
+static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
+//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
+static const int MULTI_THREAD_COUNT = 10;
+
+static void multisend()
+{
+ char buff;
+ for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
+ global_queue->send(&buff, 1, 0);
+ }
+ global_queue->send(&buff, 0, 0);
+ //std::cout<<"writer thread complete"<<std::endl;
+}
+
+static void multireceive()
+{
+ char buff;
+ size_t size;
+ int received_msgs = 0;
+ unsigned int priority;
+ do {
+ global_queue->receive(&buff, 1, size, priority);
+ ++received_msgs;
+ } while (size > 0);
+ --received_msgs;
+ //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
+}
+
+
+bool test_multi_sender_receiver()
+{
+ bool ret = true;
+ //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
+ try {
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ boost::interprocess::message_queue mq
+ (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
+ global_queue = &mq;
+ std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
+
+ //Launch sender threads
+ for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
+ boost::interprocess::ipcdetail::thread_launch
+ (threads[i], &multisend);
+ }
+
+ for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
+ boost::interprocess::ipcdetail::thread_launch
+ (threads[MULTI_THREAD_COUNT+i], &multireceive);
+ }
+
+ for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
+ boost::interprocess::ipcdetail::thread_join(threads[i]);
+ //std::cout << "Joined thread " << i << std::endl;
+ }
+ }
+ catch (std::exception &e) {
+ std::cout << "error " << e.what() << std::endl;
+ ret = false;
+ }
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ return ret;
+}
+
+
 int main ()
 {
    if(!test_priority_order()){
@@ -281,6 +359,10 @@
       return 1;
    }
 
+ if(!test_multi_sender_receiver()){
+ return 1;
+ }
+
    return 0;
 }
 


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk