|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r86316 - in trunk: boost/interprocess/ipc libs/interprocess/doc libs/interprocess/test
From: igaztanaga_at_[hidden]
Date: 2013-10-15 04:02:10
Author: igaztanaga
Date: 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013)
New Revision: 86316
URL: http://svn.boost.org/trac/boost/changeset/86316
Log:
Fixes #9221 ("message_queue deadlock on linux")
Text files modified:
trunk/boost/interprocess/ipc/message_queue.hpp | 190 ++++++++++++++++++++++++++-------------
trunk/libs/interprocess/doc/interprocess.qbk | 10 ++
trunk/libs/interprocess/test/message_queue_test.cpp | 84 +++++++++++++++++
3 files changed, 220 insertions(+), 64 deletions(-)
Modified: trunk/boost/interprocess/ipc/message_queue.hpp
==============================================================================
--- trunk/boost/interprocess/ipc/message_queue.hpp Tue Oct 15 01:23:53 2013 (r86315)
+++ trunk/boost/interprocess/ipc/message_queue.hpp 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013) (r86316)
@@ -306,6 +306,8 @@
m_cur_num_msg(0)
#if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
,m_cur_first_msg(0u)
+ ,m_blocked_senders(0u)
+ ,m_blocked_receivers(0u)
#endif
{ this->initialize_memory(); }
@@ -376,17 +378,17 @@
{
iterator it_inserted_ptr_end = this->inserted_ptr_end();
iterator it_inserted_ptr_beg = this->inserted_ptr_begin();
- if(where == it_inserted_ptr_end){
- ++m_cur_num_msg;
- return **it_inserted_ptr_end;
- }
- else if(where == it_inserted_ptr_beg){
+ if(where == it_inserted_ptr_beg){
//unsigned integer guarantees underflow
m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
--m_cur_first_msg;
++m_cur_num_msg;
return *mp_index[m_cur_first_msg];
}
+ else if(where == it_inserted_ptr_end){
+ ++m_cur_num_msg;
+ return **it_inserted_ptr_end;
+ }
else{
size_type pos = where - &mp_index[0];
size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
@@ -452,7 +454,7 @@
}
}
- #else
+ #else //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
typedef msg_hdr_ptr_t *iterator;
@@ -482,7 +484,7 @@
return **pos;
}
- #endif
+ #endif //BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
//!Inserts the first free message in the priority queue
msg_header & queue_free_msg(unsigned int priority)
@@ -507,7 +509,7 @@
//Check where the free message should be placed
it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
}
-
+ assert(0);
}
//Insert the free message in the correct position
return this->insert_at(it);
@@ -577,6 +579,8 @@
#if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
//Current start offset in the circular index
size_type m_cur_first_msg;
+ size_type m_blocked_senders;
+ size_type m_blocked_receivers;
#endif
};
@@ -714,41 +718,67 @@
throw interprocess_exception(size_error);
}
- bool was_empty = false;
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ bool notify_blocked_receivers = false;
+ #endif
//---------------------------------------------
scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
//---------------------------------------------
{
//If the queue is full execute blocking logic
if (p_hdr->is_full()) {
- switch(block){
- case non_blocking :
- return false;
- break;
-
- case blocking :
- do{
- p_hdr->m_cond_send.wait(lock);
- }
- while (p_hdr->is_full());
- break;
-
- case timed :
- do{
- if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
- if(p_hdr->is_full())
- return false;
- break;
+ BOOST_TRY{
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ ++p_hdr->m_blocked_senders;
+ #endif
+ switch(block){
+ case non_blocking :
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ return false;
+ break;
+
+ case blocking :
+ do{
+ p_hdr->m_cond_send.wait(lock);
+ }
+ while (p_hdr->is_full());
+ break;
+
+ case timed :
+ do{
+ if(!p_hdr->m_cond_send.timed_wait(lock, abs_time)){
+ if(p_hdr->is_full()){
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ return false;
+ }
+ break;
+ }
}
- }
- while (p_hdr->is_full());
- break;
- default:
- break;
+ while (p_hdr->is_full());
+ break;
+ default:
+ break;
+ }
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ }
+ BOOST_CATCH(...){
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ --p_hdr->m_blocked_senders;
+ #endif
+ BOOST_RETHROW;
}
+ BOOST_CATCH_END
}
- was_empty = p_hdr->is_empty();
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ notify_blocked_receivers = 0 != p_hdr->m_blocked_receivers;
+ #endif
//Insert the first free message in the priority queue
ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
@@ -767,9 +797,13 @@
//Notify outside lock to avoid contention. This might produce some
//spurious wakeups, but it's usually far better than notifying inside.
//If this message changes the queue empty state, notify it to receivers
- if (was_empty){
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ if (notify_blocked_receivers){
p_hdr->m_cond_recv.notify_one();
}
+ #else
+ p_hdr->m_cond_recv.notify_one();
+ #endif
return true;
}
@@ -811,42 +845,70 @@
throw interprocess_exception(size_error);
}
- bool was_full = false;
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ bool notify_blocked_senders = false;
+ #endif
//---------------------------------------------
scoped_lock<interprocess_mutex> lock(p_hdr->m_mutex);
//---------------------------------------------
{
//If there are no messages execute blocking logic
if (p_hdr->is_empty()) {
- switch(block){
- case non_blocking :
- return false;
- break;
-
- case blocking :
- do{
- p_hdr->m_cond_recv.wait(lock);
- }
- while (p_hdr->is_empty());
- break;
-
- case timed :
- do{
- if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
- if(p_hdr->is_empty())
- return false;
- break;
+ BOOST_TRY{
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ ++p_hdr->m_blocked_receivers;
+ #endif
+ switch(block){
+ case non_blocking :
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ return false;
+ break;
+
+ case blocking :
+ do{
+ p_hdr->m_cond_recv.wait(lock);
+ }
+ while (p_hdr->is_empty());
+ break;
+
+ case timed :
+ do{
+ if(!p_hdr->m_cond_recv.timed_wait(lock, abs_time)){
+ if(p_hdr->is_empty()){
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ return false;
+ }
+ break;
+ }
}
- }
- while (p_hdr->is_empty());
- break;
-
- //Paranoia check
- default:
- break;
+ while (p_hdr->is_empty());
+ break;
+
+ //Paranoia check
+ default:
+ break;
+ }
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ }
+ BOOST_CATCH(...){
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ --p_hdr->m_blocked_receivers;
+ #endif
+ BOOST_RETHROW;
}
+ BOOST_CATCH_END
}
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ notify_blocked_senders = 0 != p_hdr->m_blocked_senders;
+ #endif
+
//There is at least one message ready to pick, get the top one
ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
@@ -861,8 +923,6 @@
//Copy data to receiver's bufers
std::memcpy(buffer, top_msg.data(), recvd_size);
- was_full = p_hdr->is_full();
-
//Free top message and put it in the free message list
p_hdr->free_top_msg();
} //Lock end
@@ -870,9 +930,13 @@
//Notify outside lock to avoid contention. This might produce some
//spurious wakeups, but it's usually far better than notifying inside.
//If this reception changes the queue full state, notify senders
- if (was_full){
+ #ifdef BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+ if (notify_blocked_senders){
p_hdr->m_cond_send.notify_one();
}
+ #else
+ p_hdr->m_cond_send.notify_one();
+ #endif
return true;
}
Modified: trunk/libs/interprocess/doc/interprocess.qbk
==============================================================================
--- trunk/libs/interprocess/doc/interprocess.qbk Tue Oct 15 01:23:53 2013 (r86315)
+++ trunk/libs/interprocess/doc/interprocess.qbk 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013) (r86316)
@@ -6713,6 +6713,16 @@
[section:release_notes Release Notes]
+[section:release_notes_boost_1_56_00 Boost 1.56 Release]
+* Fixed bugs:
+ * [@https://svn.boost.org/trac/boost/ticket/9221 #9221 ("message_queue deadlock on linux")].
+
+* [*ABI breaking]: [@https://svn.boost.org/trac/boost/ticket/9221 #9221] showed
+ that the `BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX` option of message queue
+ was completely broken, so an ABI break was necessary to have a working implementation.
+
+[endsect]
+
[section:release_notes_boost_1_55_00 Boost 1.55 Release]
* Fixed bugs [@https://svn.boost.org/trac/boost/ticket/7156 #7156],
Modified: trunk/libs/interprocess/test/message_queue_test.cpp
==============================================================================
--- trunk/libs/interprocess/test/message_queue_test.cpp Tue Oct 15 01:23:53 2013 (r86315)
+++ trunk/libs/interprocess/test/message_queue_test.cpp 2013-10-15 04:02:09 EDT (Tue, 15 Oct 2013) (r86316)
@@ -17,6 +17,7 @@
#include <boost/interprocess/allocators/node_allocator.hpp>
#include <boost/interprocess/detail/os_thread_functions.hpp>
#include <vector>
+#include <iostream>
#include <cstddef>
#include <limits>
#include <memory>
@@ -222,12 +223,12 @@
return true;
}
//]
+
static const int MsgSize = 10;
static const int NumMsg = 1000;
static char msgsend [10];
static char msgrecv [10];
-
static boost::interprocess::message_queue *pmessage_queue;
void receiver()
@@ -267,6 +268,83 @@
return true;
}
+
+//////////////////////////////////////////////////////////////////////////////
+//
+// test_multi_sender_receiver is based on Alexander (aalutov's)
+// testcase for ticket #9221. Many thanks.
+//
+//////////////////////////////////////////////////////////////////////////////
+
+static boost::interprocess::message_queue *global_queue = 0;
+//We'll send MULTI_NUM_MSG_PER_SENDER messages per sender
+static const int MULTI_NUM_MSG_PER_SENDER = 10000;
+//Message queue message capacity
+static const int MULTI_QUEUE_SIZE = (MULTI_NUM_MSG_PER_SENDER - 1)/MULTI_NUM_MSG_PER_SENDER + 1;
+//We'll launch MULTI_THREAD_COUNT senders and MULTI_THREAD_COUNT receivers
+static const int MULTI_THREAD_COUNT = 10;
+
+static void multisend()
+{
+ char buff;
+ for (int i = 0; i < MULTI_NUM_MSG_PER_SENDER; i++) {
+ global_queue->send(&buff, 1, 0);
+ }
+ global_queue->send(&buff, 0, 0);
+ //std::cout<<"writer thread complete"<<std::endl;
+}
+
+static void multireceive()
+{
+ char buff;
+ size_t size;
+ int received_msgs = 0;
+ unsigned int priority;
+ do {
+ global_queue->receive(&buff, 1, size, priority);
+ ++received_msgs;
+ } while (size > 0);
+ --received_msgs;
+ //std::cout << "reader thread complete, read msgs: " << received_msgs << std::endl;
+}
+
+
+bool test_multi_sender_receiver()
+{
+ bool ret = true;
+ //std::cout << "Testing multi-sender / multi-receiver " << std::endl;
+ try {
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ boost::interprocess::message_queue mq
+ (boost::interprocess::open_or_create, test::get_process_id_name(), MULTI_QUEUE_SIZE, 1);
+ global_queue = &mq;
+ std::vector<boost::interprocess::ipcdetail::OS_thread_t> threads(MULTI_THREAD_COUNT*2);
+
+ //Launch sender and receiver threads
+ for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
+ boost::interprocess::ipcdetail::thread_launch
+ (threads[i], &multisend);
+ }
+
+ for (int i = 0; i < MULTI_THREAD_COUNT; i++) {
+ boost::interprocess::ipcdetail::thread_launch
+ (threads[MULTI_THREAD_COUNT+i], &multireceive);
+ }
+
+ for (int i = 0; i < MULTI_THREAD_COUNT*2; i++) {
+ boost::interprocess::ipcdetail::thread_join(threads[i]);
+ //std::cout << "Joined thread " << i << std::endl;
+ }
+ }
+ catch (std::exception &e) {
+ std::cout << "error " << e.what() << std::endl;
+ ret = false;
+ }
+ boost::interprocess::message_queue::remove(test::get_process_id_name());
+ return ret;
+}
+
+
int main ()
{
if(!test_priority_order()){
@@ -281,6 +359,10 @@
return 1;
}
+ if(!test_multi_sender_receiver()){
+ return 1;
+ }
+
return 0;
}
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk