Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r79897 - in trunk/boost/interprocess: detail ipc
From: igaztanaga_at_[hidden]
Date: 2012-08-07 05:11:27


Author: igaztanaga
Date: 2012-08-07 05:11:27 EDT (Tue, 07 Aug 2012)
New Revision: 79897
URL: http://svn.boost.org/trac/boost/changeset/79897

Log:
Reimplemented `message_queue` with a circular buffer index
Text files modified:
   trunk/boost/interprocess/detail/workaround.hpp | 11 +
   trunk/boost/interprocess/ipc/message_queue.hpp | 303 ++++++++++++++++++++++++++++++++-------
   2 files changed, 258 insertions(+), 56 deletions(-)

Modified: trunk/boost/interprocess/detail/workaround.hpp
==============================================================================
--- trunk/boost/interprocess/detail/workaround.hpp (original)
+++ trunk/boost/interprocess/detail/workaround.hpp 2012-08-07 05:11:27 EDT (Tue, 07 Aug 2012)
@@ -27,7 +27,7 @@
          #define BOOST_INTERPROCESS_POSIX_PROCESS_SHARED
       #endif
    #endif
-
+
    #if defined(_POSIX_BARRIERS) && ((_POSIX_BARRIERS - 0) > 0)
       #define BOOST_INTERPROCESS_POSIX_BARRIERS
    #endif
@@ -39,7 +39,7 @@
       #endif
    //Some platforms have a limited (name length) named semaphore support
    #elif (defined(__FreeBSD__) && (__FreeBSD__ >= 4)) || defined(__APPLE__)
- #define BOOST_INTERPROCESS_POSIX_NAMED_SEMAPHORES
+ #define BOOST_INTERPROCESS_POSIX_NAMED_SEMAPHORES
    #endif
 
    #if ((defined _V6_ILP32_OFFBIG) &&(_V6_ILP32_OFFBIG - 0 > 0)) ||\
@@ -130,6 +130,13 @@
    #define BOOST_INTERPROCESS_TIMEOUT_WHEN_LOCKING_DURATION_MS 10000
 #endif
 
+//Other switches
+//BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+//message queue uses a circular queue as index instead of an array (better performance)
+//Boost version < 1.52 uses an array, so undef this if you want to communicate
+//with processes compiled with those versions.
+#define BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX
+
 #include <boost/interprocess/detail/config_end.hpp>
 
 #endif //#ifndef BOOST_INTERPROCESS_DETAIL_WORKAROUND_HPP

Modified: trunk/boost/interprocess/ipc/message_queue.hpp
==============================================================================
--- trunk/boost/interprocess/ipc/message_queue.hpp (original)
+++ trunk/boost/interprocess/ipc/message_queue.hpp 2012-08-07 05:11:27 EDT (Tue, 07 Aug 2012)
@@ -99,7 +99,7 @@
 
    //!Sends a message stored in buffer "buffer" with size "buffer_size" in the
    //!message queue with priority "priority". If the message queue is full
- //!the sender is blocked. Throws interprocess_error on error.*/
+ //!the sender is blocked. Throws interprocess_error on error.
    void send (const void *buffer, size_type buffer_size,
               unsigned int priority);
 
@@ -161,7 +161,7 @@
    //!Returns false on error. Never throws
    static bool remove(const char *name);
 
- /// @cond
+ /// @cond
    private:
    typedef boost::posix_time::ptime ptime;
    bool do_receive(block_t block,
@@ -227,9 +227,27 @@
 //!
 //!-> offset_ptr<msg_hdr_t> index [max_num_msg]
 //! An array of pointers with size "max_num_msg" called index. Each pointer
-//! points to a preallocated message. The elements of this array are
+//! points to a preallocated message. Elements of this array are
 //! reordered in runtime in the following way:
 //!
+//! IF BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is defined:
+//!
+//! When the current number of messages is "cur_num_msg", the array
+//! is treated like a circular buffer. Starting from position "cur_first_msg",
+//! the next "cur_num_msg" pointers (taken circularly) point to inserted messages and
+//! the rest point to free messages. Those "cur_num_msg" pointers are
+//! ordered by the priority of the pointed message and by insertion order
+//! if two messages have the same priority. So the next message to be
+//! used in a "receive" is pointed by index [(cur_first_msg + cur_num_msg-1)%max_num_msg]
+//! and the first free message ready to be used in a "send" operation is
+//! [cur_first_msg] if circular buffer is extended from front,
+//! [(cur_first_msg + cur_num_msg)%max_num_msg] otherwise.
+//!
+//! This transforms the index into a circular buffer with an embedded free
+//! message queue.
+//!
+//! ELSE (BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX is NOT defined):
+//!
 //! When the current number of messages is "cur_num_msg", the first
 //! "cur_num_msg" pointers point to inserted messages and the rest
 //! point to free messages. The first "cur_num_msg" pointers are
@@ -237,11 +255,12 @@
 //! if two messages have the same priority. So the next message to be
 //! used in a "receive" is pointed by index [cur_num_msg-1] and the first free
 //! message ready to be used in a "send" operation is index [cur_num_msg].
-//! This transforms index in a fixed size priority queue with an embedded free
+//!
+//! This transforms the index into a fixed size priority queue with an embedded free
 //! message queue.
 //!
 //!-> struct message_t
-//! {
+//! {
 //! msg_hdr_t header;
 //! char[max_msg_size] data;
 //! } messages [max_num_msg];
@@ -252,7 +271,7 @@
 template<class VoidPointer>
 class mq_hdr_t
    : public ipcdetail::priority_functor<VoidPointer>
-{
+{
    typedef VoidPointer void_pointer;
    typedef msg_hdr_t<void_pointer> msg_header;
    typedef typename boost::intrusive::
@@ -269,17 +288,16 @@
    //!Constructor. This object must be constructed in the beginning of the
    //!shared memory of the size returned by the function "get_mem_size".
    //!This constructor initializes the needed resources and creates
- //!the internal structures like the priority index. This can throw.*/
+ //!the internal structures like the priority index. This can throw.
    mq_hdr_t(size_type max_num_msg, size_type max_msg_size)
       : m_max_num_msg(max_num_msg),
          m_max_msg_size(max_msg_size),
          m_cur_num_msg(0)
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ ,m_cur_first_msg(0u)
+ #endif
       { this->initialize_memory(); }
 
- //!Returns the inserted message with top priority
- msg_header * top_msg()
- { return mp_index[m_cur_num_msg-1].get(); }
-
    //!Returns true if the message queue is full
    bool is_full() const
       { return m_cur_num_msg == m_max_num_msg; }
@@ -292,24 +310,196 @@
    void free_top_msg()
       { --m_cur_num_msg; }
 
- //!Returns the first free msg of the free message queue
- msg_header * free_msg()
- { return mp_index[m_cur_num_msg].get(); }
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+
+ typedef msg_hdr_ptr_t *iterator;
+
+ size_type end_pos() const
+ {
+ const size_type space_until_bufend = m_max_num_msg - m_cur_first_msg;
+ return space_until_bufend > m_cur_num_msg
+ ? m_cur_first_msg + m_cur_num_msg : m_cur_num_msg - space_until_bufend;
+ }
+
+ //!Returns the inserted message with top priority
+ msg_header &top_msg()
+ {
+ size_type pos = this->end_pos();
+ return *mp_index[pos ? --pos : m_max_num_msg - 1];
+ }
+
+ //!Returns the inserted message with bottom priority
+ msg_header &bottom_msg()
+ { return *mp_index[m_cur_first_msg]; }
+
+ iterator inserted_ptr_begin() const
+ { return &mp_index[m_cur_first_msg]; }
+
+ iterator inserted_ptr_end() const
+ { return &mp_index[this->end_pos()]; }
+
+ iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
+ {
+ iterator begin(this->inserted_ptr_begin()), end(this->inserted_ptr_end());
+ if(end < begin){
+ iterator idx_end = &mp_index[m_max_num_msg];
+ iterator ret = std::lower_bound(begin, idx_end, value, func);
+ if(idx_end == ret){
+ iterator idx_beg = &mp_index[0];
+ ret = std::lower_bound(idx_beg, end, value, func);
+ //sanity check, these cases should not call lower_bound (optimized out)
+ assert(ret != end);
+ assert(ret != begin);
+ return ret;
+ }
+ else{
+ return ret;
+ }
+ }
+ else{
+ return std::lower_bound(begin, end, value, func);
+ }
+ }
+
+ msg_header & insert_at(iterator where)
+ {
+ iterator inserted_ptr_end = this->inserted_ptr_end();
+ iterator inserted_ptr_beg = this->inserted_ptr_begin();
+ if(where == inserted_ptr_end){
+ ++m_cur_num_msg;
+ return **inserted_ptr_end;
+ }
+ else if(where == inserted_ptr_beg){
+ //Wrap to the end of the index first so the decrement below never underflows
+ m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
+ --m_cur_first_msg;
+ ++m_cur_num_msg;
+ return *mp_index[m_cur_first_msg];
+ }
+ else{
+ size_type pos = where - &mp_index[0];
+ size_type circ_pos = pos >= m_cur_first_msg ? pos - m_cur_first_msg : pos + (m_max_num_msg - m_cur_first_msg);
+ //Check if it's more efficient to move back or move front
+ if(circ_pos < m_cur_num_msg/2){
+ //The queue can't be full so m_cur_num_msg == 0 or m_cur_num_msg <= pos
+ //indicates two step insertion
+ if(!pos){
+ pos = m_max_num_msg;
+ where = &mp_index[m_max_num_msg-1];
+ }
+ else{
+ --where;
+ }
+ const bool unique_segment = m_cur_first_msg && m_cur_first_msg <= pos;
+ size_type first_segment_beg = unique_segment ? m_cur_first_msg : 1u;
+ size_type first_segment_end = pos;
+ size_type second_segment_beg = unique_segment || !m_cur_first_msg ? m_max_num_msg : m_cur_first_msg;
+ size_type second_segment_end = m_max_num_msg;
+ const msg_hdr_ptr_t backup = *(&mp_index[0] + (unique_segment ? first_segment_beg : second_segment_beg) - 1);
+
+ //First segment
+ if(!unique_segment){
+ std::copy( &mp_index[0] + second_segment_beg
+ , &mp_index[0] + second_segment_end
+ , &mp_index[0] + second_segment_beg - 1);
+ mp_index[m_max_num_msg-1] = mp_index[0];
+ }
+ std::copy( &mp_index[0] + first_segment_beg
+ , &mp_index[0] + first_segment_end
+ , &mp_index[0] + first_segment_beg - 1);
+ *where = backup;
+ m_cur_first_msg = m_cur_first_msg ? m_cur_first_msg : m_max_num_msg;
+ --m_cur_first_msg;
+ ++m_cur_num_msg;
+ return **where;
+ }
+ else{
+ //The queue can't be full so end_pos < m_cur_first_msg
+ //indicates two step insertion
+ const size_type end_pos = this->end_pos();
+ const bool unique_segment = pos < end_pos;
+ size_type first_segment_beg = pos;
+ size_type first_segment_end = unique_segment ? end_pos : m_max_num_msg-1;
+ size_type second_segment_beg = 0u;
+ size_type second_segment_end = unique_segment ? 0u : end_pos;
+ const msg_hdr_ptr_t backup = *inserted_ptr_end;
+
+ //First segment
+ if(!unique_segment){
+ std::copy_backward( &mp_index[0] + second_segment_beg
+ , &mp_index[0] + second_segment_end
+ , &mp_index[0] + second_segment_end + 1);
+ mp_index[0] = mp_index[m_max_num_msg-1];
+ }
+ std::copy_backward( &mp_index[0] + first_segment_beg
+ , &mp_index[0] + first_segment_end
+ , &mp_index[0] + first_segment_end + 1);
+ *where = backup;
+ ++m_cur_num_msg;
+ return **where;
+ }
+ }
+ }
+
+ #else
+
+ typedef msg_hdr_ptr_t *iterator;
+
+ //!Returns the inserted message with top priority
+ msg_header &top_msg()
+ { return *mp_index[m_cur_num_msg-1]; }
+
+ //!Returns the inserted message with bottom priority
+ msg_header &bottom_msg()
+ { return *mp_index[0]; }
+
+ iterator inserted_ptr_begin() const
+ { return &mp_index[0]; }
+
+ iterator inserted_ptr_end() const
+ { return &mp_index[m_cur_num_msg]; }
+
+ iterator lower_bound(const msg_hdr_ptr_t & value, priority_functor<VoidPointer> func)
+ { return std::lower_bound(this->inserted_ptr_begin(), this->inserted_ptr_end(), value, func); }
+
+ msg_header & insert_at(iterator pos)
+ {
+ const msg_hdr_ptr_t backup = *inserted_ptr_end();
+ std::copy_backward(pos, inserted_ptr_end(), inserted_ptr_end()+1);
+ *pos = backup;
+ ++m_cur_num_msg;
+ return **pos;
+ }
+
+ #endif
 
    //!Inserts the first free message in the priority queue
- void queue_free_msg()
- {
- //Get free msg
- msg_hdr_ptr_t free = mp_index[m_cur_num_msg];
+ msg_header & queue_free_msg(unsigned int priority)
+ {
       //Get priority queue's range
- msg_hdr_ptr_t *it = &mp_index[0], *it_end = &mp_index[m_cur_num_msg];
- //Check where the free message should be placed
- it = std::lower_bound(it, it_end, free, static_cast<priority_functor<VoidPointer>&>(*this));
- //Make room in that position
- std::copy_backward(it, it_end, it_end+1);
+ iterator it (inserted_ptr_begin()), it_end(inserted_ptr_end());
+ //Optimize for non-priority usage
+ if(m_cur_num_msg && priority > this->bottom_msg().priority){
+ //Check for higher priority than all stored messages
+ if(priority > this->top_msg().priority){
+ it = it_end;
+ }
+ else{
+ //Since we don't know which free message we will pick
+ //build a dummy header for searches
+ msg_header dummy_hdr;
+ dummy_hdr.priority = priority;
+
+ //Wrap the dummy header in an index pointer so it can be used as the search key
+ msg_hdr_ptr_t dummy_ptr(&dummy_hdr);
+
+ //Check where the free message should be placed
+ it = this->lower_bound(dummy_ptr, static_cast<priority_functor<VoidPointer>&>(*this));
+ }
+
+ }
       //Insert the free message in the correct position
- *it = free;
- ++m_cur_num_msg;
+ return this->insert_at(it);
    }
 
    //!Returns the number of bytes needed to construct a message queue with
@@ -345,7 +535,7 @@
 
       //Pointer to the first message header
       msg_header *msg_hdr = reinterpret_cast<msg_header*>
- (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
+ (reinterpret_cast<char*>(this)+r_hdr_size+r_index_size);
 
       //Initialize the pointer to the index
       mp_index = index;
@@ -373,6 +563,10 @@
    interprocess_condition m_cond_recv;
    //Condition block senders when the queue is full
    interprocess_condition m_cond_send;
+ #if defined(BOOST_INTERPROCESS_MSG_QUEUE_CIRCULAR_INDEX)
+ //Current start offset in the circular index
+ size_type m_cur_first_msg;
+ #endif
 };
 
 
@@ -403,7 +597,7 @@
             new (mptr) mq_hdr_t<VoidPointer>(m_maxmsg, m_maxmsgsize);
          }
          BOOST_CATCH(...){
- return false;
+ return false;
          }
          BOOST_CATCH_END
       }
@@ -530,30 +724,32 @@
                      break;
                   }
                }
+
                while (p_hdr->is_full());
             break;
             default:
             break;
          }
       }
-
+
       //Get the first free message from free message queue
- ipcdetail::msg_hdr_t<VoidPointer> *free_msg = p_hdr->free_msg();
- if (free_msg == 0) {
- throw interprocess_exception("boost::interprocess::message_queue corrupted");
- }
+ //ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->first_free_msg_hdr();
+
+// bool was_empty = p_hdr->is_empty();
+ //Insert the first free message in the priority queue
+ ipcdetail::msg_hdr_t<VoidPointer> &free_msg_hdr = p_hdr->queue_free_msg(priority);
+
+ //Sanity check, free msgs are always cleaned when received
+ assert(free_msg_hdr.priority == 0);
+ assert(free_msg_hdr.len == 0);
 
       //Copy control data to the free message
- free_msg->priority = priority;
- free_msg->len = buffer_size;
+ free_msg_hdr.priority = priority;
+ free_msg_hdr.len = buffer_size;
 
       //Copy user buffer to the message
- std::memcpy(free_msg->data(), buffer, buffer_size);
+ std::memcpy(free_msg_hdr.data(), buffer, buffer_size);
 
-// bool was_empty = p_hdr->is_empty();
- //Insert the first free message in the priority queue
- p_hdr->queue_free_msg();
-
       //If this message changes the queue empty state, notify it to receivers
 // if (was_empty){
          p_hdr->m_cond_recv.notify_one();
@@ -635,20 +831,19 @@
          }
       }
 
- //Thre is at least message ready to pick, get the top one
- ipcdetail::msg_hdr_t<VoidPointer> *top_msg = p_hdr->top_msg();
-
- //Paranoia check
- if (top_msg == 0) {
- throw interprocess_exception("boost::interprocess::message_queue corrupted");
- }
+ //There is at least one message ready to pick, get the top one
+ ipcdetail::msg_hdr_t<VoidPointer> &top_msg = p_hdr->top_msg();
 
       //Get data from the message
- recvd_size = top_msg->len;
- priority = top_msg->priority;
+ recvd_size = top_msg.len;
+ priority = top_msg.priority;
+
+ //Some cleanup to ease debugging
+ top_msg.len = 0;
+ top_msg.priority = 0;
 
       //Copy data to receiver's bufers
- std::memcpy(buffer, top_msg->data(), recvd_size);
+ std::memcpy(buffer, top_msg.data(), recvd_size);
 
 // bool was_full = p_hdr->is_full();
 
@@ -666,20 +861,20 @@
 
 template<class VoidPointer>
 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg() const
-{
+{
    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
    return p_hdr ? p_hdr->m_max_num_msg : 0; }
 
 template<class VoidPointer>
 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_max_msg_size() const
-{
+{
    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
- return p_hdr ? p_hdr->m_max_msg_size : 0;
+ return p_hdr ? p_hdr->m_max_msg_size : 0;
 }
 
 template<class VoidPointer>
 inline typename message_queue_t<VoidPointer>::size_type message_queue_t<VoidPointer>::get_num_msg()
-{
+{
    ipcdetail::mq_hdr_t<VoidPointer> *p_hdr = static_cast<ipcdetail::mq_hdr_t<VoidPointer>*>(m_shmem.get_user_address());
    if(p_hdr){
       //---------------------------------------------
@@ -688,7 +883,7 @@
       return p_hdr->m_cur_num_msg;
    }
 
- return 0;
+ return 0;
 }
 
 template<class VoidPointer>


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk