From: Anteru (newsgroups_at_[hidden])
Date: 2008-04-08 12:43:18
Anthony Williams wrote:
> It is not possible to detect a spurious wake: if it was, then the
> implementation would resume waiting. However, spurious wakes should be rare
> occurrences.
All right, maybe this could be added to the documentation somewhere.
> Can you show me some code?
Worker threads are supposed to die as soon as they encounter a null job.
The worker threads call executeJob. It works with queueFull_.notifyAll ()
in insertJob, but it fails with notifyOne -- note that these simply pass
through to the boost::condition counterparts.
It used to work with a home-grown condition implementation, and a
prototype in Python worked as well. The problem is that during the
destructor, not all threads die properly (usually one or two remain,
when started with 4 threads on a dual-core) -- I can reproduce it nearly
every time. Adding some debug output usually makes it work, so it does
look like a race condition. I only observed a deadlock if the following
happened:
insertJob called
insertJob called
...
then one thread remained waiting forever. However, if only one job gets
inserted at a time, it works. Tested on a dual-core, so the threads
really ran concurrently. Note: it may well be that it also deadlocks in
other cases; it's just *damn* difficult to observe properly.
The only problem I had at the beginning was with this part:

queueFull_.wait (lock);
if (jobQueue_.empty ())
{
    continue;
}

One thread might have been waiting on mutex_ rather than on the
condition, fast-tracked through the executeJob loop without waiting, and
grabbed the job, while notifyOne had already been called during
insertJob. That notification then wakes a waiting thread /after/ the
other thread removed the item, so the woken thread has to re-check that
the queue still has an item. Without the re-check, a thread might pop an
empty queue.
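As far as I know, the usual idiom here is to make the wait itself a loop
over the predicate instead of re-checking once -- a sketch (same members
assumed, not what the code below does):

while (jobQueue_.empty ())
{
    queueFull_.wait (lock);
}

This also makes spurious wakes harmless, since the thread simply goes
back to waiting. Anyway, here is the full code: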
class ThreadPool
{
private:
std::queue <IJob::Ptr> jobQueue_;
std::vector <WorkerThread::Ptr> workers_;
Condition queueEmpty_, queueFull_;
Mutex mutex_;
uint threadCount_;
};
//////////////////////////////////////////////////////////////////////////
void ThreadPool::stopAllThreads ()
{
for (uint i = 0; i < threadCount_; ++i)
{
insertJob (IJob::Ptr ());
waitFinished ();
}
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::insertJob (ThreadPool::IJob::Ptr job)
{
Lock lock (mutex_);
jobQueue_.push (job);
queueFull_.notifyAll ();
// notifyOne leads to a deadlock, notify all
}
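Side note: as far as I know, a Boost condition may also be notified
after releasing the lock, which avoids waking a thread straight into a
still-held mutex -- a sketch, not what I'm actually running:

void ThreadPool::insertJob (ThreadPool::IJob::Ptr job)
{
    {
        Lock lock (mutex_);
        jobQueue_.push (job);
    } // lock released before notifying
    queueFull_.notifyAll ();
}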
//////////////////////////////////////////////////////////////////////////
void ThreadPool::executeJob ()
{
while (true)
{
IJob::Ptr job;
{
Lock lock (mutex_);
if (jobQueue_.empty ())
{
queueFull_.wait (lock);
// Deadlocks here, with jobQueue_.empty () == false !
if (jobQueue_.empty ())
{
continue;
}
}
job = jobQueue_.front ();
jobQueue_.pop ();
}
if (job.get () == 0)
{
jobFinished ();
return;
}
else
{
job->run ();
jobFinished ();
}
}
}
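One workaround I have seen suggested for lost notifyOne wakeups is to
have the woken consumer pass the notification on when more work remains
-- a sketch of just the pop section (untested):

job = jobQueue_.front ();
jobQueue_.pop ();
if (!jobQueue_.empty ())
{
    // More work is queued: hand the wakeup on to another waiter.
    queueFull_.notifyOne ();
}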
//////////////////////////////////////////////////////////////////////////
ThreadPool::ThreadPool (const uint threadCount)
: threadCount_ (threadCount)
{
for (uint i = 0; i < threadCount; ++i)
{
WorkerThread::Ptr w (new WorkerThread (this));
workers_.push_back (w);
w->run ();
}
}
//////////////////////////////////////////////////////////////////////////
ThreadPool::~ThreadPool ()
{
waitFinished ();
// Make sure no work is pending before shutting down
stopAllThreads ();
joinAllThreads ();
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::joinAllThreads ()
{
BOOST_FOREACH(Thread::Ptr t, workers_)
{
t->join ();
}
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::jobFinished ()
{
Lock lock (mutex_);
if (jobQueue_.empty ())
{
queueEmpty_.notifyAll ();
}
}
//////////////////////////////////////////////////////////////////////////
void ThreadPool::waitFinished ()
{
Lock lock (mutex_);
if (jobQueue_.empty ())
{
return;
}
else
{
queueEmpty_.wait (lock);
}
}
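Given the spurious-wake caveat quoted at the top, waitFinished probably
needs a predicate loop as well -- a sketch (same members assumed):

void ThreadPool::waitFinished ()
{
    Lock lock (mutex_);
    while (!jobQueue_.empty ())
    {
        queueEmpty_.wait (lock);
    }
}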
Cheers,
Anteru