#ifndef __TEST_COMMANDQUEUE2__ #define __TEST_COMMANDQUEUE2__ #include "Universal/CommandQueue.hpp" #include "Threading/Runnable.hpp" #include using namespace Universal; boost::mutex _iomutex; class Worker : public Threading::Runnable { public: Worker(QueueAccessor* q):Threading::Runnable(), _queue(q), _pause_sec(0){} void Run() { while(ShouldContinue()) { this->Do(); if(this->_pause_sec > 0) this->Pause(this->_pause_sec); } } virtual void Do() = 0; protected: QueueAccessor* _queue; int _pause_sec; }; class Writer : public Worker { public: Writer(QueueAccessor* q):Worker(q),_index(0){ this->_pause_sec = 2; } void Do() { boost::mutex::scoped_lock scoped_lock(_iomutex); QueueItem item("hi there", this->_index++); this->_queue->Push(item); std::cout << "Add" << std::endl; } void Stop() { Threading::Runnable::Stop(); boost::mutex::scoped_lock scoped_lock(_iomutex); std::cout << "Stopped Writer" << std::endl; } private: short _index; }; class Reader : public Worker { public: Reader(QueueAccessor* q):Worker(q){} void Do() { if (this->_queue->Size() > 0) { boost::mutex::scoped_lock scoped_lock(_iomutex); QueueItem item = this->_queue->Pop(); std::cout << "Removed id=" << item._data_len << " Data=" << item._data << std::endl; } } void Stop() { Threading::Runnable::Stop(); boost::mutex::scoped_lock scoped_lock(_iomutex); std::cout << "Stopped Reader" << std::endl; } }; void test() { QueueAccessor* q = new QueueAccessor(); Reader reader(q); Writer writer1(q); Writer writer2(q); // set up threads Threading::boost_thread_starter starter((Threading::Runnable *)&reader); boost::thread* _reader_thread = new boost::thread(starter); Threading::boost_thread_starter starter2((Threading::Runnable *)&writer1); boost::thread* _writer_thread1 = new boost::thread(starter2); Threading::boost_thread_starter starter3((Threading::Runnable *)&writer2); boost::thread* _writer_thread2 = new boost::thread(starter3); // pause for 5 secs boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += 5; boost::thread::sleep(xt); reader.Stop(); writer1.Stop(); writer2.Stop(); _writer_thread1->join(); _writer_thread2->join(); _reader_thread->join(); std::cout << "Worker threads have exited\n"; delete q; char c; std::cin >> c; } #endif __TEST_COMMANDQUEUE2__