Boost logo

Boost :

From: Peter Dimov (pdimov_at_[hidden])
Date: 2006-10-26 16:46:15


Peter Dimov wrote:
>>> (This redundant wakeup may not be visible in a benchmark, but it
>>> can steal CPU from other busy non-contending threads in a real
>>> application.)
>
> To test this, I added "free" threads to the benchmark. These compete
> on a different mutex. With my latest experiment, this reveals
> odd/intriguing patterns of the form
>
> 2R+4W+0F: 29
> 2R+4W+4F: 18
>
> The additional four free threads improve performance!

That was because I had an inefficiency in the algorithm, of course. Still,
the free threads do seem to help the scheduler sometimes.

I finally managed to match the "naive" implementation with a "fancy"
semaphore-based scheme. The basic algorithm is that each unlock unblocks one
waiting thread; the twist is that if the thread that wakes up is a reader,
it then unblocks all waiters. When a writer wakes up, it doesn't unblock
anyone, since it is on its way to exclusive access anyway. I haven't
reviewed the code for subtle correctness issues, though; no time for that
now. :-/

Here's the code:

#ifndef BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED
#define BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED

// MS compatible compilers support #pragma once

#if defined(_MSC_VER) && (_MSC_VER >= 1020)
# pragma once
#endif

// Copyright (c) 2005, 2006 Peter Dimov
//
// Distributed under the Boost Software License, Version 1.0.
// See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#include <algorithm>
#include <cassert>
#include <limits.h>
#include <windows.h>

inline bool atomic_compare_exchange( long * destination, long * comparand,
long exchange )
{
    long ov = *comparand;
    long nv = InterlockedCompareExchange( destination, exchange, ov );

    if( ov == nv )
    {
        return true;
    }
    else
    {
        *comparand = nv;
        return false;
    }
}

inline bool atomic_compare_exchange_void( void * destination, void *
comparand, void const * exchange )
{
    return atomic_compare_exchange( (long*)destination, (long*)comparand,
*(long const*)exchange );
}

class rw_mutex
{
private:

    rw_mutex( rw_mutex const & );
    rw_mutex & operator=( rw_mutex const & );

private:

    struct state
    {
        unsigned writer: 1;
        unsigned readers: 31;
    };

    state state_;

    HANDLE sema_wp_; // writer pending
    HANDLE sema_rw_; // readers+writers blocked

    long retries_[ 6 ];

    long blocked_;

public:

    rw_mutex(): blocked_( 0 )
    {
        state st = { 0 };

        state_ = st;

        sema_wp_ = CreateSemaphore( 0, 0, LONG_MAX, 0 );
        sema_rw_ = CreateSemaphore( 0, 0, LONG_MAX, 0 );

        std::fill( retries_, retries_ + 6, 0 );
    }

    ~rw_mutex()
    {
        CloseHandle( sema_wp_ );
        CloseHandle( sema_rw_ );
    }

    void unblock_one()
    {
        if( (volatile const long&)blocked_ > 0 )
        {
            long k = InterlockedExchangeAdd( &blocked_, -1 );

            if( k > 0 )
            {
                ReleaseSemaphore( sema_rw_, 1, 0 );
            }
            else
            {
                // undo
                InterlockedIncrement( &blocked_ );
            }
        }
    }

    void unblock_all()
    {
        if( (volatile const long&)blocked_ > 0 )
        {
            long k = InterlockedExchange( &blocked_, 0 );

            if( k > 0 )
            {
                ReleaseSemaphore( sema_rw_, 1, 0 );
            }
        }
    }

    void rdlock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            if( st.writer )
            {
                InterlockedIncrement( &blocked_ );
                WaitForSingleObject( sema_rw_, INFINITE );
                unblock_all();
                st = state_;
            }
            else
            {
                xchg.readers = st.readers + 1;

                if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
                {
                    return; // got read lock
                }
                else
                {
                    InterlockedIncrement( &retries_[ 0 ] );
                }
            }
        }
    }

    void lock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            if( st.writer )
            {
                InterlockedIncrement( &blocked_ );
                WaitForSingleObject( sema_rw_, INFINITE );
                st = state_;
            }
            else if( st.readers )
            {
                xchg.writer = 1;

                if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
                {
                    WaitForSingleObject( sema_wp_, INFINITE );
                    return;
                }
                else
                {
                    InterlockedIncrement( &retries_[ 2 ] );
                }
            }
            else // free
            {
                xchg.writer = 1;

                if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
                {
                    return;
                }
                else
                {
                    InterlockedIncrement( &retries_[ 3 ] );
                }
            }
        }
    }

    void rdunlock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            assert( st.readers > 0 );

            --xchg.readers;

            if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
            {
                break;
            }
            else
            {
                InterlockedIncrement( &retries_[ 4 ] );
            }
        }

        if( st.writer && st.readers == 1 ) // we were the last reader and a
writer is waiting
        {
            ReleaseSemaphore( sema_wp_, 1, 0 );
        }
        else
        {
            unblock_one();
        }
    }

    void unlock()
    {
        state st = state_;

        for( ;; )
        {
            state xchg = st;

            assert( st.readers == 0 );

            xchg.writer = 0;
            //xchg.blocked = 0;

            if( atomic_compare_exchange_void( &state_, &st, &xchg ) )
            {
                break;
            }
            else
            {
                InterlockedIncrement( &retries_[ 5 ] );
            }
        }

        unblock_one();
    }

public:

    long retries( int ix )
    {
        return InterlockedExchange( &retries_[ ix ], 0 );
    }

public:

    // lock classes

    class scoped_read_lock
    {
    private:

        rw_mutex & mx_;

        scoped_read_lock( scoped_read_lock const & );
        scoped_read_lock & operator=( scoped_read_lock const & );

    public:

        scoped_read_lock( rw_mutex & mx ): mx_( mx )
        {
            mx_.rdlock();
        }

        ~scoped_read_lock()
        {
            mx_.rdunlock();
        }
    };

    class scoped_write_lock
    {
    private:

        rw_mutex & mx_;

        scoped_write_lock( scoped_write_lock const & );
        scoped_write_lock & operator=( scoped_write_lock const & );

    public:

        scoped_write_lock( rw_mutex & mx ): mx_( mx )
        {
            mx_.lock();
        }

        ~scoped_write_lock()
        {
            mx_.unlock();
        }
    };
};

#endif // #ifndef BOOST_DETAIL_RW_MUTEX_HPP_INCLUDED


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk