Boost logo

Boost Users :

From: Damien McGivern (d_mcgivern_at_[hidden])
Date: 2004-02-12 00:46:16


(First post)
Hi,
 I've just started working with Boost and have run
into some trouble with the thread class. I've included
the test code that I'm having the problem with in the
hope that someone could point out to me where I'm
going wrong.

 The code tests the QueueAccessor class
(CommandQueue.hpp) with the test code in
CommandQueueTest.hpp. I created a base class
(Runnable) to aid in creating a thread that runs an
object's method rather than a function. The code
compiles OK and seemed to run OK at first, until I
realised that the main thread has somehow become one
of the threads that I created and is stuck in a
continuous loop after the first join() is called:

_writer_thread1->join(); // line 21

 As you can probably guess from my code, my background
is in Java, so I'm not entirely sure if I'm using
boost::thread correctly. I tested the QueueAccessor
using threads that use a static worker function
instead of an object's method and it seemed to run OK
so I'm assuming that the problem lies with the
Runnable base class.

 I'm using boost version 1.31.0 running Windows XP and
using VS.Net 2003 to compile/run the code.

Any help will be very much appreciated.

Cheers
mcgiv

        
        
                
___________________________________________________________
BT Yahoo! Broadband - Free modem offer, sign up online today and save £80 http://btyahoo.yahoo.co.uk

#ifndef __TEST_COMMANDQUEUE2__
#define __TEST_COMMANDQUEUE2__

#include <Universal/CommandQueue.hpp>
#include <Threading/Runnable.hpp>

using namespace Universal;

// Global lock shared by every Writer and Reader in this file. It is held while
// they push/pop and print to std::cout in Do(), and while their Stop()
// overrides clear _running and print the "Stopped ..." message.
boost::mutex _iomutex;

class Worker : public Threading::Runnable
{

        public:
                Worker(QueueAccessor* q):Threading::Runnable(), _queue(q), _pause_sec(0){}

                void Run()
                {
                        while(this->_running)
                        {
                                this->Do();
                                if(this->_pause_sec > 0)
                                        this->Pause(this->_pause_sec);
                        }
                }

                virtual void Do() = 0;

        protected:
                QueueAccessor* _queue;
                int _pause_sec;

};

class Writer : public Worker
{

        public:
                Writer(QueueAccessor* q):Worker(q),_index(0){
                        this->_pause_sec = 2;
                }

                void Do()
                {
                        boost::mutex::scoped_lock scoped_lock(_iomutex);
                        QueueItem item("hi there", this->_index++);
                        this->_queue->Push(item);
                        std::cout << "Add" << std::endl;
                }

                short _index;

                void Stop()
                {
                        boost::mutex::scoped_lock scoped_lock(_iomutex);
                        this->_running = false;//Threading::Runnable::Stop();
                        std::cout << "Stopped Writer" << std::endl;
                }

};

class Reader : public Worker
{

        public:
                Reader(QueueAccessor* q):Worker(q){}

                void Do()
                {
                        while(this->_queue->Size() == 0){}
                        boost::mutex::scoped_lock scoped_lock(_iomutex);
                        QueueItem item = this->_queue->Pop();
                        std::cout << "Removed id=" << item._data_len << " Data=" << item._data << std::endl;

                }

                void Stop()
                {
                        boost::mutex::scoped_lock scoped_lock(_iomutex);
                        this->_running = false;//Threading::Runnable::Stop();
                        std::cout << "Stopped Reader" << std::endl;
                }

};

void test()
{

        QueueAccessor* q = new QueueAccessor();

        Reader reader(q);
        Writer writer1(q);
        Writer writer2(q);

        // set up threads
        Threading::boost_thread_starter starter((Threading::Runnable *)&reader);
        boost::thread* _reader_thread = new boost::thread(starter);
        
        Threading::boost_thread_starter starter2((Threading::Runnable *)&writer1);
        boost::thread* _writer_thread1 = new boost::thread(starter2);

        Threading::boost_thread_starter starter3((Threading::Runnable *)&writer2);
        boost::thread* _writer_thread2 = new boost::thread(starter3);

        // start threads
        _writer_thread1->join();
        _writer_thread2->join();
        _reader_thread->join();

        // pause for 5 secs
        boost::xtime xt;
    boost::xtime_get(&xt, boost::TIME_UTC);
    xt.sec += 5;
    boost::thread::sleep(xt);

        reader.Stop();
        writer1.Stop();
        writer2.Stop();

        delete q;

        char c;
        std::cin >> c;

}

#endif __TEST_COMMANDQUEUE2__

#ifndef __IRUNNABLE__
#define __IRUNNABLE__

#include <boost/thread/thread.hpp>
#include <boost/thread/xtime.hpp>

namespace Threading
{

        class Runnable
        {
                public:
                        Runnable():_running(true){}

                        virtual void Run() = 0;
                        virtual void Stop(){_running = false;}

                protected:
                        bool _running;

                        // sleeps thread for x sec
                        void Pause(int sec)
                        {
                                boost::xtime xt;
                                boost::xtime_get(&xt, boost::TIME_UTC);
                                xt.sec += sec;
                                boost::thread::sleep(xt);
                        }

                        void Pause(int sec, int ns)
                        {
                                boost::xtime xt;
                                boost::xtime_get(&xt, boost::TIME_UTC);
                                xt.sec += sec;
                                xt.nsec += ns*1000;
                                boost::thread::sleep(xt);
                        }

        };

        // used to pass the Runnable object to the thread's start function
        struct boost_thread_starter
        {
                boost_thread_starter(Runnable* runnable):_runnable(runnable){}

                // function is called when boost::thread starts
                void operator()()
                {
                        // start the Runnable's run method
                        _runnable->Run();
                }
                
                Runnable* _runnable;
        };

}// end Threading namespace

#endif // __IRUNNABLE__


#ifndef __COMMANDQUEUE__
#define __COMMANDQUEUE__

#include <queue>
#include <stdexcept>

#include <boost/thread/mutex.hpp>

namespace Universal
{
        
        struct QueueItem
        {
                /*
                QueueItem(int thread, int value):_thread(thread), _value(value){}
                int _thread;
                int _value;
                //*/
                QueueItem(char *data, short data_len):_data(data), _data_len(data_len){}
                char *_data;
                short _data_len;
                //*/

        };

        // used to sync access to the queue
        class QueueAccessor
        {

                public:
                        QueueAccessor(){}

                        void Push(QueueItem item)
                        {
                                boost::mutex::scoped_lock scoped_lock(_mutex);

                                _queue.push(item);
                        }

                        QueueItem Pop()
                        {
                                boost::mutex::scoped_lock scoped_lock(_mutex);
                                 
                                if( _queue.size() > 0 )
                                {
                                        QueueItem tmp = _queue.front();
                                        _queue.pop();
                                        return tmp;
                                }

                        }

                        int Size()
                        {
                                return _queue.size();
                        }

                protected:
                        std::queue<QueueItem> _queue;
                        boost::mutex _mutex;
        };

}// Universal namespace

#endif // __COMMANDQUEUE__


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net