From: terekhov_at_[hidden]
Date: 2001-03-15 16:20:58


--- In boost_at_y..., williamkempf_at_h... wrote:
> --- In boost_at_y..., terekhov_at_y... wrote:
[...]
> > yes, indeed, it is very closely related. but here again the problem
> > is that signals get consumed in "unfair" fashion by some threads
> > multiple times (as non-detectable spurious wakeups) and some threads
> > could remain blocked on condition variable (BTW, that violates
> > CV "atomic" semantics with respect to mutex and other threads
> > calling signal/broadcast).
>
> Neither POSIX, nor more importantly Boost.Threads,
> guarantees "fairness" in this regard. It's a noble goal, and if
> you've a solution to attain it I'll gladly do so, but it's not a
> requirement.

the solutions presented below do not produce spurious wakeups
(neither detectable nor non-detectable) and hence do not have this
defect.

> (BTW, I don't see how the CV "atomic" semantics are
> violated here.)

example:

(count == 0)

t1. thread A: mutex.lock; count++; do cv.wait( mutex ) while count != 0;....

t2. thread B: mutex.lock; count++; do cv.wait( mutex ) while count != 0;....

(mutex is free - A and B already "blocked" on CV)

t3. thread C: mutex.lock; if (count>1) {cv.broadcast;count=0;} mutex.unlock;...

Now, POSIX says:

--
a) ...functions atomically release mutex and cause the calling thread
to block on the condition variable cond; atomically here means
"atomically with respect to access by another thread to the
mutex and then the condition variable". That is, if another thread is
able to acquire the mutex after the about-to-block thread has released
it, then a subsequent call to pthread_cond_signal() or
pthread_cond_broadcast() in that thread behaves as if it were
issued after the about-to-block thread has blocked.
b) The pthread_cond_broadcast() call unblocks all threads currently
blocked on the specified condition variable cond.
--
cv.broadcast _should_ unblock thread B AND thread A!
an outcome in which thread A consumes two signals while
thread B remains blocked IS INCORRECT.
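
The t1..t3 scenario above can be replayed as a small sketch. It uses Python's threading.Condition as a stand-in for a POSIX condition variable, which is an assumption made only for illustration; the names run_scenario, waiter and arrived are hypothetical helpers and not part of any posted algorithm. With a correct condition variable, a single broadcast lets both A and B return.

```python
import threading

def run_scenario(timeout=5.0):
    """Replay t1..t3: A and B block on the CV, then C broadcasts once."""
    mutex = threading.Lock()
    cv = threading.Condition(mutex)       # the CV under discussion
    arrived = threading.Condition(mutex)  # helper so C knows when count > 1
    state = {"count": 0}
    woken = []

    def waiter(name):
        with mutex:                       # mutex.lock
            state["count"] += 1
            arrived.notify()
            while True:                   # do cv.wait(mutex) while count != 0
                cv.wait()
                if state["count"] == 0:
                    break
            woken.append(name)

    threads = [threading.Thread(target=waiter, args=(n,), daemon=True)
               for n in ("A", "B")]
    for t in threads:
        t.start()

    with mutex:                           # thread C
        arrived.wait_for(lambda: state["count"] > 1, timeout)
        if state["count"] > 1:
            cv.notify_all()               # cv.broadcast
            state["count"] = 0

    for t in threads:
        t.join(timeout)
    return sorted(woken)
```

Because each waiter holds the mutex until cv.wait() releases it, C can only observe count > 1 after both A and B are blocked on the CV, which matches the "mutex is free" state before t3.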
> > i've posted multiple solutions (hopefully correct ones) to
> > comp.programming.threads/pthread_cond_wait() and WaitForSingleObject
> > if you want, i can post it here or send it directly to you..
> 
> Either would be helpful.

Newsgroups: comp.programming.threads
Subject: Re: pthread_cond_wait() and WaitForSingleObject()
Date: Wed, 28 Feb 2001 16:43:36 +0100

> John Hickin wrote:
>
>  On Doug Schmidt's web site you will find a paper that analyses several
>  different approaches to providing posix-like conditional waits,
>  including one using CRITICAL_SECTIONs, semaphores, and
>  SignalObjectAndWait:
>
>    http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
>
>  I can't say it any better than this web page says it:
>
>  <quote>
>  This article illustrates why developing condition variables on Win32
>  platforms is tricky and error-prone. There are several subtle design
>  forces that must be addressed by developers. In general, the different
>  implementations we've examined vary according to their correctness,
>  efficiency, fairness, and portability. No one solution provides all
>  these qualities optimally.
>  </quote>
>
>  Note the final sentence.
>
>  Regards, John.

I would like to add that no solution presented in this paper
implements condition variables correctly. I've reported several
problems to the ACE/pthread-win32 folks, such as lost signals,
double/nested broadcast deadlock, and spurious wakeups.

Louis Thomas <lthomas_at_[hidden]> and I tried to find
a correct and efficient solution. I think that the following
solutions (semaphore/event based) implement condition variables
correctly. I would greatly appreciate it if someone else would
spend some time verifying the correctness and efficiency of
the algorithms below.

regards,
alexander.

---------- Algorithm 6b ----------
gate - semaphore
queue - semaphore
counters - CS or mutex
ex_mutex - CS or mutex
nWaiters - int
nSignaled - int
nGone - int
wait(timeout) {
  sem_wait gate // pass gate
  nWaiters++
  sem_post gate
  unlock ex_mutex
  _bTimedOut=sem_wait(queue, timeout)
  lock counters
  if (_bTimedOut && (0==nSignaled || 0!=nWaiters-nGone)) {
    nGone++         // retract our waiter status (ignore overflow)
  } else {
    if (_bTimedOut) {
      sem_wait queue  // better now than spurious later
      _bTimedOut=false
    }
    nSignaled--
    if (0==nSignaled) {
      sem_post gate // open gate
    }
  }
  unlock counters
  lock ex_mutex
  return _bTimedOut?ETIMEDOUT:0
}
signal(bAll) {
  _nSig=0
  lock counters
  // this is safe because nWaiters can only be decremented by a thread that
  // owns counters and nGone can only be changed by a thread that owns counters.
  if (nWaiters>nGone) {
    if (0==nSignaled) {
      sem_wait gate // close gate if not already closed
    }
    if (nGone>0) {
      nWaiters-=nGone
      nGone=0
    }
    _nSig=bAll?nWaiters:1
    nSignaled+=_nSig
    nWaiters-=_nSig
  }
  unlock counters
  if (0!=_nSig) {
    sem_post queue, _nSig
  }
  }
}
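
As a rough illustration of the gate/queue idea in Algorithm 6b, here is a minimal Python sketch. It is deliberately simplified: timeouts and the nGone bookkeeping are left out, threading.Semaphore stands in for a Win32 or POSIX semaphore, and the class and attribute names (GateQueueCV, n_waiters, n_signaled) are hypothetical. It should be read as a sketch of the untimed path only, not as a verified port of the algorithm.

```python
import threading

class GateQueueCV:
    """Untimed sketch of Algorithm 6b: a gate semaphore plus a queue semaphore."""

    def __init__(self, ex_mutex):
        self.ex_mutex = ex_mutex             # external mutex owned by the caller
        self.gate = threading.Semaphore(1)   # open while no wakeup is in flight
        self.queue = threading.Semaphore(0)  # waiters block here
        self.counters = threading.Lock()     # guards n_signaled
        self.n_waiters = 0
        self.n_signaled = 0

    def wait(self):
        self.gate.acquire()                  # pass gate (blocks while it is closed)
        self.n_waiters += 1
        self.gate.release()
        self.ex_mutex.release()
        self.queue.acquire()                 # block until a signal is posted
        with self.counters:
            self.n_signaled -= 1
            if self.n_signaled == 0:
                self.gate.release()          # last signaled waiter opens the gate
        self.ex_mutex.acquire()

    def signal(self, all_waiters=False):
        n_sig = 0
        with self.counters:
            if self.n_waiters > 0:
                if self.n_signaled == 0:
                    self.gate.acquire()      # close gate if not already closed
                n_sig = self.n_waiters if all_waiters else 1
                self.n_signaled += n_sig
                self.n_waiters -= n_sig
        for _ in range(n_sig):               # sem_post queue, n_sig
            self.queue.release()
```

While the gate is closed, new waiters cannot register, so the posts issued by one signal or broadcast can only be consumed by threads that were already counted in n_waiters. A call to signal() with no registered waiter posts nothing, so no stale count is left on the queue.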
---------- Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL ------
given:
semBlockLock - bin.semaphore
semBlockQueue - semaphore
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
  [auto: register int result          ]     // error checking omitted
  [auto: register int nSignalsWasLeft ]
  [auto: register int nWaitersWasGone ]
  sem_wait( semBlockLock );
  nWaitersBlocked++;
  sem_post( semBlockLock );
  unlock( mtxExternal );
  bTimedOut = sem_wait( semBlockQueue,timeout );
  lock( mtxUnblockLock );
  if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) {                      // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
      }
      else {
        nWaitersGone++;                     // count spurious wakeups
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        sem_post( semBlockLock );           // open the gate
        nSignalsWasLeft = 0;                // do not open the gate below again
      }
      else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
        nWaitersGone = 0;
      }
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious semaphore :-)
    sem_wait( semBlockLock );
    nWaitersBlocked -= nWaitersGone;        // something is going on here - test of timeouts? :-)
    sem_post( semBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );
  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != nWaitersWasGone ) {
      // sem_adjust( semBlockQueue,-nWaitersWasGone );
      while ( nWaitersWasGone-- ) {
        sem_wait( semBlockQueue );         // better now than spurious later
      }
    }
    sem_post( semBlockLock );              // open the gate
  }
  lock( mtxExternal );
  return ( bTimedOut ) ? ETIMEDOUT : 0;
}
signal(bAll) {
  [auto: register int result         ]
  [auto: register int nSignalsToIssue]
  lock( mtxUnblockLock );
  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nSignalsToIssue = 1;
      nWaitersToUnblock++;
      nWaitersBlocked--;
    }
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    sem_wait( semBlockLock ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nSignalsToIssue = nWaitersToUnblock = 1;
      nWaitersBlocked--;
    }
  }
  else { // NO-OP
    return unlock( mtxUnblockLock );
  }
  unlock( mtxUnblockLock );
  sem_post( semBlockQueue,nSignalsToIssue );
  return result;
}
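
The "better now than spurious later" step in wait() can be illustrated in isolation. The sketch below assumes a counting semaphore modelled with Python's threading.Semaphore; the helper names drain_stale_posts and would_wake_spuriously are hypothetical. It only shows why a post left behind for a waiter that has already timed out must be consumed before the gate is reopened.

```python
import threading

def drain_stale_posts(queue, n_gone):
    """Consume n_gone leftover posts so that no later waiter wakes spuriously."""
    for _ in range(n_gone):
        queue.acquire()

def would_wake_spuriously(drain):
    """Return True if a later waiter would pass the queue without any new signal."""
    queue = threading.Semaphore(0)
    queue.release()                  # post aimed at a waiter that already timed out
    if drain:
        drain_stale_posts(queue, 1)  # what the last unblocked waiter does
    # A later waiter tries the queue although nobody has signaled since.
    return queue.acquire(blocking=False)
```

Without the drain the stale post is still on the semaphore and the next waiter passes straight through; with the drain the semaphore count is back to zero before any new waiter can reach it.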
---------- Algorithm 8b / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ONEBYONE ------
given:
semBlockLock - bin.semaphore
semBlockQueue - bin.semaphore
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
  [auto: register int result          ]     // error checking omitted
  [auto: register int nWaitersWasGone ]
  [auto: register int nSignalsWasLeft ]
  sem_wait( semBlockLock );
  nWaitersBlocked++;
  sem_post( semBlockLock );
  unlock( mtxExternal );
  bTimedOut = sem_wait( semBlockQueue,timeout );
  lock( mtxUnblockLock );
  if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) {                      // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
        nSignalsWasLeft = 0;                // do not unblock next waiter below (already unblocked)
      }
      else {
        nWaitersGone = 1;                   // spurious wakeup pending!!
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        sem_post( semBlockLock );           // open the gate
        nSignalsWasLeft = 0;                // do not open the gate below again
      }
      else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
        nWaitersGone = 0;
      }
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious semaphore :-)
    sem_wait( semBlockLock );
    nWaitersBlocked -= nWaitersGone;        // something is going on here - test of timeouts? :-)
    sem_post( semBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );
  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != nWaitersWasGone ) {
      // sem_adjust( semBlockQueue,-1 );
      sem_wait( semBlockQueue );           // better now than spurious later
    }
    sem_post( semBlockLock );              // open the gate
  }
  else if ( 0 != nSignalsWasLeft ) {
    sem_post( semBlockQueue );             // unblock next waiter
  }
  lock( mtxExternal );
  return ( bTimedOut ) ? ETIMEDOUT : 0;
}
signal(bAll) {
  [auto: register int result ]
  lock( mtxUnblockLock );
  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock++;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    sem_wait( semBlockLock ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock = 1;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
    sem_post( semBlockQueue );
  }
  else { // NO-OP
    unlock( mtxUnblockLock );
  }
  return result;
}
---------- Algorithm 8c / IMPL_EVENT,UNBLOCK_STRATEGY == UNBLOCK_ONEBYONE ---------
given:
hevBlockLock - auto-reset event
hevBlockQueue - auto-reset event
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
  [auto: register int result          ]     // error checking omitted
  [auto: register int nSignalsWasLeft ]
  [auto: register int nWaitersWasGone ]
  wait( hevBlockLock,INFINITE );
  nWaitersBlocked++;
  set_event( hevBlockLock );
  unlock( mtxExternal );
  bTimedOut = wait( hevBlockQueue,timeout );
  lock( mtxUnblockLock );
  if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) {                      // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
        nSignalsWasLeft = 0;                // do not unblock next waiter below (already unblocked)
      }
      else {
        nWaitersGone = 1;                   // spurious wakeup pending!!
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        set_event( hevBlockLock );          // open the gate
        nSignalsWasLeft = 0;                // do not open the gate below again
      }
      else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
        nWaitersGone = 0;
      }
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious event :-)
    wait( hevBlockLock,INFINITE );
    nWaitersBlocked -= nWaitersGone;        // something is going on here - test of timeouts? :-)
    set_event( hevBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );
  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != nWaitersWasGone ) {
      reset_event( hevBlockQueue );         // better now than spurious later
    }
    set_event( hevBlockLock );              // open the gate
  }
  else if ( 0 != nSignalsWasLeft ) {
    set_event( hevBlockQueue );             // unblock next waiter
  }
  lock( mtxExternal );
  return ( bTimedOut ) ? ETIMEDOUT : 0;
}
signal(bAll) {
  [auto: register int result ]
  lock( mtxUnblockLock );
  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock++;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    wait( hevBlockLock,INFINITE ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock = 1;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
    set_event( hevBlockQueue );
  }
  else { // NO-OP
    unlock( mtxUnblockLock );
  }
  return result;
}
---------- Algorithm 8d / IMPL_EVENT,UNBLOCK_STRATEGY == UNBLOCK_ALL ------
given:
hevBlockLock - auto-reset event
hevBlockQueueS - auto-reset event  // for signals
hevBlockQueueB - manual-reset event // for broadcasts
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
eBroadcast - int                   // 0: no broadcast, 1: broadcast, 2: broadcast after signal(s)
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int
wait( timeout ) {
  [auto: register int result          ]     // error checking omitted
  [auto: register int eWasBroadcast   ]
  [auto: register int nSignalsWasLeft ]
  [auto: register int nWaitersWasGone ]
  wait( hevBlockLock,INFINITE );
  nWaitersBlocked++;
  set_event( hevBlockLock );
  unlock( mtxExternal );
  bTimedOut = waitformultiple( hevBlockQueueS,hevBlockQueueB,timeout,ONE );
  lock( mtxUnblockLock );
  if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) {                      // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
        nSignalsWasLeft = 0;                // do not unblock next waiter below (already unblocked)
      }
      else if ( 1 != eBroadcast ) {
        nWaitersGone = 1;
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        set_event( hevBlockLock );           // open the gate
        nSignalsWasLeft = 0;                 // do not open the gate below again
      }
      else {
        if ( 0 != (eWasBroadcast = eBroadcast) ) {
          eBroadcast = 0;
        }
        if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
          nWaitersGone = 0;
        }
      }
    }
    else if ( 0 != eBroadcast ) {
      nSignalsWasLeft = 0;                  // do not unblock next waiter below (already unblocked)
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious event :-)
    wait( hevBlockLock,INFINITE );
    nWaitersBlocked -= nWaitersGone;        // something is going on here - test of timeouts? :-)
    set_event( hevBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );
  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != eWasBroadcast ) {
      reset_event( hevBlockQueueB );
    }
    if ( 0 != nWaitersWasGone ) {
      reset_event( hevBlockQueueS );        // better now than spurious later
    }
    set_event( hevBlockLock );              // open the gate
  }
  else if ( 0 != nSignalsWasLeft ) {
    set_event( hevBlockQueueS );            // unblock next waiter
  }
  lock( mtxExternal );
  return ( bTimedOut ) ? ETIMEDOUT : 0;
}
signal(bAll) {
  [auto: register int    result        ]
  [auto: register HANDLE hevBlockQueue ]
  lock( mtxUnblockLock );
  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nWaitersBlocked;
      nWaitersBlocked = 0;
      eBroadcast = 2;
      hevBlockQueue = hevBlockQueueB;
    }
    else {
      nWaitersToUnblock++;
      nWaitersBlocked--;
      return unlock( mtxUnblockLock );
    }
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    wait( hevBlockLock,INFINITE ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
      eBroadcast = 1;
      hevBlockQueue = hevBlockQueueB;
    }
    else {
      nWaitersToUnblock = 1;
      nWaitersBlocked--;
      hevBlockQueue = hevBlockQueueS;
    }
  }
  else { // NO-OP
    return unlock( mtxUnblockLock );
  }
  unlock( mtxUnblockLock );
  set_event( hevBlockQueue );
  return result;
}
