Hi Gottlob,
Thanks for your detailed reply.
I will go through it and see if I can implement what you have suggested.
In the meantime, please find a complete program below.
I left it out of the previous mail only to save space.
Let me know if you have any more comments.
-----------------------------------------------------------------------
# include <iostream>
# include <vector>
# include <string>
# include <boost/thread/thread.hpp>
# include <boost/thread/pthread/condition_variable.hpp>
/* NOTE(review): file-scope using-directives; the code below relies on them for
 * unqualified names such as string, vector and thread_group. */
using namespace std;
using namespace boost;
/* ---- "new job" channel: run_job() notifies on it, worker_function() waits on it ---- */
boost::condition_variable new_job_condition;
/* Mutex handed to new_job_condition.wait() and held while notifying. */
boost::mutex new_job_mutex;
/* Guards new_job_notification_reached_flag. */
boost::mutex new_job_notification_reached_flag_mutex;
/* Set by mywait() right before it blocks and cleared by notify_until_success()
 * when it notifies; run_job() additionally re-enables it by hand.
 * No initializer: as a namespace-scope object it starts out zero-initialized (false). */
bool new_job_notification_reached_flag;
/* ---- "job read" channel: worker_function() notifies on it, run_job() waits on it ---- */
boost::condition_variable job_read_condition;
boost::mutex job_read_mutex;
/* Guards job_read_notification_reached_flag. */
boost::mutex job_read_notification_reached_flag_mutex;
bool job_read_notification_reached_flag;
/* ---- "job done" channel: worker_function() notifies on it, run_job() waits on it ---- */
boost::condition_variable job_done_condition;
boost::mutex job_done_mutex;
/* Guards job_done_notification_reached_flag. */
boost::mutex job_done_notification_reached_flag_mutex;
bool job_done_notification_reached_flag;
/* ---- "worker exit" channel: an exiting worker notifies on it, run_job() waits on it ---- */
boost::condition_variable worker_exit_notification_condition;
boost::mutex worker_exit_mutex;
/* Guards worker_exit_notification_reached_flag. */
boost::mutex worker_exit_notification_flag_mutex;
bool worker_exit_notification_reached_flag;
/* Single-slot hand-off for the command line: run_job() writes it, the worker
 * that was woken copies it and resets it to "". Guarded by job_string_mutex. */
boost::mutex job_string_mutex;
string job_string;
/* Shutdown request: set by run_job() once all jobs are done and read by each
 * worker after it wakes up. Guarded by whole_job_completed_mutex. */
boost::mutex whole_job_completed_mutex;
bool whole_job_completed_flag;
/* Set `flag` to true while holding `mut`, the mutex that guards it.
 * The lock is released when `guard` goes out of scope. */
void enable_flag (boost::mutex &mut, bool &flag)
{
    boost::unique_lock<boost::mutex> guard (mut);
    flag = true;
}
/* Set `flag` to false while holding `mut`, the mutex that guards it.
 * The lock is released when `guard` goes out of scope. */
void disable_flag (boost::mutex &mut, bool &flag)
{
    boost::unique_lock<boost::mutex> guard (mut);
    flag = false;
}
/* Announce "a waiter is present" and then block on `cond`.
 *
 *   cond      condition variable to block on
 *   mut       mutex associated with `cond`; held on entry to wait()
 *   flag_mut  mutex guarding `flag`
 *   flag      raised to true just before blocking, so that a notifier can
 *             tell that somebody is waiting
 *
 * Lock order is `mut` first, then `flag_mut`.
 *
 * NOTE(review): wait() is called once, without a predicate loop, so this
 * returns on ANY wakeup of `cond`, including a spurious one. The function
 * itself cannot tell the two apart.
 */
void mywait (boost::condition_variable &cond, boost::mutex &mut, boost::mutex &flag_mut, bool &flag)
{
    boost::unique_lock<boost::mutex> wait_lock (mut);
    {
        /* Raise the flag under its own mutex, then drop that mutex before blocking. */
        boost::unique_lock<boost::mutex> flag_guard (flag_mut);
        flag = true;
    }
    cond.wait (wait_lock);
}
/* Keep trying until a waiter has announced itself through `flag`, then wake
 * exactly one waiter on `cond` and clear the flag.
 *
 *   cond      condition variable to notify
 *   mut       mutex associated with `cond`; held while notifying so the
 *             notification cannot slip in between the waiter raising the
 *             flag and the waiter actually blocking
 *   flag_mut  mutex guarding `flag`
 *   flag      true while a waiter is (about to be) blocked on `cond`;
 *             reset to false once the notification has been sent
 *
 * Does not return until a notification has been sent.
 *
 * Both mutexes are taken through scoped locks, in the order `mut` then
 * `flag_mut`, and released in reverse order. Between unsuccessful attempts
 * the thread yields: retrying immediately would spin on a core and keep
 * re-grabbing `mut`, which the waiter needs before it can raise the flag.
 */
void notify_until_success (boost::condition_variable &cond, boost::mutex &mut, boost::mutex &flag_mut, bool &flag)
{
    for (;;)
    {
        {
            boost::unique_lock<boost::mutex> wait_guard (mut);
            boost::unique_lock<boost::mutex> flag_guard (flag_mut);
            if (flag)
            {
                cond.notify_one ();
                flag = false;
                return;
            }
        }
        /* Nobody is waiting yet: give other threads a chance to run. */
        boost::this_thread::yield ();
    }
}
void worker_function (void)
{
for (;;)
{
/* wait for the new job */
mywait (new_job_condition, new_job_mutex, new_job_notification_reached_flag_mutex, new_job_notification_reached_flag);
whole_job_completed_mutex.lock();
if (whole_job_completed_flag == 1)
{
whole_job_completed_mutex.unlock();
notify_until_success (worker_exit_notification_condition, worker_exit_mutex, worker_exit_notification_flag_mutex, worker_exit_notification_reached_flag);
return;
}
whole_job_completed_mutex.unlock();
/* read the new job */
job_string_mutex.lock();
string local_job_string = job_string;
job_string = "";
job_string_mutex.unlock();
/* notify job read successfully */
notify_until_success (job_read_condition, job_read_mutex, job_read_notification_reached_flag_mutex, job_read_notification_reached_flag);
/* execute job */
system (local_job_string.c_str ());
/* notify job done successfully */
notify_until_success (job_done_condition, job_done_mutex, job_done_notification_reached_flag_mutex, job_done_notification_reached_flag);
}
}
class MANAGER
{
private :
thread_group G;
int nWorkers;
public :
MANAGER (int num) : nWorkers (num)
{
for (int i=0; i<nWorkers; i++)
{
G.create_thread (worker_function);
}
}
int run_job (vector <string> &joblist);
void join_all (void)
{
G.join_all ();
}
};
/* Dispatch every command in `joblist` to the worker threads, wait for all of
 * them to finish, then shut the workers down and join them.
 *
 *   joblist  commands to run; iterated front to back and only read here
 *
 * Returns 0 unconditionally.
 *
 * Each job goes through the global job_string slot, one at a time:
 * write slot -> notify "new job" -> wait for "job read". `free_workers`
 * counts workers this function believes to be idle.
 *
 * NOTE(review): the statement order below is part of the handshake with
 * worker_function(); reorder with care.
 */
int MANAGER :: run_job (vector <string> &joblist)
{
int free_workers = nWorkers;
vector<string> :: iterator it;
for (it = joblist.begin (); it != joblist.end (); ++it)
{
/* publish the next job in the shared slot */
job_string_mutex.lock();
job_string = *it;
job_string_mutex.unlock();
/* wake one worker waiting on the new-job channel (loops until the flag shows a waiter) */
notify_until_success (new_job_condition, new_job_mutex, new_job_notification_reached_flag_mutex, new_job_notification_reached_flag);
/* wait for that worker's job_read notification before touching job_string again */
mywait (job_read_condition, job_read_mutex, job_read_notification_reached_flag_mutex, job_read_notification_reached_flag);
/* a worker has taken the job, so one fewer is free */
free_workers--;
if (free_workers > 0)
{
/* notify_until_success() cleared new_job_notification_reached_flag; raise it again
 * on behalf of the remaining idle workers.
 * NOTE(review): this presumes those workers are already blocked in mywait() -- confirm. */
enable_flag (new_job_notification_reached_flag_mutex, new_job_notification_reached_flag);
}
/* idle workers remain and this is not the last job: dispatch the next one right away */
if (free_workers > 0 && ((it + 1) != joblist.end ()))
continue;
/* otherwise block until some busy worker reports job_done */
mywait (job_done_condition, job_done_mutex, job_done_notification_reached_flag_mutex, job_done_notification_reached_flag);
/* that worker counts as idle again */
free_workers++;
}
/* All jobs have been handed out. Some workers may still be busy, so collect
 * one job_done notification per outstanding worker.
 */
while (free_workers != nWorkers)
{
mywait (job_done_condition, job_done_mutex, job_done_notification_reached_flag_mutex, job_done_notification_reached_flag);
free_workers++;
}
/* every dispatched job has been reported done */
/* raise the shutdown flag, then wake the workers one by one; each checks the flag after waking and exits */
enable_flag (whole_job_completed_mutex, whole_job_completed_flag);
int i=0;
while (i<nWorkers)
{
notify_until_success (new_job_condition, new_job_mutex, new_job_notification_reached_flag_mutex, new_job_notification_reached_flag);
/* wait for the woken worker to report that it is exiting */
mywait (worker_exit_notification_condition, worker_exit_mutex, worker_exit_notification_flag_mutex, worker_exit_notification_reached_flag);
i++;
/* notify_until_success() cleared new_job_notification_reached_flag; raise it again for the next worker */
enable_flag (new_job_notification_reached_flag_mutex, new_job_notification_reached_flag);
}
/* all workers have reported exit; wait for the threads themselves to finish */
join_all ();
return 0;
}
/* Demo driver: start three workers and run the commands "sleep 1" through
 * "sleep 10", twice over, via MANAGER::run_job(). */
int main (void)
{
    const int nWorkers = 3;
    MANAGER M (nWorkers);

    /* One round of commands; the job list is two such rounds back to back. */
    static const char *const one_round[] =
    {
        "sleep 1", "sleep 2", "sleep 3", "sleep 4", "sleep 5",
        "sleep 6", "sleep 7", "sleep 8", "sleep 9", "sleep 10"
    };
    const char *const *const round_end = one_round + sizeof (one_round) / sizeof (one_round[0]);

    std::vector<std::string> tclist;
    for (int round = 0; round < 2; ++round)
    {
        tclist.insert (tclist.end (), one_round, round_end);
    }

    M.run_job (tclist);
}
-----------------------------------------------------------------------
Regards,
Girish
--- On Tue, 1/19/10, Gottlob Frege <gottlobfrege@gmail.com> wrote:
|