
Boost Users :

From: Kirit Sælensminde (kirit.saelensminde_at_[hidden])
Date: 2007-05-09 01:25:25


I'm not entirely sure what's going on, but I seem to be having some
trouble with the predicate wait() where the predicate is true but the
waiting thread is still blocked. I'm sure I'm doing something daft so
here is a description with the relevant code in the hope that somebody
can point it out to me. I'm using MSVC 7.1 and Boost 1.34.0 on a
dual-core computer.

What I've done is to build on the basic boost::thread primitives to make
a pool of worker objects that can have tasks allocated to them. A good
starting point is the queuing of work done through this code (a member
of class Worker):

boost::shared_ptr< Join< void > > operator()( boost::function0< void > f ) {
        boost::shared_ptr< Join< void > > join( new Join< void > );
        queue( join, f );
        return join;
}

Join< void > is really a future for a task that has no return value.
Join< T > (where a T is returned by the task) is implemented in terms of
this primitive.

The queue() function adds the job to a queue and then signals its worker
thread that there is something to do:

void FSLib::Worker::queue( boost::shared_ptr< Join< void > > join, boost::function0< void > f ) {
        boost::mutex::scoped_lock lock( m_mutex );
        m_queue.push_back( std::make_pair( join, f ) );
        m_control.notify_all();
}

The boost::thread is initialised with a static function which waits for
items to appear in its queue, performs the function and then notifies
the Join object of the results. The function looks like this:

void FSLib::Worker::execute() {
        FSLib::Exceptions::StructuredHandler handler;
        do {
                t_queue job;
                { // Find a job to perform
                        boost::mutex::scoped_lock lock( m_mutex );
                        if ( m_queue.empty() )
                                m_control.wait( lock );
                        if ( !m_terminate && !m_queue.empty() )
                                job.swap( m_queue );
                }
                for ( t_queue::const_iterator j( job.begin() ); j != job.end() && !m_terminate; ++j ) {
                        // Execute job
                        try {
                                j->second();
                        } catch ( FSLib::Exceptions::Exception &e ) {
                                boost::mutex::scoped_lock lock( m_mutex );
                                j->first->m_exception = toString( e );
                        } catch ( ... ) {
                                boost::mutex::scoped_lock lock( m_mutex );
                                j->first->m_exception = L"An unknown exception was caught";
                        }
                        // Notify joins
                        boost::mutex::scoped_lock lock( m_mutex );
                        j->first->m_completed = true;
                        j->first->m_control.notify_all();
                }
        } while ( !m_terminate );
}

(I suspect that I shouldn't be catching all exceptions, but in any case
no exceptions are thrown when the problem occurs.)
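
The queue()/execute() handshake above can be sketched in isolation. This is only an illustration under stated assumptions, not the FSLib code: it uses the standard C++ std::mutex and std::condition_variable rather than the Boost 1.34 types, the names JobQueue, push and take_all are hypothetical, and the m_terminate handling is left out. The one deliberate difference is that the consumer waits in a while loop rather than a single if, so a wakeup with an empty queue just goes back to sleep.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for the Worker's queue. push() mirrors queue(),
// take_all() mirrors the "find a job to perform" block in execute().
class JobQueue {
public:
    typedef std::vector< std::function< void() > > Jobs;

    // Add a job and wake any thread blocked in take_all().
    void push( std::function< void() > f ) {
        std::lock_guard< std::mutex > lock( m_mutex );
        m_queue.push_back( std::move( f ) );
        m_control.notify_all();
    }

    // Block until at least one job is queued, then hand back the whole batch.
    Jobs take_all() {
        std::unique_lock< std::mutex > lock( m_mutex );
        while ( m_queue.empty() )   // re-check after every wakeup
            m_control.wait( lock );
        Jobs batch;
        batch.swap( m_queue );      // leaves m_queue empty
        return batch;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_control;
    Jobs m_queue;
};
```

A worker thread would call take_all() in a loop and run each job in the returned batch outside the lock, much as execute() does with its swapped queue.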

The critical part is at the end. It sets a flag on the Join object and
then notifies its condition object. The Join< void > class looks like this:

template<>
class F3UTIL_DECLSPEC Join< void > {
protected:
        Join();
public:
        virtual ~Join();

        void wait();
        Nullable< wstring > exception();
private:
        volatile bool m_completed;
        Nullable< wstring > m_exception;

        boost::mutex m_mutex;
        boost::condition m_control;

        friend void Worker::execute();
        friend boost::shared_ptr< Join< void > > Worker::operator()( boost::function0< void > f );
};

The flag is marked volatile, which according to Microsoft's documentation
ensures that writes to memory happen immediately and that the flag is
read from memory every time it is accessed.

When the result of the operation is needed then wait() or exception() is
called (wait() simply calls exception() but throws a new exception if
one was caught). Here is exception():

Nullable< wstring > FSLib::Worker::Join< void >::exception() {
        boost::mutex::scoped_lock lock( m_mutex );
        m_control.wait( lock, boost::lambda::bind( &Join< void >::m_completed, this ) );
        return m_exception;
}

What happens is that once in every few thousand operations it looks like
the signals get confused. The waiting thread is stuck inside the
condition's wait member even though the predicate is true (it doesn't
make any difference if I re-write the exception() member with an
explicit while loop). All the worker threads are at the beginning of
execute() with empty queues.

Because the condition object has already been signalled by the worker
thread the Join object gets stuck forever.
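
For comparison, here is a minimal sketch of the same flag-and-condition handshake in standard C++ (std::mutex and std::condition_variable, not the Boost 1.34 types used above). The names Completion, signal and run_once are hypothetical. The property it is meant to show is that the flag is written while holding the very mutex that the waiter passes to wait(), so the flag cannot change between the waiter's predicate check and its going to sleep. It is a sketch of the general pattern, not a diagnosis of the code above.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for Join< void >: one flag, one mutex, one condition.
class Completion {
public:
    // Block until signal() has been called.
    void wait() {
        std::unique_lock< std::mutex > lock( m_mutex );
        // The predicate is evaluated under m_mutex before sleeping and
        // again after every wakeup.
        m_control.wait( lock, [this] { return m_completed; } );
    }

    // Mark the task as finished and wake every waiter.
    void signal() {
        {
            // Same mutex as in wait(), not some other object's mutex.
            std::lock_guard< std::mutex > lock( m_mutex );
            m_completed = true;
        }
        m_control.notify_all();
    }

    // Read the flag under the same mutex.
    bool completed() {
        std::lock_guard< std::mutex > lock( m_mutex );
        return m_completed;
    }

private:
    bool m_completed = false;
    std::mutex m_mutex;
    std::condition_variable m_control;
};

// Run one signalling thread against one waiter; returns the final flag value.
bool run_once() {
    Completion done;
    std::thread worker( [&done] { done.signal(); } );
    done.wait();
    worker.join();
    return done.completed();
}
```

Calling run_once() in a tight loop is a cheap way to stress the handshake, since a lost wakeup would show up as a hang rather than as a wrong result.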

I think I've got all of the mutexes in the right places, but I think I
must be missing something important. I guess there is also a small
chance that this is a bug in the boost::thread library (I'm using 1.34.0).

I could work around it by sharing a condition between all of the joins.
This would massively reduce the probability of this hanging the
application (as happens now), but I don't think it would fundamentally
solve the problem. Another workaround would be to use the timed wait()
functions, but I can't see from the documentation how I'd actually
manage to set an xtime up such that it would wake up after a few hundred
milliseconds.
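
To make the timed-wait idea concrete, here is a sketch using the standard C++ std::condition_variable::wait_for, which takes a relative duration. This is a swapped-in technique: it is not the Boost 1.34 timed wait, which takes a boost::xtime, and the xtime setup is not shown here. The helper name wait_with_timeout and its parameters are hypothetical.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper: wait on `cond` until `flag` becomes true or `timeout`
// elapses. Returns the value of `flag` on return, so false means the wait
// timed out with the flag still unset.
bool wait_with_timeout( std::mutex &mutex, std::condition_variable &cond,
                        const bool &flag, std::chrono::milliseconds timeout ) {
    std::unique_lock< std::mutex > lock( mutex );
    // wait_for re-checks the predicate on each wakeup and gives up once the
    // timeout has passed, returning the predicate's last value.
    return cond.wait_for( lock, timeout, [&flag] { return flag; } );
}
```

A caller could loop on this with a timeout of a few hundred milliseconds, which bounds how long a missed notification can stall the waiter without turning the wait into a pure busy loop.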

Another workaround would involve a busy wait, checking maybe every
hundred milliseconds, but I'm really loath to do that.

As I say, I'm sure I've missed something, but can't for the life of me
work out what it is.

K

