Hello All!
I`d like to ask for your kind advice regarding boost::mutex behaviour.
I dont see what i`m doing wrong here. The issue is that at some point a mutex used to syncronize access to std::queue becomes locked on it's own, despite being explicitly unlocked before. I can repeat this more or less consistently. I triple checked the logic behind code and it seems correct to me.
Here is the code and some explanations (i build and test under windows):
Class itself:
#pragma once
#include <queue>
// simple std::queue based boost::mutex syncronized queue
template <typename T> class SyncedQueue : boost::noncopyable
{
private:
std::queue<T> queue;
// this is intended to be locked when queue is empty
boost::mutex emptyIndicator;
//this is intended to be locked when queue is being modified
boost::mutex accessGuard;
public:
SyncedQueue(const SyncedQueue<T> &)
{
//no copy constructor since we got mutexes
}
SyncedQueue()
{
// Queue is empty at start
emptyIndicator.lock();
}
bool push ( T object )
{
accessGuard.lock();
queue.push(object);
// if someone has been waiting for data to become available in queue - let them go on
emptyIndicator.unlock();
accessGuard.unlock();
return true;
}
bool pop ( T& object )
{
// See if data is available, wait to aquire lock otherwise
emptyIndicator.lock();
// OK, somethings in the queue now, we can go on
accessGuard.lock();
object = queue.front();
queue.pop();
// If there is anything left in the queue - release emptyIndicator to notify other poppers that they can go on
if (queue.size() > 0)
{
emptyIndicator.unlock();
}
accessGuard.unlock();
return true;
}
};
Here is the test Code:
#include "stdafx.h"
#include <string>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <include/threadpool.hpp>
#include "SyncedQueue.h"
// global variable for the queue
SyncedQueue< boost::shared_ptr < std::string > > syncedQueue;
void testWriteQueue()
{
for (int i = 0; i < 100; i++)
{
char szBuff[128];
boost::shared_ptr< std::string > msgPtr;
std::string tmpString = "Message";
tmpString += itoa(i,szBuff, 10);
msgPtr.reset(new std::string(tmpString));
syncedQueue.push(msgPtr);
std::cout << "Sent " << msgPtr->c_str() << std::endl;
}
// After this thread completes, reader thread sometimes deadlocks!
std::cout << "Sent all" << std::endl;
}
void testReadQueue()
{
int recievedmessnum = 0;
while(recievedmessnum != 100)
{
boost::shared_ptr< std::string > recievedMsgPtr;
// After this writer thread completes, it sometimes sometimes deadlocks trying to aqurie emptyIndicator!
syncedQueue.pop(recievedMsgPtr);
std::cout << "Recieved " << recievedMsgPtr->c_str() << std::endl;
recievedmessnum++;
}
std::cout << "Recieved all" << std::endl;
}
int main(int argc, char** argv)
{
using namespace boost::threadpool;
pool tp(2);
tp.schedule(&testWriteQueue);
tp.schedule(&testReadQueue);
tp.wait();
std::cout << "Test complete" << std::endl;
}
Thanks in advance,
Ilya.