Boost logo

Boost Users :

From: Anteru (newsgroups_at_[hidden])
Date: 2008-04-08 12:43:18


Anthony Williams schrieb:
> It is not possible to detect a spurious wake: if it was, then the
> implementation would resume waiting. However, spurious wakes should be rare
> occurrences.

All right, maybe this could be added to the documentation somewhere.

> Can you show me some code?

Worker threads are supposed to die as soon as a null-job is encountered.
The worker threads call executeJob. It works with queueFull_.notifyAll
() in insertJob, but it fails with notifyOne -- note, this simply pass
through to the Boost::condition counterparts.

It used to work with a home-grown condition implementation, moreover, I
had a prototype in Python and this one worked too. The problem is that
the during the destructor, not all threads die properly (usually one to
two remain, if started with 4 threads on a dual-core) -- I can even
reproduce it nearly always. Adding some debug output usually makes it
work, so it does look like a race condition. I only observed a deadlock
if the following happened:
insertJob called
insertJob called
...
then, one thread remained waiting forever. However, if only one Job gets
inserted at a time it works. Tested on a Dual-Core, so it was really
concurrent. Note: It might be very well that it also deadlocks in more
cases, it's just *damn* difficult to observe it properly.

Only problem I had at the beginning was in the part queueFull_.wait
(lock);

if (jobQueue_.empty ())
{
        continue;
}

as one thread might have been waiting for the mutex_ and fast-tracks
through the executeJob loop without waiting and then it gets its job,
executes it while notifyOne has been called during insertJob which wakes
up a waiting thread /after/ the other thread removed the item, so the
thread has to re-check that the queue has still an item. Without this, a
thread might pop an empty queue.

class ThreadPool
{
private:
        std::queue <IJob::Ptr> jobQueue_;
        std::vector <WorkerThread::Ptr> workers_;
        Condition queueEmpty_, queueFull_;
        Mutex mutex_;
        uint threadCount_;
};

//////////////////////////////////////////////////////////////////////////
void ThreadPool::stopAllThreads ()
{
        for (int i = 0; i < threadCount_; ++i)
        {
                insertJob (IJob::Ptr ());
                waitFinished ();
        }
}

        //////////////////////////////////////////////////////////////////////////
void ThreadPool::insertJob (ThreadPool::IJob::Ptr job)
{
        Lock lock (mutex_);

        jobQueue_.push (job);
        queueFull_.notifyAll ();
// notifyOne leads to a deadlock, notify all
}

//////////////////////////////////////////////////////////////////////////
void ThreadPool::executeJob ()
{
        while (true)
        {
                IJob::Ptr job;

                {
                        Lock lock (mutex_);

                        if (jobQueue_.empty ())
                        {
                                queueFull_.wait (lock);
// Deadlocks here, with jobQueue_.empty () == false !

                                if (jobQueue_.empty ())
                                {
                                        continue;
                                }
                        }

                        job = jobQueue_.front ();
                        jobQueue_.pop ();
                }

                if (job.get () == 0)
                {
                        jobFinished ();
                        return;
                }
                else
                {
                        job->run ();
                        jobFinished ();
                }
        }
}

//////////////////////////////////////////////////////////////////////////
ThreadPool::ThreadPool (const uint threadCount)
        : threadCount_ (threadCount)
{
        for (uint i = 0; i < threadCount; ++i)
        {
                WorkerThread::Ptr w (new WorkerThread (this));
                workers_.push_back (w);
                w->run ();
        }
}

        //////////////////////////////////////////////////////////////////////////
ThreadPool::~ThreadPool ()
{
        waitFinished ();
// Get through, no work waiting
        stopAllThreads ();
        joinAllThreads ();
}

        //////////////////////////////////////////////////////////////////////////
void ThreadPool::joinAllThreads ()
{
        BOOST_FOREACH(Thread::Ptr t, workers_)
        {
                t->join ();
        }
}
        //////////////////////////////////////////////////////////////////////////
void ThreadPool::jobFinished ()
{
        Lock lock (mutex_);

        if (jobQueue_.empty ())
        {
                queueEmpty_.notifyAll ();
        }
}

        //////////////////////////////////////////////////////////////////////////
void ThreadPool::waitFinished ()
{
        Lock lock (mutex_);

        if (jobQueue_.empty ())
        {
                return;
        }
        else
        {
                queueEmpty_.wait (lock);
        }
}

Cheers,
   Anteru


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net