//OriginalSource: // http://article.gmane.org/gmane.comp.lib.boost.user/59193 // // cross-thread event queue. multiple threads can post jobs, one or many threads can execute // jobs. #include #include #include #include #include #include #include #include //Above from https://svn.boost.org/svn/boost/sandbox/variadic_templates class EventQueue ; // typedef generalised callback types template struct cbn { typedef void (T::*type)(boost::shared_ptr...); // member function of the form 'T::fn(shared_ptr)' }; typedef boost::function EventQueueJob; //----------------------------------------------------------------------- // the event class which keeps track of our job and the queue we want to post the job to class event { EventQueue &queue_; // the event queue which will process the job EventQueueJob job_; event(EventQueue &queue, EventQueueJob &job) : queue_(queue), job_(job) { } // event creation friends template friend boost::shared_ptr create_event (boost::shared_ptr that, typename cbn::type cb, EventQueue &queue); // event post friends template friend void post_to_event(boost::shared_ptr &ev, boost::shared_ptr&... data); }; //----------------------------------------------------------------------- class EventQueue { public: typedef EventQueueJob Job; boost::mutex mtx_; boost::condition cnd_; typedef std::list Jobs; Jobs jobs_; int stop_; // puts a job into the queue void post(Job job) { boost::mutex::scoped_lock lock(mtx_); jobs_.push_back(job); cnd_.notify_one(); } public: // pulls one job from the queue, returns false when stopped bool pull(Job* job) { boost::mutex::scoped_lock lock(mtx_); for(;;) { // handle spurious wake-ups while(!stop_ && jobs_.empty()) cnd_.wait(lock); if(!jobs_.empty() && 2 != stop_) { job->swap(jobs_.front()); // move efficiently, avoiding *job = jobs.front() jobs_.pop_front(); return true; } else if(stop_) { return false; } } } // make pull() return false void stop(bool cancel_jobs) { boost::mutex::scoped_lock lock(mtx_); stop_ = 1 + cancel_jobs; // 1 - complete jobs, 2 - cancel jobs cnd_.notify_all(); } EventQueue() : stop_() {} ~EventQueue() { this->stop(true); } // event post friends template friend void post_to_event(boost::shared_ptr &ev, boost::shared_ptr&... data); }; //----------------------------------------------------------------------- // helper functions which create new events template struct indices_create_event ; template struct indices_create_event > { template static boost::shared_ptr _(boost::shared_ptr that, typename cbn::type cb, EventQueue &queue) { return boost::shared_ptr(new event(queue, boost::bind(cb, that, boost::arg()...))); } }; template boost::shared_ptr create_event(boost::shared_ptr that, typename cbn::type cb, EventQueue &queue) { // encapsulate an object and it's member function using bind, placeholder for argument to come typedef boost::mpl::package_range_c indices; return indices_create_event::_(that,cb,queue); } //----------------------------------------------------------------------- // helper functions which post events onto the event's queue template void post_to_event(boost::shared_ptr &ev, boost::shared_ptr&... data) { // bind the data to the job ev->queue_.post(boost::bind(ev->job_, data...)); } //----------------------------------------------------------------------- struct JobX // micro-oprimization, embed the reference counter into the job to avoid an extra memory // allocation by boost::shared_ptr ctor : boost::enable_shared_from_this { void foo() { printf("%p foo()\n", this); } void bar(int a) { printf("%p bar(%d)\n", this, a); } }; void anotherThread(EventQueue* queue) { EventQueue::Job job; // wait and execute jobs till stopped while(queue->pull(&job)) job(); // execute the job } int main() { EventQueue queue; // start another thread and pass an argument boost::thread another_thread(boost::bind(anotherThread, &queue)); // post jobs, allocate in this thread, deallocate in the other boost::shared_ptr job_0(new JobX); queue.post(boost::bind(&JobX::foo, job_0)); // post several jobs to the same object, deallocated when no longer in use boost::shared_ptr job_1(new JobX); queue.post(boost::bind(&JobX::foo, job_1)); queue.post(boost::bind(&JobX::bar, job_1, 1)); // pass an extra arg 1 queue.post(boost::bind(&JobX::bar, job_1, 2)); // pass an extra arg 2 // stop the queue and let it complete all jobs queue.stop(false); another_thread.join(); }