Boost logo

Boost Users :

Subject: Re: [Boost-users] wait/notify problem
From: girish hilage (girish_hilage_at_[hidden])
Date: 2010-02-03 09:52:42


Hi Gottlob,

   Please find below code I have now written after your suggestions.
   Let me know if this looks ok.
   I also got help from http://unix.derkeiler.com/Newsgroups/comp.unix.programmer/2005-02/0208.html

# include <iostream>
# include <vector>
# include <string>
# include <boost/thread/thread.hpp>
# include <boost/thread/pthread/condition_variable.hpp>

using namespace std;
using namespace boost;

bool command_from_mgr = 0;
bool response_from_worker = 0;
bool new_job;
bool job_read;
bool job_done;
bool whole_job_completed;

boost::condition_variable cond, cond1;
boost::mutex mut, mut1;

boost::mutex job_string_mutex;
string job_string;

void worker_function (void)
{
    for (;;)
    {
        /* wait for the new job */
        {
            boost::unique_lock<boost::mutex> lock(mut);
            while (!command_from_mgr)
            {
                cond.wait (lock);
            }

            if (new_job == 1)
            {
                new_job = 0;
                command_from_mgr = 0;
            }
            else if (whole_job_completed == 1)
            {
                return;
            }
        }

        /* read the new job */
        job_string_mutex.lock();
        string local_job_string = job_string;
        job_string = "";
        job_string_mutex.unlock();

        /* notify manager about job read */
        {
            boost::lock_guard<boost::mutex> lock(mut1);
            job_read = 1;
            response_from_worker = 1;
            cond1.notify_one ();
        }

        /* execute job */
        system (local_job_string.c_str ());

        /* notify job done successfully */
        {
            boost::lock_guard<boost::mutex> lock(mut1);
            job_done = 1;
            response_from_worker = 1;
            cond1.notify_one ();
        }
    }
}

class MANAGER
{
    private :
        thread_group G;
        int nWorkers;

    public :
        MANAGER (int num) : nWorkers (num)
        {
            for (int i=0; i<nWorkers; i++)
            {
                 G.create_thread (worker_function);
            }
        }

        int run_job (vector <string> &joblist);
        void join_all (void)
        {
            G.join_all ();
        }
};

int MANAGER :: run_job (vector <string> &joblist)
{
    int free_workers = nWorkers;
    vector<string> :: iterator it;

    for (it = joblist.begin (); it != joblist.end (); )
    {
        if (free_workers > 0)
        {
            /* read the job */
            job_string_mutex.lock();
            job_string = *it;
            job_string_mutex.unlock();
            ++it;

            /* notify any one of the free threads about new_job */
            {
                boost::lock_guard<boost::mutex> lock(mut);
                new_job = 1;
                command_from_mgr = 1;
            }
            cond.notify_one ();
        }

        /* wait for job_read, job_done notifications from any worker
thread */
        {
            boost::unique_lock<boost::mutex> lock(mut1);
            while (!response_from_worker)
            {
                cond1.wait (lock);
            }

            if (job_read == 1)
            {
                job_read = 0;
                response_from_worker = 0;

                /* worker got allocated to the job so decrement number
of free_workers */
                free_workers--;
            }

            if (job_done == 1)
            {

                /* worker is idle now so increment number of
free_workers */
                free_workers++;
                job_done = 0;
                response_from_worker = 0;
            }
        }
    }

    /* wait for job_done notifications from all the busy worker threads
*/
    while (free_workers != nWorkers)
    {
        boost::unique_lock<boost::mutex> lock(mut1);
        while (!response_from_worker)
        {
            cond1.wait (lock);
        }

        if (job_done == 1)
        {
            /* worker is idle now so increment number of free_workers */
            free_workers++;
            job_done = 0;
            response_from_worker = 0;
        }
    }

    /* notify all workers about whole_job_completed */
    {
        boost::lock_guard<boost::mutex> lock(mut);
        command_from_mgr = 1;
        whole_job_completed = 1;
        cond.notify_all ();
    }

    join_all ();
    return 0;
}

int main (void)
{
    int nWorkers = 2;
    MANAGER M(nWorkers);
    vector<string> tclist;

    tclist.push_back ("sleep 1");
    tclist.push_back ("sleep 2");
    tclist.push_back ("sleep 3");
    tclist.push_back ("sleep 4");
    tclist.push_back ("sleep 5");
    tclist.push_back ("sleep 6");
    tclist.push_back ("sleep 7");
    tclist.push_back ("sleep 8");
    tclist.push_back ("sleep 9");
    tclist.push_back ("sleep 10");
    tclist.push_back ("sleep 1");
    tclist.push_back ("sleep 2");
    tclist.push_back ("sleep 3");
    tclist.push_back ("sleep 4");
    tclist.push_back ("sleep 5");
    tclist.push_back ("sleep 6");
    tclist.push_back ("sleep 7");
    tclist.push_back ("sleep 8");
    tclist.push_back ("sleep 9");
    tclist.push_back ("sleep 10");

    M.run_job (tclist);
}

Regards,
Girish

--- On Tue, 1/19/10, Gottlob Frege <gottlobfrege_at_[hidden]> wrote:

From: Gottlob Frege <gottlobfrege_at_[hidden]>
Subject: Re: [Boost-users] wait/notify problem
To: boost-users_at_[hidden]
Date: Tuesday, January 19, 2010, 5:09 PM

On Mon, Jan 18, 2010 at 8:00 AM, girish hilage <girish_hilage_at_[hidden]> wrote:

   The code I have now written is as follows (which seems to work fine) :

This code is broken.  I find it hard to read and understand, and it contains 4 different mutexes, which is asking for trouble.  But besides those 'aesthetic' complaints, it really is broken.

(IIUC) It appears that you are trying very hard to make sure someone is waiting on the condition before notifying.  I was tempted to say this can't be done, but you have done it nonetheless.  Although I am not sure it helps.

Please research 'spurious wakeups' of condition variables:
 

void mywait (boost::condition_variable &cond, boost::mutex &mut, boost::mutex &flag_mut, bool &flag)
{
   boost::unique_lock<boost::mutex> lock(mut);

   enable_flag (flag_mut, flag);

   cond.wait (lock);

This cond.wait(lock) may wake up even though cond.notify_one() was NOT called.  This may seem strange, but it is allowed because otherwise implementing conditions becomes very hard.  And by allowing it, it encourages developers to code with conditions properly.  To use a condition you MUST do so in a loop which checks the global *state* that the condition is communicating.  ie in your case 'new job available'.  The standard could have stored state inside the condition variable, but it was harder, and it was also assumed that you can determine the state better on your own, so it is outside the condvar. It also makes it easier for multiple threads to agree - they can all look at the external state (available jobs) and decide if something needs to be done, or if another thread has already done it.

Another way to look at things: spurious wakeups mean that, in effect, notify_one might wake up more than one thread (they still battle for the mutex of course), but notify_one is essentially (worse case) the same as notify_all - ie all the other threads could 'spuriously' wake up.  ie notify_one could be implemented by just calling notify_all (it would be a poor implementation, but valid).  So notify_one is just an optimization (*probably* only one thread will awaken), not a guarantee.
 
So what happens in your code if cond.wait() wakens spuriously?  It appears a job is 'done' (or attempted) even though no job exists.

If you can tell (from the worker thread) that there is no job to do, then make THAT your external state, and wrap the cond.wait() inside a loop:

}

void worker_function (void)

 for (;;)
   { 

         while ( ! /* there is work to be done */)  // note the '!' at the front

            cond.wait(mutex);

      ..... read new job .....

 
         // probably need to hold lock while reading job (from some shared queue or whatever)
         // then unlock so you can do the work *in parallel* with other threads

         mutex.unlock(); // unlock before doing work
 

      ..... do the work .....

         mutex.lock();   // lock again before checking /* there is work to be done */, unlocked inside cond.wait()

   }
}

int main (void)
{
   ..... create worker threads .....

   while (/*there is work to be done */)
   {

                 cond.notify_all();  // there may be more than one job to do, so notify all
 

      /* wait for job_done notification from any of the busy thread */
      mywait (job_done_condition, job_done_mutex, job_done_notification_reached_flag_mutex, job_done_notification_reached_flag);

Why do you wait for the job to be done?
If you send a job to another thread, then do nothing but wait for it to be done, why not just do it yourself?
ie it appears that your code will run like this:

Thread 1          Thread 2
                        wait:
check for job     .....
notify                wake
wait for done:     do job
.....                   keep doing that job
.....                   keep doing that job
.....                   job done!
wake!                wait for new job (ie repeat)

do the 2 threads ever do any real work at the same time, or is one always waiting for the other?

 

   }
}

   I hope this will not create any problems.
   (I have also edited the problem statement in the mail below to make it more clear. Shown by __TEXT_EDITED__.)
Regards,
Girish

I honestly had (still have?) trouble understanding the code, but I don't think it works, and I think it could be much simpler.

Basically, just use the condvars to tell threads 'hey *CHECK* if there is something to do'.  Do NOT try to use condvars to actually communicate state.  Just 'hey check the state'.  Then let the threads decide what to do based on that state.  condvars also nicely handle the lock/unlock/relock that you probably needed to do around the check and wait anyhow.  So the primary purpose of the lock is to protect your state, not for the condvar.  The condvar is just nice enough to know how to work with that.

In your case your state is the state of the job list.  Put a mutex around that, and have the condvar use that mutex as well.  Check the state of the job list to decide what (if anything) to do.

There is only one state (job list) and one mutex and one condvar.

Tony

Tony
-----Inline Attachment Follows-----

_______________________________________________
Boost-users mailing list
Boost-users_at_[hidden]
http://lists.boost.org/mailman/listinfo.cgi/boost-users



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net