Boost logo

Boost Users :

From: vl01_at_[hidden]
Date: 2022-08-09 21:35:19


Hello List,

I'm using the boost::interprocess::message_queue in a project and to
verify everything is working I created a test case. In that test case
I'm sending and receiving data via threads.

The test passes on Linux, Windows with no sanitizer and also with
AddressSanitizer and ThreadSanitizer. I recently added Apple Silicon
(M1) to the test matrix and now the test run with ThreadSanitizer
enabled fails.

Is boost::interprocess::message_queue thread-safe?
Can the reported TSan error be an false-positive?
If it that case, what test can I run to verify that?

I looked at code of atomic_cas32() and it uses the legacy built-in
__sync_val_compare_and_swap().
Can it be that that built-in isn't supported on ARM?
Is there a newer built-in which should be used?

The reported TSan error:
-- snip
==================
WARNING: ThreadSanitizer: data race (pid=4146)
   Atomic write of size 4 at 0x000104af4028 by thread T2:
     #0 __tsan_atomic32_compare_exchange_val <null>:81245684
(libclang_rt.tsan_osx_dynamic.dylib:arm64e+0x57854)
     #1 boost::interprocess::ipcdetail::atomic_cas32(unsigned int
volatile*, unsigned int, unsigned int) atomic.hpp:581
(tsan_boost:arm64+0x100005b30)

   Previous read of size 4 at 0x000104af4028 by thread T1:
     #0 boost::interprocess::ipcdetail::spin_mutex::try_lock()
mutex.hpp:100 (tsan_boost:arm64+0x100006eb0)
     #1 void
boost::interprocess::ipcdetail::try_based_lock<boost::interprocess::ipcdetail::spin_mutex>(boost::interprocess::ipcdetail::spin_mutex&)
common_algorithms.hpp:62 (tsan_boost:arm64+0x100006e28)

SUMMARY: ThreadSanitizer: data race atomic.hpp:581 in
boost::interprocess::ipcdetail::atomic_cas32(unsigned int volatile*,
unsigned int, unsigned int)
-- snap

Here is a code example which demonstrates the test case
-- snip
#include <boost/interprocess/ipc/message_queue.hpp>

#include <iostream>
#include <vector>
#include <thread>
#include <memory>

struct MessageQueueImpl
{
   MessageQueueImpl()
   {
       boost::interprocess::permissions file_permissions;
       file_permissions.set_permissions(0660);
       queue =
std::make_unique<boost::interprocess::message_queue>(boost::interprocess::create_only,
"tsanqueue", 10, 15, file_permissions);
   }
   std::unique_ptr<boost::interprocess::message_queue> queue;
};

class MessageQueue
{
public:
   MessageQueue()
     : m_impl(std::make_unique<MessageQueueImpl>())
   {}

   void send(const std::vector<uint8_t>& buffer)
   {
     m_impl->queue->send(buffer.data(), buffer.size(), 0);
   }

   void receive(std::vector<uint8_t>& buffer)
   {
     boost::interprocess::message_queue::size_type received_size = 0;
     buffer.resize(m_impl->queue->get_max_msg_size());

     uint32_t prio;
     m_impl->queue->receive(buffer.data(), buffer.size(), received_size,
prio);
     buffer.resize(received_size);
   }

private:
   std::unique_ptr<MessageQueueImpl> m_impl;
};

int main()
{
   auto uut = std::make_unique<MessageQueue>();
   std::thread sender([&uut]{
     for(uint8_t i = 0; i < 3; ++i) {
       const std::vector<uint8_t> send_buffer = {i};
       uut->send(send_buffer);
     }
   });
   std::thread receiver([&uut]{
     for(uint8_t i = 0; i < 3; ++i) {
       std::vector<uint8_t> receive_buffer;
       uint32_t prio;
       uut->receive(receive_buffer);

       std::cout << "buffer received:\n";
       for (const auto i : receive_buffer) {
         std::cout << i << '\n';
       }
     }
   });

   if(sender.joinable()) {
     sender.join();
   }
   if(receiver.joinable()) {
     receiver.join();
   }
   return 0;
}
-- snap

Cheers,
  Jens


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net