Boost logo

Boost :

From: Roland Schwarz (roland.schwarz_at_[hidden])
Date: 2005-09-11 14:07:10


While reviewing the current implementation of Boost.Thread
in an effort to split the code into platform-specific
parts for easier maintenance, I also looked into the current
implementation of condition variables.

Although I am familiar with the past discussions about this
topic to some degree, I would nevertheless like to offer
another solution for review.

The idea is based on the PulseEvent and SetEvent
solution of "Strategies for Implementing POSIX
Condition Variables on Win32" from Douglas C. Schmidt
http://www.cs.wustl.edu/~schmidt/win32-cv-1.html .

The most "concise and intuitive" solution there is described
as suffering from the so called "lost wakeup bug".

Nevertheless I used this as a starting point.
To overcome the lost wakeup bug I introduced the concept
of a gate, and using stateful (manual reset) events.
The gate is closed whenever a broadcast or signal
is underway. On entry to wait the gate also has to be
passed through, while simultaneously successful
entries are recorded.

The trick is that the gate can be closed by one
thread (the signaling one) and reopened by another
(the waiting one). This can be achieved by using a
Semaphore instead of a mutex for the gate implementation.
Semaphores can be set/reset from different threads.

Although I think I was careful enough, it might well be
that I made a mistake. Also, since the idea is so simple
and needs so little code to implement,
I am wondering whether someone else has already come up with
this solution. I would be very glad if someone could take
a look at the code and show me its pitfalls.

For my draft I used VC7.1 on W2K. The code is
without error handling yet. The main() implements a small
test, simulating a queue that is served by three threads.

Code follows:

#include <windows.h>

#include <boost/thread/thread.hpp>
using namespace boost;

// Minimal pthread-style mutex shim: a pthread_mutex_t is a Win32
// CRITICAL_SECTION and every wrapper below forwards straight to the
// corresponding critical-section call. No error handling is done.
typedef CRITICAL_SECTION pthread_mutex_t;

// Prepares *mutex for use.
void pthread_mutex_init(pthread_mutex_t* mutex)
{
    ::InitializeCriticalSection(mutex);
}

// Releases the resources held by *mutex.
void pthread_mutex_destroy(pthread_mutex_t* mutex)
{
    ::DeleteCriticalSection(mutex);
}

// Acquires *mutex on behalf of the calling thread.
void pthread_mutex_lock(pthread_mutex_t* mutex)
{
    ::EnterCriticalSection(mutex);
}

// Gives up the calling thread's ownership of *mutex.
void pthread_mutex_unlock(pthread_mutex_t* mutex)
{
    ::LeaveCriticalSection(mutex);
}

// the condition variable implementation starts here
// -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8--
// Indices into pthread_cond_t::events_. They are also added to
// WAIT_OBJECT_0 in pthread_cond_wait to tell which event fired.
#define SIGNAL 0
#define BROADCAST 1
// State of one emulated condition variable.
typedef struct
{
    // events_[SIGNAL] is set by pthread_cond_signal,
    // events_[BROADCAST] is set by pthread_cond_broadcast;
    // waiters block on both at once.
    HANDLE events_[2];
    // Semaphore created with initial and maximum count 1. It is taken
    // ("closed") by a signaling thread and released ("reopened") either
    // by that thread or by a woken waiter.
    HANDLE gate_;
    // Number of threads that have passed the gate in pthread_cond_wait
    // and have not yet been accounted for as woken.
    int waiters_;
} pthread_cond_t;

// Creates the kernel objects backing *cv and starts it with no waiters.
// Handle creation failures are not checked.
void pthread_cond_init(pthread_cond_t *cv)
{
    // Wake-one event: auto-reset, initially non-signaled, unnamed.
    HANDLE const wake_one = CreateEvent(NULL, FALSE, FALSE, NULL);
    cv->events_[SIGNAL] = wake_one;

    // Wake-all event: manual-reset, initially non-signaled, unnamed.
    HANDLE const wake_all = CreateEvent(NULL, TRUE, FALSE, NULL);
    cv->events_[BROADCAST] = wake_all;

    // The gate is a semaphore rather than a mutex so that it can be
    // taken by one thread and released by another. It starts open
    // (count 1 of at most 1).
    HANDLE const gate = CreateSemaphore(NULL, 1, 1, NULL);
    cv->gate_ = gate;

    cv->waiters_ = 0;
}

// Closes every handle owned by *cv: first the two events in index
// order (SIGNAL, then BROADCAST), then the gate semaphore.
void pthread_cond_destroy(pthread_cond_t* cv)
{
    for (int which = SIGNAL; which <= BROADCAST; ++which)
        CloseHandle(cv->events_[which]);
    CloseHandle(cv->gate_);
}

// Blocks the calling thread on *cv until it is signaled or broadcast.
//
// cv             - condition variable to wait on.
// external_mutex - mutex the caller currently holds; it is released
//                  while the thread is blocked and re-acquired before
//                  this function returns.
//
// Return values of the Win32 calls are not checked; if the wait on
// the events fails, the function only re-acquires the mutex.
void pthread_cond_wait(
    pthread_cond_t* cv, pthread_mutex_t* external_mutex)
{
    // Compile-time guard for the cast passed to InterlockedDecrement
    // below: waiters_ must have exactly the size of a LONG.
    typedef char waiters_must_be_long_sized[
        sizeof(cv->waiters_) == sizeof(LONG) ? 1 : -1];

    // We may only enter when no wakeup is active; this is what
    // prevents the lost wakeup. The counter is incremented while we
    // hold the gate, so increments are serialized.
    WaitForSingleObject(cv->gate_, INFINITE);
    ++cv->waiters_; // count waiters passing through
    ReleaseSemaphore(cv->gate_,1,NULL);

    LeaveCriticalSection(external_mutex);
    switch(WaitForMultipleObjects(
        2, cv->events_,FALSE,INFINITE)) {
        // Whenever a thread is unblocked here, the gate has been
        // closed by the signaling thread and is still closed.
        case WAIT_OBJECT_0+SIGNAL:
            // Exactly one thread is released by the auto-reset event,
            // so it has exclusive access to the counter.
            --cv->waiters_; // one is leaving
            ReleaseSemaphore(cv->gate_,1,NULL);//reopen
            // no need to reset the event, it is automatic
            break;
        case WAIT_OBJECT_0+BROADCAST:
            // All waiters are released at once and none of them holds
            // a lock here, so a plain --waiters_ could lose an update;
            // then no thread would observe 0 and the gate would stay
            // closed forever. Decrement atomically instead.
            if (0 == InterlockedDecrement(
                    reinterpret_cast<LONG volatile*>(&cv->waiters_))) {
                ResetEvent(
                    cv->events_[BROADCAST]);
                // last one leaving, reopen the gate
                ReleaseSemaphore(cv->gate_,1,NULL);
            }
            break;
    }
    EnterCriticalSection(external_mutex);
}

// Wakes every thread currently counted as waiting on *cv.
// The gate is closed first. With no waiters it is reopened at once;
// otherwise it stays closed and the last woken waiter reopens it.
void pthread_cond_broadcast (pthread_cond_t *cv)
{
    WaitForSingleObject(cv->gate_, INFINITE); // close gate

    if (0 == cv->waiters_) {
        // nobody to wake: just reopen the gate
        ReleaseSemaphore(cv->gate_,1,NULL);
        return;
    }

    SetEvent(cv->events_[BROADCAST]); // wake all
}

// Wakes one thread currently counted as waiting on *cv.
// The gate is closed first. With no waiters it is reopened at once;
// otherwise it stays closed and the woken waiter reopens it.
void pthread_cond_signal (pthread_cond_t *cv)
{
    WaitForSingleObject(cv->gate_, INFINITE); // close gate

    if (0 == cv->waiters_) {
        // nobody to wake: just reopen the gate
        ReleaseSemaphore(cv->gate_,1,NULL);
        return;
    }

    SetEvent(cv->events_[SIGNAL]); // wake one
}
// -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8-- -->8--
// the condition variable implementation ends here

// a small example for testing follows:
// 3 worker threads try to consume input and update output
// main thread generates input and then waits until all
// input has been processed by checking the output

// Shared state of the test program; the workers and main access it
// with m locked.
pthread_mutex_t m;
// ci: workers wait on it for input; main broadcasts it.
// co: main waits on it for output; workers signal it.
pthread_cond_t ci, co;
// Cleared by main to tell the workers to leave their loops.
bool run;
// input:  items produced by main and not yet consumed.
// output: items consumed by the workers.
// total:  items produced by main so far.
int input, output, total;
// Number of items consumed by worker 1, 2 and 3 respectively.
int done1, done2, done3;

void do_work1()
{
    pthread_mutex_lock(&m);
    while (run) {
        while (0 == input && run)
            pthread_cond_wait(&ci, &m);
        if (!run) break;
        --input;
        ++output;
        ++done1;
        pthread_cond_signal(&co);
    }
    pthread_mutex_unlock(&m);
}
void do_work2()
{
    pthread_mutex_lock(&m);
    while (run) {
        while (0 == input && run)
            pthread_cond_wait(&ci, &m);
        if (!run) break;
        --input;
        ++output;
        ++done2;
        pthread_cond_signal(&co);
    }
    pthread_mutex_unlock(&m);
}
void do_work3()
{
    pthread_mutex_lock(&m);
    while (run) {
        while (0 == input && run)
            pthread_cond_wait(&ci, &m);
        if (!run) break;
        --input;
        ++output;
        ++done3;
        pthread_cond_signal(&co);
    }
    pthread_mutex_unlock(&m);
}

int main(int argc, char* argv)
{
    pthread_mutex_init(&m);
    pthread_cond_init(&ci);
    pthread_cond_init(&co);
    done1 = done2 = done3 = 0;
    total = input = output = 0;
    run = true;

    boost::thread t1(do_work1);
    boost::thread t2(do_work2);
    boost::thread t3(do_work3);

    for (int n = 0; n<1000; ++n) {
        pthread_mutex_lock(&m);
        input+=5;
        total+=5;
        pthread_cond_broadcast(&ci);
        pthread_mutex_unlock(&m);
    }

    pthread_mutex_lock(&m);
    while (output < total)
        pthread_cond_wait(&co, &m);
    run = false;
    pthread_cond_broadcast(&ci);
    pthread_mutex_unlock(&m);

    t1.join();
    t2.join();
    t3.join();

    pthread_cond_destroy(&co);
    pthread_cond_destroy(&ci);
    pthread_mutex_destroy(&m);
    return 0;
}

Regards,
Roland


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk