From: Peter Dimov (pdimov_at_[hidden])
Date: 2006-10-26 16:46:15
Peter Dimov wrote:
>>> (This redundant wakeup may not be visible in a benchmark, but it
>>> can steal CPU from other busy non-contending threads in a real
>>> application.)
>
> To test this, I added "free" threads to the benchmark. These compete
> on a different mutex. With my latest experiment, this reveals
> odd/intriguing patterns of the form
>
> 2R+4W+0F: 29
> 2R+4W+4F: 18
>
> The additional four free threads improve performance!
That was because I had an inefficiency in the algorithm, of course. Still,
the free threads do seem to help the scheduler sometimes.

I finally managed to match the "naive" implementation with a "fancy"
semaphore-based scheme. The basic algorithm is that each unlock unblocks one
waiting thread; the twist is that if the thread that wakes up is a reader,
it then unblocks all waiters. When a writer wakes up, it doesn't unblock
anyone, since it is on its way to exclusive access anyway. I haven't
reviewed the code for subtle correctness issues, though; no time for that
now. :-/
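In outline (this is just a restatement of the code below, for orientation):

- unlock (writer): unblock_one() wakes exactly one blocked thread;
- rdlock, after waking on sema_rw_: unblock_all() admits every remaining
  waiter;
- lock, after waking on sema_rw_: nothing; the writer proceeds toward
  exclusive access;
- rdunlock, when the last reader leaves and the writer bit is set: release
  sema_wp_ so the pending writer gets in; otherwise unblock_one().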
Here's the code:
#ifndef BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED
#define BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED
// MS compatible compilers support #pragma once
#if defined(_MSC_VER) && (_MSC_VER >= 1020)
# pragma once
#endif
// Copyright (c) 2005, 2006 Peter Dimov
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <algorithm>
#include <cassert>
#include <limits.h>
#include <windows.h>
inline bool atomic_compare_exchange( long * destination, long * comparand, long exchange )
{
    long ov = *comparand;
    long nv = InterlockedCompareExchange( destination, exchange, ov );

    if( ov == nv )
    {
        return true;
    }
    else
    {
        *comparand = nv;
        return false;
    }
}

inline bool atomic_compare_exchange_void( void * destination, void * comparand, void const * exchange )
{
    return atomic_compare_exchange( (long*)destination, (long*)comparand, *(long const*)exchange );
}
class rw_mutex
{
private:

    rw_mutex( rw_mutex const & );
    rw_mutex & operator=( rw_mutex const & );

private:

    struct state
    {
        unsigned writer: 1;
        unsigned readers: 31;
    };

    state state_;

    HANDLE sema_wp_; // writer pending
    HANDLE sema_rw_; // readers+writers blocked

    long retries_[ 6 ];
    long blocked_;

public:

    rw_mutex(): blocked_( 0 )
    {
        state st = { 0 };
        state_ = st;

        sema_wp_ = CreateSemaphore( 0, 0, LONG_MAX, 0 );
        sema_rw_ = CreateSemaphore( 0, 0, LONG_MAX, 0 );

        std::fill( retries_, retries_ + 6, 0 );
    }

    ~rw_mutex()
    {
        CloseHandle( sema_wp_ );
        CloseHandle( sema_rw_ );
    }

    void unblock_one()
    {
        if( (volatile const long&)blocked_ > 0 )
        {
            // optimistically claim one blocked thread
            long k = InterlockedExchangeAdd( &blocked_, -1 );

            if( k > 0 )
            {
                ReleaseSemaphore( sema_rw_, 1, 0 );
            }
            else
            {
                // a concurrent unblock got there first; undo
                InterlockedIncrement( &blocked_ );
            }
        }
    }
    void unblock_all()
    {
        if( (volatile const long&)blocked_ > 0 )
        {
            long k = InterlockedExchange( &blocked_, 0 );

            if( k > 0 )
            {
                // release every thread that was counted as blocked
                ReleaseSemaphore( sema_rw_, k, 0 );
            }
        }
    }
    void rdlock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            if( st.writer )
            {
                InterlockedIncrement( &blocked_ );
                WaitForSingleObject( sema_rw_, INFINITE );

                // a reader that wakes up unblocks all remaining waiters
                unblock_all();

                st = state_;
            }
            else
            {
                xchg.readers = st.readers + 1;

                if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
                {
                    return; // got read lock
                }
                else
                {
                    InterlockedIncrement( &retries_[ 0 ] );
                }
            }
        }
    }
    void lock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            if( st.writer )
            {
                InterlockedIncrement( &blocked_ );
                WaitForSingleObject( sema_rw_, INFINITE );

                // a writer that wakes up unblocks no one; it is on its
                // way to exclusive access anyway

                st = state_;
            }
            else if( st.readers )
            {
                xchg.writer = 1;

                if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
                {
                    // wait for the last reader to signal us
                    WaitForSingleObject( sema_wp_, INFINITE );
                    return;
                }
                else
                {
                    InterlockedIncrement( &retries_[ 2 ] );
                }
            }
            else // free
            {
                xchg.writer = 1;

                if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
                {
                    return;
                }
                else
                {
                    InterlockedIncrement( &retries_[ 3 ] );
                }
            }
        }
    }
    void rdunlock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            assert( st.readers > 0 );
            --xchg.readers;

            if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
            {
                break;
            }
            else
            {
                InterlockedIncrement( &retries_[ 4 ] );
            }
        }

        if( st.writer && st.readers == 1 ) // we were the last reader and a writer is waiting
        {
            ReleaseSemaphore( sema_wp_, 1, 0 );
        }
        else
        {
            unblock_one();
        }
    }
    void unlock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            assert( st.readers == 0 );
            xchg.writer = 0;
            //xchg.blocked = 0;

            if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
            {
                break;
            }
            else
            {
                InterlockedIncrement( &retries_[ 5 ] );
            }
        }

        unblock_one();
    }
public:

    long retries( int ix )
    {
        return InterlockedExchange( &retries_[ ix ], 0 );
    }

public:

    // lock classes

    class scoped_read_lock
    {
    private:

        rw_mutex & mx_;

        scoped_read_lock( scoped_read_lock const & );
        scoped_read_lock & operator=( scoped_read_lock const & );

    public:

        scoped_read_lock( rw_mutex & mx ): mx_( mx )
        {
            mx_.rdlock();
        }

        ~scoped_read_lock()
        {
            mx_.rdunlock();
        }
    };

    class scoped_write_lock
    {
    private:

        rw_mutex & mx_;

        scoped_write_lock( scoped_write_lock const & );
        scoped_write_lock & operator=( scoped_write_lock const & );

    public:

        scoped_write_lock( rw_mutex & mx ): mx_( mx )
        {
            mx_.lock();
        }

        ~scoped_write_lock()
        {
            mx_.unlock();
        }
    };
};

#endif // #ifndef BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED
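For completeness, a minimal usage sketch of the scoped lock classes (the
thread routine names and iteration counts are made up for illustration;
it assumes the rw_mutex class above is in scope):

#include <process.h>
#include <cstdio>

rw_mutex mx;
long value = 0;

unsigned __stdcall reader_thread( void * )
{
    for( int i = 0; i < 100000; ++i )
    {
        rw_mutex::scoped_read_lock lock( mx );
        long v = value; // shared access
        (void)v;
    }

    return 0;
}

unsigned __stdcall writer_thread( void * )
{
    for( int i = 0; i < 100000; ++i )
    {
        rw_mutex::scoped_write_lock lock( mx );
        ++value; // exclusive access
    }

    return 0;
}

int main()
{
    HANDLE th[ 3 ];

    th[ 0 ] = (HANDLE)_beginthreadex( 0, 0, reader_thread, 0, 0, 0 );
    th[ 1 ] = (HANDLE)_beginthreadex( 0, 0, reader_thread, 0, 0, 0 );
    th[ 2 ] = (HANDLE)_beginthreadex( 0, 0, writer_thread, 0, 0, 0 );

    WaitForMultipleObjects( 3, th, TRUE, INFINITE );

    for( int i = 0; i < 3; ++i )
    {
        CloseHandle( th[ i ] );
    }

    std::printf( "value: %ld\n", value ); // 100000 with a single writer
    return 0;
}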