/*************************************************************************** * Copyright (C) 2005 by Oliver Kowalke * * oliver.kowalke@gmx.de * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************/ #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #include typedef ogx::message_queue< int > queue_t; class producer { private: queue_t & q_; boost::barrier & b_; public: producer ( queue_t & q, boost::barrier & b) : q_( q), b_( b) {} void operator()() { b_.wait(); for ( int i = 0; i < 3; ++i) { q_.enqueue( boost::shared_ptr< int >( new int( i) ) ); printf("enqueued: %d\n", i); } } }; class consumer { private: queue_t & q_; boost::barrier & b_; public: consumer ( queue_t & q, boost::barrier & b) : q_( q), b_( b) {} void operator()() { boost::shared_ptr< int > i; b_.wait(); while ( q_.dequeue( i) ) { printf("dequeued: %d\n", * i); } } }; int main( int argc, char *argv[]) { try { boost::thread_group tg1, tg2; queue_t q; boost::barrier b( 4); producer p( q, b); consumer c( q, b); tg1.create_thread( boost::bind< void >( p) ); tg1.create_thread( boost::bind< void >( p) ); tg2.create_thread( boost::bind< void >( c) ); tg2.create_thread( boost::bind< void >( c) ); tg1.join_all(); // all producer threads have finished their jobs q.deactivate(); // deactive the queue tg2.join_all(); // all consumer have finished their jobs, no message should be left inside the queue } catch ( std::exception const& e) { std::cerr << e.what() << std::endl; } catch (...) { std::cerr << "unhandled exception" << std::endl; } return EXIT_SUCCESS; }