#ifdef HAVE_CONFIG_H #include #endif #include #include #include #include #include #include #include #include typedef ogx::queue< int > queue_t; class producer { private: queue_t & q_; boost::barrier & b_; public: producer ( queue_t & q, boost::barrier & b) : q_( q), b_( b) {} void operator()() { b_.wait(); for ( int i = 0; i < 3; ++i) { q_.enqueue( boost::shared_ptr< int >( new int( i) ) ); printf("enqueued: %d\n", i); } } }; class consumer { private: queue_t & q_; boost::barrier & b_; public: consumer ( queue_t & q, boost::barrier & b) : q_( q), b_( b) {} void operator()() { boost::shared_ptr< int > i; b_.wait(); while ( q_.dequeue( i) ) { printf("dequeued: %d\n", * i); } } }; int main( int argc, char *argv[]) { try { boost::thread_group tg1, tg2; queue_t q; boost::barrier b( 4); producer p( q, b); consumer c( q, b); tg1.create_thread( boost::bind< void >( p) ); tg1.create_thread( boost::bind< void >( p) ); tg2.create_thread( boost::bind< void >( c) ); tg2.create_thread( boost::bind< void >( c) ); tg1.join_all(); // all producer threads have finished their jobs q.deactivate(); // deactive the queue tg2.join_all(); // all consumer have finished their jobs, no message should be left inside the queue } catch ( std::exception const& e) { std::cerr << e.what() << std::endl; } catch (...) { std::cerr << "unhandled exception" << std::endl; } return EXIT_SUCCESS; }