Boost logo

Boost Users :

Subject: [Boost-users] Need advice on std::queue syncronization using boost::mutex - maybe a bug in pthread?
From: Ilya Kalujny (kalujny_at_[hidden])
Date: 2008-12-12 07:24:46


Hello All!

I`d like to ask for your kind advice regarding boost::mutex behaviour.

I dont see what i`m doing wrong here. The issue is that at some point a
mutex used to syncronize access to std::queue becomes locked on it's own,
despite being explicitly unlocked before. I can repeat this more or less
consistently. I triple checked the logic behind code and it seems correct to
me.

Here is the code and some explanations (i build and test under windows):

Class itself:

#pragma once

#include <queue>

// simple std::queue based boost::mutex syncronized queue
template <typename T> class SyncedQueue : boost::noncopyable
{
private:

    std::queue<T> queue;

    // this is intended to be locked when queue is empty
    boost::mutex emptyIndicator;

    //this is intended to be locked when queue is being modified
    boost::mutex accessGuard;

public:

    SyncedQueue(const SyncedQueue<T> &)
    {
        //no copy constructor since we got mutexes
    }

    SyncedQueue()
    {

        // Queue is empty at start
        emptyIndicator.lock();
    }

    bool push ( T object )
    {
        accessGuard.lock();
        queue.push(object);

        // if someone has been waiting for data to become available in queue
- let them go on
        emptyIndicator.unlock();

        accessGuard.unlock();

        return true;
    }

    bool pop ( T& object )
    {

        // See if data is available, wait to aquire lock otherwise
        emptyIndicator.lock();

        // OK, somethings in the queue now, we can go on
        accessGuard.lock();

        object = queue.front();
        queue.pop();

        // If there is anything left in the queue - release emptyIndicator
to notify other poppers that they can go on
        if (queue.size() > 0)
        {
            emptyIndicator.unlock();
        }

        accessGuard.unlock();
        return true;
    }

};

Here is the test Code:

#include "stdafx.h"

#include <string>
#include <iostream>
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <include/threadpool.hpp>

#include "SyncedQueue.h"

// global variable for the queue
SyncedQueue< boost::shared_ptr < std::string > > syncedQueue;

void testWriteQueue()
{
    for (int i = 0; i < 100; i++)
    {
        char szBuff[128];
        boost::shared_ptr< std::string > msgPtr;
        std::string tmpString = "Message";
        tmpString += itoa(i,szBuff, 10);
        msgPtr.reset(new std::string(tmpString));
        syncedQueue.push(msgPtr);
        std::cout << "Sent " << msgPtr->c_str() << std::endl;
    }
    // After this thread completes, reader thread sometimes deadlocks!
    std::cout << "Sent all" << std::endl;
}

void testReadQueue()
{
    int recievedmessnum = 0;
    while(recievedmessnum != 100)
    {
        boost::shared_ptr< std::string > recievedMsgPtr;
        // After this writer thread completes, it sometimes sometimes
deadlocks trying to aqurie emptyIndicator!
        syncedQueue.pop(recievedMsgPtr);
        std::cout << "Recieved " << recievedMsgPtr->c_str() << std::endl;
        recievedmessnum++;
    }
    std::cout << "Recieved all" << std::endl;
}

int main(int argc, char** argv)
{
    using namespace boost::threadpool;
    pool tp(2);
    tp.schedule(&testWriteQueue);
    tp.schedule(&testReadQueue);
    tp.wait();
    std::cout << "Test complete" << std::endl;
}

Thanks in advance,
Ilya.



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net