Hi Gottlob,
Please find below code I have now written after your suggestions. Let me know if this looks ok. I also got help from http://unix.derkeiler.com/Newsgroups/comp.unix.programmer/2005-02/0208.html # include <iostream>
# include <vector> # include <string> # include <boost/thread/thread.hpp> # include <boost/thread/pthread/condition_variable.hpp> using namespace std; using namespace boost; bool command_from_mgr = 0; bool response_from_worker = 0; bool new_job; bool job_read; bool job_done; bool whole_job_completed; boost::condition_variable cond, cond1; boost::mutex mut, mut1; boost::mutex job_string_mutex; string job_string; void worker_function (void) { for (;;) { /* wait for the new job */ { boost::unique_lock<boost::mutex> lock(mut); while (!command_from_mgr) { cond.wait (lock); } if (new_job == 1) { new_job = 0; command_from_mgr = 0; } else if (whole_job_completed == 1) { return; } } /* read the new job */ job_string_mutex.lock(); string local_job_string = job_string; job_string = ""; job_string_mutex.unlock(); /* notify manager about job read */ { boost::lock_guard<boost::mutex> lock(mut1); job_read = 1; response_from_worker = 1; cond1.notify_one (); } /* execute job */ system (local_job_string.c_str ()); /* notify job done successfully */ { boost::lock_guard<boost::mutex> lock(mut1); job_done = 1; response_from_worker = 1; cond1.notify_one (); } } } class MANAGER { private : thread_group G; int nWorkers; public : MANAGER (int num) : nWorkers (num) { for (int i=0; i<nWorkers; i++) { G.create_thread (worker_function); } } int run_job (vector <string> &joblist); void join_all (void) { G.join_all (); } }; int MANAGER :: run_job (vector <string> &joblist) { int free_workers = nWorkers; vector<string> :: iterator it; for (it = joblist.begin (); it != joblist.end (); ) { if (free_workers > 0) { /* read the job */ job_string_mutex.lock(); job_string = *it; job_string_mutex.unlock(); ++it; /* notify any one of the free threads about new_job */ { boost::lock_guard<boost::mutex> lock(mut); new_job = 1; command_from_mgr = 1; } cond.notify_one (); } /* wait for job_read, job_done notifications from any worker thread */ { boost::unique_lock<boost::mutex> lock(mut1); while (!response_from_worker) { cond1.wait (lock); } if (job_read == 1) { job_read = 0; response_from_worker = 0; /* worker got allocated to the job so decrement number of free_workers */ free_workers--; } if (job_done == 1) { /* worker is idle now so increment number of free_workers */ free_workers++; job_done = 0; response_from_worker = 0; } } } /* wait for job_done notifications from all the busy worker threads */ while (free_workers != nWorkers) { boost::unique_lock<boost::mutex> lock(mut1); while (!response_from_worker) { cond1.wait (lock); } if (job_done == 1) { /* worker is idle now so increment number of free_workers */ free_workers++; job_done = 0; response_from_worker = 0; } } /* notify all workers about whole_job_completed */ { boost::lock_guard<boost::mutex> lock(mut); command_from_mgr = 1; whole_job_completed = 1; cond.notify_all (); } join_all (); return 0; } int main (void) { int nWorkers = 2; MANAGER M(nWorkers); vector<string> tclist; tclist.push_back ("sleep 1"); tclist.push_back ("sleep 2"); tclist.push_back ("sleep 3"); tclist.push_back ("sleep 4"); tclist.push_back ("sleep 5"); tclist.push_back ("sleep 6"); tclist.push_back ("sleep 7"); tclist.push_back ("sleep 8"); tclist.push_back ("sleep 9"); tclist.push_back ("sleep 10"); tclist.push_back ("sleep 1"); tclist.push_back ("sleep 2"); tclist.push_back ("sleep 3"); tclist.push_back ("sleep 4"); tclist.push_back ("sleep 5"); tclist.push_back ("sleep 6"); tclist.push_back ("sleep 7"); tclist.push_back ("sleep 8"); tclist.push_back ("sleep 9"); tclist.push_back ("sleep 10"); M.run_job (tclist); } Regards, Girish
--- On Tue, 1/19/10, Gottlob Frege <gottlobfrege@gmail.com> wrote:
|