From: terekhov_at_[hidden]
Date: 2001-03-15 16:20:58
--- In boost_at_y..., williamkempf_at_h... wrote:
> --- In boost_at_y..., terekhov_at_y... wrote:
[...]
> > yes, indeed, it is very closely related. but here again the problem
> > is that signals get consumed in "unfair" fashion by some threads
> > multiple times (as non-detectable spurious wakeups) and some threads
> > could remain blocked on condition variable (BTW, that violates
> > CV "atomic" semantics with respect to mutex and other threads
> > calling signal/broadcast).
>
> Neither POSIX, nor more importantly Boost.Threads,
> guarantees "fairness" in this regard. It's a noble goal, and if
> you've a solution to attain it I'll gladly do so, but it's not a
> requirement.
the solutions presented below do not produce spurious wakeups
(neither detectable nor non-detectable) and hence do not have this
defect.
> (BTW, I don't see how the CV "atomic" semantics are
> violated here.)
example:

(count == 0)

t1. thread A: mutex.lock; count++; do cv.wait( mutex ) while count != 0; ...
t2. thread B: mutex.lock; count++; do cv.wait( mutex ) while count != 0; ...

(mutex is free, A and B already "blocked" on CV)

t3. thread C: mutex.lock; if (count>1) {cv.broadcast;count=0;} mutex.unlock; ...
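
For anyone who wants to run the scenario above, here is a minimal sketch in Python (chosen only for brevity). It uses the standard threading.Condition as a reference condition variable, not any of the algorithms discussed in this thread. The names state, woken, waiter and broadcaster are made up for the illustration. With a condition variable that honours the broadcast semantics, both A and B should return from the wait.

```python
import threading
import time

mutex = threading.Lock()
cv = threading.Condition(mutex)
state = {"count": 0}            # hypothetical shared state for the example
woken = []                      # records which waiters returned from wait


def waiter(name):
    """Threads A and B: bump count, then wait until C resets it to 0."""
    with mutex:
        state["count"] += 1
        while state["count"] != 0:   # count is >= 1 here, so we always wait once
            cv.wait()                # releases mutex atomically while blocking
        woken.append(name)


def broadcaster():
    """Thread C: once both waiters have registered, broadcast and reset count."""
    while True:
        with mutex:
            if state["count"] > 1:
                cv.notify_all()
                state["count"] = 0
                return
        time.sleep(0.001)            # give A and B a chance to take the mutex


threads = [threading.Thread(target=waiter, args=(n,), daemon=True)
           for n in ("A", "B")]
threads.append(threading.Thread(target=broadcaster, daemon=True))
for t in threads:
    t.start()
for t in threads:
    t.join(timeout=5)

print(sorted(woken))
```

Because A and B increment count while holding the mutex and only release it inside wait, C can only observe count > 1 after both are registered as waiters, so the broadcast has to reach both of them.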
Now, POSIX says:
--
a) ...functions atomically release mutex and cause the calling thread
   to block on the condition variable cond; atomically here means
   "atomically with respect to access by another thread to the mutex
   and then the condition variable". That is, if another thread is
   able to acquire the mutex after the about-to-block thread has
   released it, then a subsequent call to pthread_cond_signal() or
   pthread_cond_broadcast() in that thread behaves as if it were
   issued after the about-to-block thread has blocked.

b) The pthread_cond_broadcast() call unblocks all threads currently
   blocked on the specified condition variable cond.
--

cv.broadcast _should_ unblock thread B AND thread A!
the behaviour when thread A would consume two signals and thread B
would remain blocked IS INCORRECT.

> > i've posted multiple solutions (hopefully correct ones) to
> > comp.programming.threads/pthread_cond_wait() and WaitForSingleObject
> > if you want, i can post it here or send it directly to you..
>
> Either would be helpful.

Newsgroups: comp.programming.threads
Subject: Re: pthread_cond_wait() and WaitForSingleObject()
Date: Wed, 28 Feb 2001 16:43:36 +0100

>John Hickin wrote:
>
>
> Doug Schmidt's web site you will find a paper that analyses several
> different approaches to providing posix-like conditional waits,
> including one using a CRITICAL_SECTIONs, semaphores, and
> SignalObjectAndWait:
>
> http://www.cs.wustl.edu/~schmidt/win32-cv-1.html
>
> I can't say it any better than this web page says it:
>
> <quote>
> This article illustrates why developing condition variables on Win32
> platforms is tricky and error-prone. There are several subtle design
> forces that must be addressed by developers. In general, the different
> implementations we've examined vary according to their correctness,
> efficiency, fairness, and portability. No one solution provides all
> these qualities optimally.
> </quote>
>
> Note the final sentence.
>
> Regards, John.

I would like to add that no solution presented in this paper implements
condition variables correctly. I've reported to ACE/pthread-win32 folks
several problems such as lost signals, double/nested broadcast deadlock,
spurious wakeups, ...

Louis Thomas <lthomas_at_[hidden]> and I tried to find a correct and
efficient solution. I think that the following solutions
(semaphore/event based) implement condition variables correctly.

I would greatly appreciate it if someone else would spend some time
verifying the correctness and efficiency of the algorithms below.

regards,
alexander.

---------- Algorithm 6b ----------

gate - semaphore
queue - semaphore
counters - CS or mutex
ex_mutex - CS or mutex
nWaiters - int
nSignaled - int
nGone - int

wait(timeout) {

  sem_wait gate // pass gate
  nWaiters++
  sem_post gate

  unlock ex_mutex
  _bTimedOut=sem_wait(queue, timeout)

  lock counters
  if (_bTimedOut && (0==nSignaled || 0!=nWaiters-nGone)) {
    nGone++ // retract our waiter status (ignore overflow)
  }
  else {
    if (_bTimedOut) {
      sem_wait queue // better now than spurious later
      _bTimedOut=false
    }
    nSignaled--
    if (0==nSignaled) {
      sem_post gate // open gate
    }
  }
  unlock counters

  lock ex_mutex

  return _bTimedOut?ETIMEDOUT:0
}

signal(bAll) {

  _nSig=0

  lock counters

  // this is safe because nWaiters can only be decremented by a thread that
  // owns counters and nGone can only be changed by a thread that owns counters.
  if (nWaiters>nGone) {
    if (0==nSignaled) {
      sem_wait gate // close gate if not already closed
    }
    if (nGone>0) {
      nWaiters-=nGone
      nGone=0
    }
    _nSig=bAll?nWaiters:1
    nSignaled+=_nSig
    nWaiters-=_nSig
  }
  unlock counters

  if (0!=_nSig) {
    sem_post queue, _nSig
  }
}

---------- Algorithm 8a / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ALL ------

given:
semBlockLock - bin.semaphore
semBlockQueue - semaphore
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int

wait( timeout ) {

  [auto: register int result ] // error checking omitted
  [auto: register int nSignalsWasLeft ]
  [auto: register int nWaitersWasGone ]

  sem_wait( semBlockLock );
  nWaitersBlocked++;
  sem_post( semBlockLock );

  unlock( mtxExternal );
  bTimedOut = sem_wait( semBlockQueue,timeout );

  lock( mtxUnblockLock );
  if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) { // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
      }
      else {
        nWaitersGone++; // count spurious wakeups
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        sem_post( semBlockLock ); // open the gate
        nSignalsWasLeft = 0; // do not open the gate below again
      }
      else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
        nWaitersGone = 0;
      }
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious semaphore :-)
    sem_wait( semBlockLock );
    nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
    sem_post( semBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );

  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != nWaitersWasGone ) {
      // sem_adjust( semBlockQueue,-nWaitersWasGone );
      while ( nWaitersWasGone-- ) {
        sem_wait( semBlockQueue ); // better now than spurious later
      }
    }
    sem_post( semBlockLock ); // open the gate
  }

  lock( mtxExternal );

  return ( bTimedOut ) ?
         ETIMEDOUT : 0;
}

signal(bAll) {

  [auto: register int result ]
  [auto: register int nSignalsToIssue]

  lock( mtxUnblockLock );

  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nSignalsToIssue=nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nSignalsToIssue = 1;
      nWaitersToUnblock++;
      nWaitersBlocked--;
    }
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    sem_wait( semBlockLock ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nSignalsToIssue = nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nSignalsToIssue = nWaitersToUnblock = 1;
      nWaitersBlocked--;
    }
  }
  else { // NO-OP
    return unlock( mtxUnblockLock );
  }

  unlock( mtxUnblockLock );
  sem_post( semBlockQueue,nSignalsToIssue );
  return result;
}

---------- Algorithm 8b / IMPL_SEM,UNBLOCK_STRATEGY == UNBLOCK_ONEBYONE ------

given:
semBlockLock - bin.semaphore
semBlockQueue - bin.semaphore
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int

wait( timeout ) {

  [auto: register int result ] // error checking omitted
  [auto: register int nWaitersWasGone ]
  [auto: register int nSignalsWasLeft ]

  sem_wait( semBlockLock );
  nWaitersBlocked++;
  sem_post( semBlockLock );

  unlock( mtxExternal );
  bTimedOut = sem_wait( semBlockQueue,timeout );

  lock( mtxUnblockLock );
  if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) { // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
        nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
      }
      else {
        nWaitersGone = 1; // spurious wakeup pending!!
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        sem_post( semBlockLock ); // open the gate
        nSignalsWasLeft = 0; // do not open the gate below again
      }
      else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
        nWaitersGone = 0;
      }
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious semaphore :-)
    sem_wait( semBlockLock );
    nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
    sem_post( semBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );

  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != nWaitersWasGone ) {
      // sem_adjust( semBlockQueue,-1 );
      sem_wait( semBlockQueue ); // better now than spurious later
    }
    sem_post( semBlockLock ); // open the gate
  }
  else if ( 0 != nSignalsWasLeft ) {
    sem_post( semBlockQueue ); // unblock next waiter
  }

  lock( mtxExternal );

  return ( bTimedOut ) ? ETIMEDOUT : 0;
}

signal(bAll) {

  [auto: register int result ]

  lock( mtxUnblockLock );

  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock++;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    sem_wait( semBlockLock ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock = 1;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
    sem_post( semBlockQueue );
  }
  else { // NO-OP
    unlock( mtxUnblockLock );
  }

  return result;
}

---------- Algorithm 8c / IMPL_EVENT,UNBLOCK_STRATEGY == UNBLOCK_ONEBYONE ---------

given:
hevBlockLock - auto-reset event
hevBlockQueue - auto-reset event
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int

wait( timeout ) {

  [auto: register int result ] // error checking omitted
  [auto: register int nSignalsWasLeft ]
  [auto: register int nWaitersWasGone ]

  wait( hevBlockLock,INFINITE );
  nWaitersBlocked++;
  set_event( hevBlockLock );

  unlock( mtxExternal );
  bTimedOut = wait( hevBlockQueue,timeout );

  lock( mtxUnblockLock );
  if ( 0 != (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) { // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
        nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
      }
      else {
        nWaitersGone = 1; // spurious wakeup pending!!
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        set_event( hevBlockLock ); // open the gate
        nSignalsWasLeft = 0; // do not open the gate below again
      }
      else if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
        nWaitersGone = 0;
      }
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious event :-)
    wait( hevBlockLock,INFINITE );
    nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
    set_event( hevBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );

  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != nWaitersWasGone ) {
      reset_event( hevBlockQueue ); // better now than spurious later
    }
    set_event( hevBlockLock ); // open the gate
  }
  else if ( 0 != nSignalsWasLeft ) {
    set_event( hevBlockQueue ); // unblock next waiter
  }

  lock( mtxExternal );

  return ( bTimedOut ) ? ETIMEDOUT : 0;
}

signal(bAll) {

  [auto: register int result ]

  lock( mtxUnblockLock );

  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock++;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    wait( hevBlockLock,INFINITE ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
    }
    else {
      nWaitersToUnblock = 1;
      nWaitersBlocked--;
    }
    unlock( mtxUnblockLock );
    set_event( hevBlockQueue );
  }
  else { // NO-OP
    unlock( mtxUnblockLock );
  }

  return result;
}

---------- Algorithm 8d / IMPL_EVENT,UNBLOCK_STRATEGY == UNBLOCK_ALL ------

given:
hevBlockLock - auto-reset event
hevBlockQueueS - auto-reset event // for signals
hevBlockQueueB - manual-reset event // for broadcasts
mtxExternal - mutex or CS
mtxUnblockLock - mutex or CS
eBroadcast - int // 0: no broadcast, 1: broadcast, 2: broadcast after signal(s)
nWaitersGone - int
nWaitersBlocked - int
nWaitersToUnblock - int

wait( timeout ) {

  [auto: register int result ] // error checking omitted
  [auto: register int eWasBroadcast ]
  [auto: register int nSignalsWasLeft ]
  [auto: register int nWaitersWasGone ]

  wait( hevBlockLock,INFINITE );
  nWaitersBlocked++;
  set_event( hevBlockLock );

  unlock( mtxExternal );
  bTimedOut = waitformultiple( hevBlockQueueS,hevBlockQueueB,timeout,ONE );

  lock( mtxUnblockLock );
  if ( 0 !=
       (nSignalsWasLeft = nWaitersToUnblock) ) {
    if ( bTimedOut ) { // timeout (or canceled)
      if ( 0 != nWaitersBlocked ) {
        nWaitersBlocked--;
        nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
      }
      else if ( 1 != eBroadcast ) {
        nWaitersGone = 1;
      }
    }
    if ( 0 == --nWaitersToUnblock ) {
      if ( 0 != nWaitersBlocked ) {
        set_event( hevBlockLock ); // open the gate
        nSignalsWasLeft = 0; // do not open the gate below again
      }
      else {
        if ( 0 != (eWasBroadcast = eBroadcast) ) {
          eBroadcast = 0;
        }
        if ( 0 != (nWaitersWasGone = nWaitersGone) ) {
          nWaitersGone = 0;
        }
      }
    }
    else if ( 0 != eBroadcast ) {
      nSignalsWasLeft = 0; // do not unblock next waiter below (already unblocked)
    }
  }
  else if ( INT_MAX/2 == ++nWaitersGone ) { // timeout/canceled or spurious event :-)
    wait( hevBlockLock,INFINITE );
    nWaitersBlocked -= nWaitersGone; // something is going on here - test of timeouts? :-)
    set_event( hevBlockLock );
    nWaitersGone = 0;
  }
  unlock( mtxUnblockLock );

  if ( 1 == nSignalsWasLeft ) {
    if ( 0 != eWasBroadcast ) {
      reset_event( hevBlockQueueB );
    }
    if ( 0 != nWaitersWasGone ) {
      reset_event( hevBlockQueueS ); // better now than spurious later
    }
    set_event( hevBlockLock ); // open the gate
  }
  else if ( 0 != nSignalsWasLeft ) {
    set_event( hevBlockQueueS ); // unblock next waiter
  }

  lock( mtxExternal );

  return ( bTimedOut ) ? ETIMEDOUT : 0;
}

signal(bAll) {

  [auto: register int result ]
  [auto: register HANDLE hevBlockQueue ]

  lock( mtxUnblockLock );

  if ( 0 != nWaitersToUnblock ) { // the gate is closed!!!
    if ( 0 == nWaitersBlocked ) { // NO-OP
      return unlock( mtxUnblockLock );
    }
    if (bAll) {
      nWaitersToUnblock += nWaitersBlocked;
      nWaitersBlocked = 0;
      eBroadcast = 2;
      hevBlockQueue = hevBlockQueueB;
    }
    else {
      nWaitersToUnblock++;
      nWaitersBlocked--;
      return unlock( mtxUnblockLock );
    }
  }
  else if ( nWaitersBlocked > nWaitersGone ) { // HARMLESS RACE CONDITION!
    wait( hevBlockLock,INFINITE ); // close the gate
    if ( 0 != nWaitersGone ) {
      nWaitersBlocked -= nWaitersGone;
      nWaitersGone = 0;
    }
    if (bAll) {
      nWaitersToUnblock = nWaitersBlocked;
      nWaitersBlocked = 0;
      eBroadcast = 1;
      hevBlockQueue = hevBlockQueueB;
    }
    else {
      nWaitersToUnblock = 1;
      nWaitersBlocked--;
      hevBlockQueue = hevBlockQueueS;
    }
  }
  else { // NO-OP
    return unlock( mtxUnblockLock );
  }

  unlock( mtxUnblockLock );

  set_event( hevBlockQueue );

  return result;
}
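
To make it easier to experiment with the gate/queue idea, here is a rough transliteration of the Algorithm 6b pseudocode into Python (picked only because it is short to run). This is a sketch, not a verified implementation: threading.Semaphore stands in for the semaphores, threading.Lock for the CS/mutex objects, and the class name SemCondVar, the broadcast parameter, the bool return value (instead of ETIMEDOUT) and the small driver at the bottom are all my own assumptions for illustration.

```python
import threading
import time


class SemCondVar:
    """Sketch of the Algorithm 6b pseudocode (gate + queue semaphores)."""

    def __init__(self, ex_mutex):
        self.ex_mutex = ex_mutex               # the caller's external mutex
        self.gate = threading.Semaphore(1)     # closed while a signal is in flight
        self.queue = threading.Semaphore(0)    # waiters block here
        self.counters = threading.Lock()
        self.n_waiters = 0
        self.n_signaled = 0
        self.n_gone = 0

    def wait(self, timeout=None):
        """Call with ex_mutex held. Returns False on timeout, True otherwise."""
        self.gate.acquire()                    # pass gate
        self.n_waiters += 1
        self.gate.release()

        self.ex_mutex.release()
        timed_out = not self.queue.acquire(timeout=timeout)

        with self.counters:
            if timed_out and (self.n_signaled == 0
                              or self.n_waiters - self.n_gone != 0):
                self.n_gone += 1               # retract our waiter status
            else:
                if timed_out:
                    self.queue.acquire()       # better now than spurious later
                    timed_out = False
                self.n_signaled -= 1
                if self.n_signaled == 0:
                    self.gate.release()        # open gate
        self.ex_mutex.acquire()
        return not timed_out

    def signal(self, broadcast=False):
        n_sig = 0
        with self.counters:
            if self.n_waiters > self.n_gone:
                if self.n_signaled == 0:
                    self.gate.acquire()        # close gate if not already closed
                if self.n_gone > 0:
                    self.n_waiters -= self.n_gone
                    self.n_gone = 0
                n_sig = self.n_waiters if broadcast else 1
                self.n_signaled += n_sig
                self.n_waiters -= n_sig
        for _ in range(n_sig):
            self.queue.release()               # sem_post queue, n_sig


# Hypothetical driver: three waiters, one broadcast.
ex_mutex = threading.Lock()
cv = SemCondVar(ex_mutex)
state = {"arrived": 0, "go": False}
woken = []


def waiter(i):
    with ex_mutex:
        state["arrived"] += 1
        while not state["go"]:
            cv.wait()
        woken.append(i)


threads = [threading.Thread(target=waiter, args=(i,), daemon=True)
           for i in range(3)]
for t in threads:
    t.start()
while True:                                    # wait until all three are registered
    with ex_mutex:
        if state["arrived"] == 3:
            state["go"] = True
            break
    time.sleep(0.001)
cv.signal(broadcast=True)
for t in threads:
    t.join(timeout=5)
print(sorted(woken))
```

The driver only exercises the plain broadcast path. The timeout branch and the races the algorithm is meant to handle would need a much more careful test than this.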