Boost Users :

From: svanechel (svanechel_at_[hidden])
Date: 2002-02-22 06:42:20


Hi,

I'm trying to create a mechanism that synchronizes access to shared
data on a per-operation basis. The idea is that some operations (e.g.
read operations) can proceed concurrently, while others (write
operations) must block all other threads. I thought boost::condition
would do the trick. Since I do not have much experience in
multithreaded programming, it is entirely possible that I do not fully
understand boost::condition or its allowable usage.

I tried to put operations in an "Operation Group". All operations
belonging to the same group are allowed to execute concurrently,
while all others are blocked (e.g. all read operations are in the
same group). It is also possible that an operation doesn't specify an
Operation Group, in which case a temporary group is assigned to the
operation for each thread that calls it. Write operations are
constructed this way.
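
The temporary-group assignment described above could be sketched with
a std::atomic counter. This is only an illustration in standard C++11,
not the code from this post: kFirstAutoGroup, kLastAutoGroup and
NextTempGroup are hypothetical names, and the range values are
arbitrary.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical range reserved for automatically assigned groups.
constexpr std::int32_t kFirstAutoGroup = 1000;
constexpr std::int32_t kLastAutoGroup  = 1004;

// Returns a group id in [kFirstAutoGroup, kLastAutoGroup), wrapping
// around at the end of the range. compare_exchange_weak retries until
// this thread wins the update, so the increment and the wrap-around
// happen as a single atomic step.
inline std::int32_t NextTempGroup()
{
    static std::atomic<std::int32_t> next( kFirstAutoGroup );

    std::int32_t current = next.load();
    std::int32_t following;
    do
    {
        following = ( current + 1 >= kLastAutoGroup )
                        ? kFirstAutoGroup
                        : current + 1;
    } while ( !next.compare_exchange_weak( current, following ) );

    return current;
}
```

Ids repeat once the counter wraps, so in a sketch like this the range
would have to be larger than the number of operations that can be
active at the same time.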

Below is an example of what I mean (OperationLevelLockable is defined
below):

class SomeClass : private OperationLevelLockable
{
public:
        int ReadData()
        {
                // all readers share one group and may run concurrently
                Lock lock( *this, READ_OP_GROUP );

                return ( mData );
        }

        void WriteData( int data )
        {
                // no group given: a temporary group is assigned, so
                // each write operation blocks all other accesses
                Lock lock( *this );

                mData = data;
        }

private:
        int mData;
};

I'm developing on Windows 2000 on a multiprocessor machine.

This is the code I use to test my idea:

const int32 OPERATION_GROUP_1 = 1;
const int32 OPERATION_GROUP_2 = 2;
const int NUM_THREADS = 4;

class TestOperationLevelLockable : public MTTestCase
{
public:
        TestOperationLevelLockable() :
                mShared( this ), mCurrentGroup( 0 ),
                mThreadCount( 0 )
        {
                // Create two sets of threads, each performing a
                // different operation. When the
                // TestOperationLevelLockable object is constructed,
                // all registered threads are created and start
                // running.
                RegisterThread( this,
                        &TestOperationLevelLockable::Operation1Thread,
                        NUM_THREADS );
                RegisterThread( this,
                        &TestOperationLevelLockable::Operation2Thread,
                        NUM_THREADS );
        }

        void CallFromGroup( int32 opGroup )
        {
                boost::mutex::scoped_lock lock( mMutex );

                if ( 0 == mCurrentGroup )
                {
                        mCurrentGroup = opGroup;
                }

                if ( ( mCurrentGroup != opGroup ) &&
                     ( mThreadCount < NUM_THREADS ) )
                {
                        ReportFailure( "OperationLevelLockable failed "
                                       "to lock out operations of "
                                       "different group",
                                       __FILE__, __LINE__ );
                }

                ++mThreadCount;
        }

protected:
        class MTObject : public OperationLevelLockable
        {
        public:
                MTObject( TestOperationLevelLockable* target ) :
                        mTarget( target ) {}

                void CallFromGroup( int32 opGroup )
                {
                        mTarget->CallFromGroup( opGroup );
                }
        private:
                TestOperationLevelLockable* mTarget;
        };

        MTObject mShared;
        int32 mCurrentGroup;
        int mThreadCount;
        boost::mutex mMutex;

        

        void DoMTTest()
        {
        }

        void Operation1Thread()
        {
                MTObject::Lock lock( mShared,
                                              OPERATION_GROUP_1 );

                mShared.CallFromGroup( OPERATION_GROUP_1 );

                SleepThread( 30 ); // seconds, to keep the thread busy
        }

        void Operation2Thread()
        {
                MTObject::Lock lock( mShared,
                                              OPERATION_GROUP_2 );

                mShared.CallFromGroup( OPERATION_GROUP_2 );

                SleepThread( 30 ); // seconds, to keep the thread busy
        }
};

And here is OperationLevelLockable:

class OperationLevelLockable
{
        class AccessCounter;
public:
        OperationLevelLockable(){}
        
        typedef Internal::DefaultTMIntType
                        IntType;

        template<typename T>
        struct MakeSharedType
        {
                typedef T volatile
                                                Type;
        };

        class Lock;
        friend class Lock;

        
        class Lock
        {
        public:
                Lock( OperationLevelLockable& obj ) :
                        mCounter( obj.mOpAccessCounter )
                {
                   mCounter.EnterOperation( Lock::TempOpGroup() );
                }

                Lock( OperationLevelLockable& obj, int32 opGroup ) :
                        mCounter( obj.mOpAccessCounter )
                {
                   mCounter.EnterOperation( opGroup );
                }

                ~Lock()
                {
                        mCounter.LeaveOperation();
                }

        private:
                Lock( Lock const& );
                Lock& operator=( Lock const& );

                static IntType TempOpGroup()
                {
                   static volatile IntType opGroup =
                                      Internal::kFIRST_AUTO_GROUP;

                   IntType newGroup =
                      OperationLevelLockable::AtomicIncrement( opGroup );

                   if ( newGroup >= Internal::kLAST_AUTO_GROUP )
                   {
                     OperationLevelLockable::AtomicAssign( opGroup,
                                      Internal::kFIRST_AUTO_GROUP );
                   }

                   return ( newGroup );
                }

                AccessCounter& mCounter;
        };

        static inline IntType AtomicIncrement( volatile IntType& lval );
        static inline IntType AtomicDecrement( volatile IntType& lval );
        static inline void AtomicAssign( IntType& lval, IntType rVal );
        static inline void AtomicAssign( volatile IntType& lval,
                                         IntType rVal );

private:

        class AccessCounter :
                private boost::base_from_member<boost::recursive_mutex>
        {
        public:

                AccessCounter() :
                        mLock( member ), mRefCnt( 0 ),
                        mOpGroup( Internal::kANY_GROUP ) {}

                bool IsOpGroupActive( IntType opGroup ) const
                {
                   return ( ( Internal::kANY_GROUP == mOpGroup ) ||
                            ( opGroup == mOpGroup ) );
                }

                void LeaveOperation()
                {
                        if ( 0 ==
                             OperationLevelLockable::AtomicDecrement(
                                                        mRefCnt ) )
                        {
                           SetOpGroup( Internal::kANY_GROUP );
                           mCanOpProceed.notify_all();
                        }
                }

                void EnterOperation( IntType opGroup )
                {
                   mCanOpProceed.wait( mLock,
                                       CanOpProceed( *this,
                                                     opGroup ) );
                   SetOpGroup( opGroup );
                   OperationLevelLockable::AtomicIncrement( mRefCnt );
                }
                                
        private:

                AccessCounter( AccessCounter const& );
                AccessCounter& operator=( AccessCounter const& );

                struct CanOpProceed
                {
                        CanOpProceed( AccessCounter const& counter,
                                      IntType opGroup ) :
                                mOpGroup( opGroup ),
                                mCounter( counter ) {}

                        bool operator()() const
                        {
                          return (
                             mCounter.IsOpGroupActive( mOpGroup )
                          );
                        }

                private:
                        CanOpProceed& operator=(
                                        CanOpProceed const& );

                        int32 mOpGroup;
                        AccessCounter const& mCounter;
                };

                void SetOpGroup( IntType opGroup )
                {
                 OperationLevelLockable::AtomicAssign( mOpGroup,
                                                       opGroup );
                }
                                        
                                
                boost::recursive_mutex::scoped_lock mLock;
                boost::condition mCanOpProceed;
                volatile IntType mRefCnt;
                volatile IntType mOpGroup;
        };

        OperationLevelLockable( OperationLevelLockable const& );
        OperationLevelLockable& operator=(
                        OperationLevelLockable const& );

        AccessCounter mOpAccessCounter;
};

The idea is that the first operation that is executed acquires the
lock and blocks out all threads not belonging to the same Operation
Group. The initial blocking works fine: only threads executing in
the same Operation Group are allowed to continue. When the last
thread of the group leaves the operation in
OperationLevelLockable::AccessCounter::LeaveOperation, it releases all
waiting threads, and a thread from the other group is allowed to run.
However, when this thread finishes, no new threads are allowed to run
anymore and the application hangs.
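
For comparison, the enter/leave protocol described above could be
sketched with the standard C++11 primitives roughly as follows. This
is not the posted code and is not offered as a diagnosis of it:
GroupGate, TryEnter and Guard are hypothetical names, and unlike the
posted AccessCounter the lock here is taken locally in every call
rather than held as a data member. The sketch makes no attempt at
fairness, so one group could starve the other.

```cpp
#include <condition_variable>
#include <mutex>

// Assumed marker for "no group is active"; real group ids must differ.
constexpr int kAnyGroup = 0;

class GroupGate
{
public:
    // Blocks until no group is active or the active group is `group`.
    void Enter( int group )
    {
        // The lock is local to the call, so the calling thread owns
        // the mutex while it inspects and updates the shared state.
        std::unique_lock<std::mutex> lock( mMutex );
        mCanProceed.wait( lock, [this, group] {
            return mActiveGroup == kAnyGroup || mActiveGroup == group;
        } );
        mActiveGroup = group;
        ++mRefCount;
    }

    // Non-blocking variant: returns false instead of waiting.
    bool TryEnter( int group )
    {
        std::lock_guard<std::mutex> lock( mMutex );
        if ( mActiveGroup != kAnyGroup && mActiveGroup != group )
        {
            return false;
        }
        mActiveGroup = group;
        ++mRefCount;
        return true;
    }

    // The last thread to leave resets the group and wakes all waiters.
    void Leave()
    {
        std::lock_guard<std::mutex> lock( mMutex );
        if ( --mRefCount == 0 )
        {
            mActiveGroup = kAnyGroup;
            mCanProceed.notify_all();
        }
    }

    // RAII helper in the spirit of the Lock class in this post.
    class Guard
    {
    public:
        Guard( GroupGate& gate, int group ) : mGate( gate )
        {
            mGate.Enter( group );
        }
        ~Guard() { mGate.Leave(); }

        Guard( Guard const& ) = delete;
        Guard& operator=( Guard const& ) = delete;

    private:
        GroupGate& mGate;
    };

private:
    std::mutex mMutex;
    std::condition_variable mCanProceed;
    int mActiveGroup = kAnyGroup;
    int mRefCount = 0;
};
```

An operation would then construct a GroupGate::Guard with its group id
at the top of the function, much like the Lock objects in the test
code above.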

Am I using boost::condition incorrectly? Should I use something else?
Is the test code wrong? Are there any other errors in the code that
prevent it from behaving as expected? Should I use another technique
to achieve the same result?

I hope there is someone out there who has the heart to read through
the entire posting and help me. I hope I have given enough
information.

Thx.

Sven Van Echelpoel.

