I want to write library code that lets a user create an "event" associated with a thread, a class instance, and a member function of that class. The user can then "post" data (an arbitrary number of arguments), which results in the class instance's member function being called in the context of the associated thread.
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <list>
#include <stdio.h>

// Cross-thread event queue. Any number of threads may post jobs; one or more
// threads may pull and execute them. All access to the job list and the stop
// state is serialised by mtx_.
class EventQueue
{
public:
    // A job is any nullary callable, typically built with boost::bind.
    typedef boost::function<void()> Job;

    EventQueue() : stop_(kRunning) {}

    // Stops the queue in cancel mode; jobs still queued are destroyed unrun.
    ~EventQueue() { this->stop(true); }

    // Appends a job to the back of the queue and wakes one waiting consumer.
    void post(Job job)
    {
        boost::mutex::scoped_lock lock(mtx_);
        // `job` is already a private copy (passed by value), so swap it into
        // the list node instead of copying it a second time.
        jobs_.push_back(Job());
        jobs_.back().swap(job);
        cnd_.notify_one();
    }

    // Blocks until a job is available or the queue is stopped.
    //
    // Returns true and stores the next job in *job when one was taken.
    // Returns false once the queue is stopped and either nothing is left to
    // run or the stop requested cancellation. Whatever *job held on entry is
    // swapped into the queue node and destroyed while the lock is held.
    bool pull(Job* job)
    {
        boost::mutex::scoped_lock lock(mtx_);

        // Re-check the predicate after every wake-up: condition waits may
        // return spuriously.
        while (kRunning == stop_ && jobs_.empty())
            cnd_.wait(lock);

        if (kCancel != stop_ && !jobs_.empty()) {
            job->swap(jobs_.front()); // move efficiently, avoiding *job = jobs_.front()
            jobs_.pop_front();
            return true;
        }

        // Leaving the wait loop without a runnable job implies a stop request.
        return false;
    }

    // Makes pull() return false.
    //   cancel_jobs == false: queued jobs are still handed out first.
    //   cancel_jobs == true:  queued jobs are abandoned.
    // Every waiting consumer is woken so it can observe the new state.
    void stop(bool cancel_jobs)
    {
        boost::mutex::scoped_lock lock(mtx_);
        stop_ = cancel_jobs ? kCancel : kDrain;
        cnd_.notify_all();
    }

private:
    // Stop state of the queue (previously the bare integers 0, 1 and 2).
    enum StopState {
        kRunning = 0, // accepting and handing out jobs
        kDrain = 1,   // stopped: hand out the remaining jobs, then finish
        kCancel = 2   // stopped: discard the remaining jobs
    };

    typedef std::list<Job> Jobs;

    boost::mutex mtx_;
    boost::condition cnd_;
    Jobs jobs_;
    StopState stop_;
};

// Example job target.
//
// boost::enable_shared_from_this only allows a JobX to obtain a shared_ptr to
// itself via shared_from_this(); it does NOT embed the reference counter in the
// object. To avoid the separate control-block allocation, create instances with
// boost::make_shared<JobX>() (as main() does).
struct JobX : boost::enable_shared_from_this<JobX>
{
    void foo()
    {
        // %p requires a void* argument.
        printf("%p foo()\n", static_cast<void*>(this));
    }

    void bar(int a)
    {
        printf("%p bar(%d)\n", static_cast<void*>(this), a);
    }
};

// Consumer thread body: waits for jobs and executes them until pull() reports
// that the queue has been stopped.
void anotherThread(EventQueue* queue)
{
    EventQueue::Job job;
    while (queue->pull(&job))
        job(); // execute the job
}

int main()
{
    EventQueue queue;

    // start another thread and pass an argument
    boost::thread another_thread(boost::bind(anotherThread, &queue));

    // Post a job. The bound shared_ptr copy keeps the JobX alive for as long
    // as the job object exists, so the object may be allocated in this thread
    // and released in the other one.
    boost::shared_ptr<JobX> job_0 = boost::make_shared<JobX>();
    queue.post(boost::bind(&JobX::foo, job_0));

    // post several jobs to the same object, deallocated when no longer in use
    boost::shared_ptr<JobX> job_1 = boost::make_shared<JobX>();
    queue.post(boost::bind(&JobX::foo, job_1));
    queue.post(boost::bind(&JobX::bar, job_1, 1)); // pass an extra arg 1
    queue.post(boost::bind(&JobX::bar, job_1, 2)); // pass an extra arg 2

    // stop the queue and let it complete all jobs
    queue.stop(false);
    another_thread.join();
}