Subject: Re: [Boost-users] [Thread] Timed join returning true before thread terminated
From: John Rocha (jrr_at_[hidden])
Date: 2012-03-16 14:00:38


Hello Anthony,

I followed your advice to verify where the threads were blocked. I analyzed
the GDB data and found that the LU thread was blocked on a join() for worker
thread #7 (having sent it an interrupt) and that worker thread #7 was blocked on
a boost::condition_variable_any.

I then carefully looked at my boost thread source code, for version 1.43 of
boost, and I believe there is a race condition between the boost thread's
interrupt() and the condvar wait() methods. It's a very small window, but I've
been able to reproduce the issue with logs.

Can you please read the following to find any flaws with my logic, or confirm
my suspicion? Also, if this is a defect with the library, what should I do
next? Do I raise a defect somewhere?

Thank you,

-=John

Consider the following:

Thread T1 is invoking T2.interrupt(), while thread T2 is invoking wait() on
one of its condvars.

Suppose the timing was as follows, where on an SMP system T1 and T2 are running
simultaneously (or this could happen with inopportune time slices).

              T2: wait(boost::mutex &m)
              T2: res=0
              T2: detail::interruption_checker check_for_interruption(&cond)
              T2: lock_guard<mutex> guard(thread_info->data_mutex);

T1: interrupt()
T1: detail::thread_data_ptr const local_thread_info=get_thread_info();
T1: if (local_thread_info)
T1: lock_guard<mutex> lk (local_thread_info->data_mutex)

// T1 blocks because T2 has the mutex

              T2: check_for_interruption()
              T2: if (thread_info->interrupt_requested)
              T2: check_for_interruption returns
              T2: thread_info->current_cond=cond
              T2: interruption_checker returns, this releases the data_mutex
                  that T1 is blocked on.

T1: local_thread_info->interrupt_requested=true;
T1: if(local_thread_info->current_cond) // this was set right above
T1: pthread_cond_broadcast(local_thread_info->current_cond)
T1: interrupt() returns

              T2: boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex);
              T2: m.unlock();
              T2: res=pthread_cond_wait(&cond,&internal_mutex);

This would result in the condvar blocking and ignoring the interruption
request, because the condvar broadcast happened AFTER the interrupt check but
BEFORE T2 actually blocked on the condvar.
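
A lost wakeup of this kind is usually avoided by pairing the condvar with a
flag that is both set and checked under the same mutex that is handed to the
wait. The following is a minimal sketch of that pattern only. It uses C++11
std::mutex and std::condition_variable rather than the Boost 1.43 internals,
and interruptible_waiter and its members are hypothetical names, not anything
from Boost.Thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical illustration type; not part of Boost.Thread.
struct interruptible_waiter {
    std::mutex m;
    std::condition_variable cv;
    bool interrupt_requested = false;

    // Blocks until interrupt() has been called. The flag is tested while
    // holding m, and the wait releases m atomically, so there is no gap
    // between "check the flag" and "block on the condvar".
    bool wait_for_interrupt() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [this] { return interrupt_requested; });
        return true;
    }

    // Sets the flag under the same mutex, then wakes any current waiters.
    void interrupt() {
        {
            std::lock_guard<std::mutex> lk(m);
            interrupt_requested = true;
        }
        cv.notify_all();
    }
};

// The notification arrives before anyone waits, yet the wait still returns
// because the predicate is already true when it is first checked.
bool interrupt_before_wait_is_not_lost() {
    interruptible_waiter w;
    w.interrupt();
    return w.wait_for_interrupt();
}
```

In the trace above the flag is checked under data_mutex while the wait uses
internal_mutex, which is what leaves the window open.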

I confirmed, from the man page as well as from a test program, that a
pthread_cond_broadcast(cv) only unblocks threads that are CURRENTLY blocked on
the cv.

The man page indicates:

        The pthread_cond_broadcast() and pthread_cond_signal() functions shall
        have no effect if there are no threads currently blocked on cond.

The test I created to verify this was basically:

     std::cout << "Calling broadcast for CV\n";
     pthread_cond_broadcast(&cv);

     std::cout << "Calling wait\n";
     pthread_mutex_lock(&mtx);
     pthread_cond_wait(&cv, &mtx);

     std::cout << "Done waiting\n";

I confirmed that the above locks up and never proceeds.
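
A self-contained variant of that check can be sketched with a timed wait, so
that it terminates instead of hanging. The function name and the 200 ms
timeout are assumptions made for illustration. A spurious wakeup could in
principle make the wait return 0, so treat this as a demonstration rather than
a proof.

```cpp
#include <cassert>
#include <cerrno>
#include <pthread.h>
#include <time.h>

// Broadcast on a condvar that has no waiters, then wait on it with a
// deadline. Returns the result of pthread_cond_timedwait().
int broadcast_then_timed_wait() {
    pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cv = PTHREAD_COND_INITIALIZER;

    // Nobody is blocked on cv yet, so per POSIX this has no effect.
    pthread_cond_broadcast(&cv);

    // Absolute deadline 200 ms from now. A default-initialized condvar
    // measures its timeout against CLOCK_REALTIME.
    timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_nsec += 200L * 1000L * 1000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&mtx);
    int res = pthread_cond_timedwait(&cv, &mtx, &deadline);
    pthread_mutex_unlock(&mtx);
    return res;
}
```

If the earlier broadcast had been remembered, the wait would return 0; the
expectation here is ETIMEDOUT.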

Finally, I changed my test to run over and over while being monitored by
valgrind/helgrind, and updated the library to add some trace statements, along
with some more trace statements in my original code. I was able to confirm my
hypothesis. Relevant log information is below.

I've done some post processing of the logs as follows:
    - I removed the leading date "2012-Mar-16"
    - I prefixed each with the 'thread name', easier to follow than the TID
    - I stripped out a lot of the 'noise' entries that don't add any relevant
      data

Iteration #5 with /views/TEST/BOOST_INC_LIB_DEBUG/lib
Start the test
wait for it to complete
main : 09:18:37.130338 (32675) main():Concurrency Level: 8
main : 09:18:37.130496 (32675) thread_base_c():Thread SQ (0xbfb26414)
main : 09:18:37.130518 (32675) sq_thread_c():Thread SQ (0xbfb26414)
SQ : 09:18:37.130592 (32676) operator()():Thread SQ is born
....
SQ : 09:18:37.130662 (32676) thread_base_c():Thread Audit (0x808c110)
SQ : 09:18:37.130674 (32676) audit_thread_c():Thread Audit (0x808c110)
SQ : 09:18:37.130700 (32676) thread_base_c():Thread Notify (0x808cd80)
SQ : 09:18:37.130715 (32676) notify_thread_c():Thread Notify (0x808cd80)
Audit : 09:18:37.130739 (32677) operator()():Thread Audit is born
Notify: 09:18:37.130757 (32678) operator()():Thread Notify is born
SQ : 09:18:37.130835 (32676) thread_base_c():Thread LU (0x808cf68)
SQ : 09:18:37.130880 (32676) lookup_thread_c():Thread LU (0x808cf68)
LU : 09:18:37.130937 (32679) operator()():Thread LU is born
....
....
....
LU : 09:18:37.131117 (32679) thread_base_c():Thread Worker#1 (0x808ca28)
LU : 09:18:37.131140 (32679) worker_thread_c():Thread Worker#1 (0x808ca28)
LU : 09:18:37.131188 (32679) thread_base_c():Thread Worker#2 (0x808d8a0)
LU : 09:18:37.131202 (32679) worker_thread_c():Thread Worker#2 (0x808d8a0)
LU : 09:18:37.131227 (32679) thread_base_c():Thread Worker#3 (0x808da90)
LU : 09:18:37.131240 (32679) worker_thread_c():Thread Worker#3 (0x808da90)
LU : 09:18:37.131267 (32679) thread_base_c():Thread Worker#4 (0x808dc90)
LU : 09:18:37.131280 (32679) worker_thread_c():Thread Worker#4 (0x808dc90)
W1 : 09:18:37.131305 (32680) operator()():Thread Worker#1 is born
....
W4 : 09:18:37.131344 (32683) operator()():Thread Worker#4 is born
....
W3 : 09:18:37.131394 (32682) operator()():Thread Worker#3 is born
W2 : 09:18:37.131416 (32681) operator()():Thread Worker#2 is born
....
....
LU : 09:18:37.131690 (32679) thread_base_c():Thread Worker#5 (0x808dcc8)
LU : 09:18:37.131727 (32679) worker_thread_c():Thread Worker#5 (0x808dcc8)
LU : 09:18:37.131775 (32679) thread_base_c():Thread Worker#6 (0x808e0f0)
LU : 09:18:37.131791 (32679) worker_thread_c():Thread Worker#6 (0x808e0f0)
W5 : 09:18:37.131804 (32684) operator()():Thread Worker#5 is born
W6 : 09:18:37.131825 (32685) operator()():Thread Worker#6 is born
....
....
LU : 09:18:37.131900 (32679) thread_base_c():Thread Worker#7 (0x808e250)
LU : 09:18:37.131926 (32679) worker_thread_c():Thread Worker#7 (0x808e250)
LU : 09:18:37.131969 (32679) thread_base_c():Thread Worker#8 (0x808e4e0)
LU : 09:18:37.131990 (32679) worker_thread_c():Thread Worker#8 (0x808e4e0)
LU : 09:18:37.132024 (32679) thread_base_c():Thread Worker#9 (0x808e6e0)
LU : 09:18:37.132038 (32679) worker_thread_c():Thread Worker#9 (0x808e6e0)
W7 : 09:18:37.132062 (32686) operator()():Thread Worker#7 is born
W8 : 09:18:37.132076 (32687) operator()():Thread Worker#8 is born
....
LU : 09:18:37.132110 (32679) thread_base_c():Thread Worker#10 (0x808ebe0)
....
W9 : 09:18:37.132142 (32688) operator()():Thread Worker#9 is born
LU : 09:18:37.132167 (32679) worker_thread_c():Thread Worker#10 (0x808ebe0)
....
W10 : 09:18:37.132228 (32689) operator()():Thread Worker#10 is born
...
...
...
LU : 09:18:37.150406 (32679) shutdown():LU shutting down Worker#6
LU : 09:18:37.150428 (32679) interrupt before lock
LU : 09:18:37.150443 (32679) interrupt after lock
LU : 09:18:37.150462 (32679) interrupt before broadcast
LU : 09:18:37.150475 (32679) interrupt after broadcast
W8 : 09:18:37.150488 (32687) wait after condvar wait
W8 : 09:18:37.150510 (32687) wait before checker
W8 : 09:18:37.150529 (32687) wait after checker
W10 : 09:18:37.150565 (32689) wait after condvar wait
LU : 09:18:37.150594 (32679) shutdown():LU waiting on Worker#6 thread
W6 : 09:18:37.150622 (32685) wait after condvar wait
W7 : 09:18:37.150642 (32686) wait after condvar wait
W9 : 09:18:37.150665 (32688) wait after condvar wait
W8 : 09:18:37.150690 (32687) wait before condvar wait
W10 : 09:18:37.150746 (32689) wait before checker
W10 : 09:18:37.150771 (32689) wait after checker
W10 : 09:18:37.150793 (32689) wait before condvar wait
W6 : 09:18:37.150835 (32685) operator()():Thread Worker#6(0x808e0f0) RX shutdown interrupt.
W7 : 09:18:37.150904 (32686) wait before checker <<<<<<
W7 : 09:18:37.150923 (32686) wait after checker <<<<<<
LU : 09:18:37.150941 (32679) shutdown():LU shutdown Worker#6 in time 00:00:00.000493

LU : 09:18:37.150970 (32679) shutdown():LU shutting down Worker#7
W9 : 09:18:37.150996 (32688) wait before checker
W9 : 09:18:37.151019 (32688) wait after checker
LU : 09:18:37.151036 (32679) interrupt before lock <<<<<<
LU : 09:18:37.151050 (32679) interrupt after lock <<<<<<
LU : 09:18:37.151061 (32679) interrupt before broadcast <<<<<<
LU : 09:18:37.151073 (32679) interrupt after broadcast <<<<<<
LU : 09:18:37.151085 (32679) shutdown():LU waiting on Worker#7 thread

W7 : 09:18:37.151097 (32686) wait before condvar wait <<<<<<
W9 : 09:18:37.151115 (32688) wait before condvar wait
W10 : 09:18:37.151131 (32689) wait after condvar wait
W10 : 09:18:37.151145 (32689) wait before checker
W10 : 09:18:37.151156 (32689) wait after checker
W8 : 09:18:37.151169 (32687) wait after condvar wait
W10 : 09:18:37.151186 (32689) wait before condvar wait
W8 : 09:18:37.151200 (32687) wait before checker
W8 : 09:18:37.151214 (32687) wait after checker
W8 : 09:18:37.151225 (32687) wait before condvar wait

From the above, we can see:

At 09:18:37.150923 worker thread number 7 exited the checker code; this was
part of the wakeup from the condvar broadcast done when W6 was shutting down.

W7 doesn't run again until 09:18:37.151097, just before the condvar wait.

However, the LU thread tickles W7's condvar just before W7 waits on it. This is
at 09:18:37.151061 and 09:18:37.151073.
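
The ordering can be checked mechanically from the four marked timestamps. This
is only a small worked check of the arithmetic, with the values copied from the
log as microseconds past 09:18:37; the variable and function names are
hypothetical, and the timestamps are taken when each trace line is written, so
they approximate the real event times.

```cpp
#include <cassert>

// Microseconds past 09:18:37, copied from the log lines marked <<<<<<.
const long w7_after_checker       = 150923; // W7: "wait after checker"
const long lu_before_broadcast    = 151061; // LU: "interrupt before broadcast"
const long lu_after_broadcast     = 151073; // LU: "interrupt after broadcast"
const long w7_before_condvar_wait = 151097; // W7: "wait before condvar wait"

// True when the whole broadcast happened after W7's interruption check
// and before W7 reached pthread_cond_wait().
bool broadcast_fell_in_window() {
    return w7_after_checker < lu_before_broadcast &&
           lu_after_broadcast < w7_before_condvar_wait;
}

// Size of the window between W7's check and W7's wait, in microseconds.
long window_us() {
    return w7_before_condvar_wait - w7_after_checker;
}
```

With these values the window is 174 microseconds and the broadcast lies
entirely inside it.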

===============================================================================
Updated test program. I moved jrr_debug out of my program and into the boost
thread library so that I could use it there, as well as in my test program. It
was a hack to allow me to integrate logging with my test program.
===============================================================================

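#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread.hpp>
#include <boost/foreach.hpp>
#include <boost/shared_ptr.hpp>

#include <cerrno>    // ENOSPC, ENOENT
#include <cstdio>    // snprintf
#include <cstring>   // strerror
#include <deque>
#include <sstream>
#include <string>
#include <vector>

#include <pthread.h> // pthread_getschedparam, pthread_setschedparam
#include <sched.h>   // SCHED_RR and the other scheduling policies
#include <unistd.h>  // sleep, usleep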

using boost::posix_time::time_duration;
using boost::posix_time::microsec_clock;
using boost::posix_time::to_simple_string;
using boost::posix_time::ptime;

//////////////////////////////////////////////////////////////////////
// Simulation of the logging mechanism we have. We don't use printf; it's
// actually a mutex-controlled input into a buffer, which is then periodically
// drained by a separate thread into a file. For simplicity I just changed it
// into mutex-controlled output.
//

#define EE_LOG_MSG(level, fmt, args...) { \
     /* prefix the format with the function name */ \
     char fbuf[1024]; \
     snprintf(fbuf, sizeof(fbuf), "%s():%s", __FUNCTION__, fmt); \
                                                                               \
     char buf[1024]; \
     snprintf(buf, sizeof(buf), fbuf, ##args); \
     jrr_debug_out(buf); \
}

#define EE_LOG_ECODE(level, err, fmt, args...) { \
     /* prefix the format with the function name */ \
     char fbuf[1024]; \
     snprintf(fbuf, sizeof(fbuf), "%s():%s", __FUNCTION__, fmt); \
                                                                               \
     char buf[1024]; \
     snprintf(buf, sizeof(buf), fbuf, ##args); \
     jrr_debug_out(buf, err); \
}

//////////////////////////////////////////////////////////////////////
// Need to simulate the code that waits for all threads to start up, just use
// simple barriers.
//
// 10 worker threads
// 1 lookup thread
// 1 queue thread
// 1 audit thread
// 1 notify thread
// 1 main thread
//---
// 15 threads
boost::barrier init_done(15);

//////////////////////////////////////////////////////////////////////
// class used to exchange data from one thread to another.
//
// History from this file indicates that it originally used raw pthread_xxxx
// calls, and then later switched to boost::mutex/condvar
//
template <class T, class Container = std::deque<T> >
class mt_queue_c {

public:

     explicit mt_queue_c(unsigned int max_size,
                         const Container &cont = Container()) :
         m_c(cont),
         m_max_size(max_size),
         m_drop_total(0),
         m_drop_oflow(0),
         m_queue_enabled(true)
         {}

     virtual ~mt_queue_c() {};

     int enqueue(const T &elem,
                 std::string *ps_error = NULL);

     int dequeue(T &dest,
                 std::string *ps_error = NULL);

     void off();

     void on();

protected:
     Container m_c;
     unsigned int m_max_size;
     unsigned int m_drop_total;
     unsigned int m_drop_oflow;

     boost::timed_mutex m_access_mtx;
     boost::condition_variable_any m_data_cv;

     bool m_queue_enabled;

     void _make_error_string(std::string *p_s_err,
                             const char *p_src,
                             const char *p_func,
                             int errval);
};

// make_error_string()
// Helper function for integrating with the _make_error_string method. It
// automatically pushes on the __FUNCTION__ value.
//
#define make_error_string(p_s_err, p_src, errval) \
         _make_error_string(p_s_err, p_src, __FUNCTION__, errval)

template <class T, class Container> inline void
mt_queue_c<T,Container>::_make_error_string (std::string *p_s_err,
                                              const char *p_src,
                                              const char *p_func,
                                              int errval)
{
     std::stringstream ss_err;

     if (!p_s_err) {
         return;
     }

     if (!p_src) {
         p_src = "bad p_src parm";
     }

     if (!p_func) {
         p_func = "bad p_func parm";
     }

     ss_err << " Error-Response: (" << errval << ") " << strerror(errval);

     *p_s_err = "[";
     *p_s_err += p_func;
     *p_s_err += "()]: ";
     *p_s_err += p_src;
     *p_s_err += ss_err.str();
}

template <class T, class Container> inline int
mt_queue_c<T,Container>::enqueue (const T &elem,
                                   std::string *ps_error /* = NULL */)
{
     const char *p_err = "";
     int ret_err;

     boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);

     // if the max size is non-zero then enforce the size limit
     if (m_max_size && m_c.size() >= m_max_size) {
         p_err = "Queue limit reached. Cannot enqueue this element.";
         ret_err = ENOSPC; // closest match
         goto enqueue_done;
     }

     m_c.push_back(elem);

     m_data_cv.notify_one();

     ret_err = 0;

  enqueue_done:

     if (ret_err) {
         make_error_string(ps_error, p_err, ret_err);

         if (ENOSPC == ret_err) {
             m_drop_oflow++;
         }

         m_drop_total++;
     }

     return (ret_err);
}

template <class T, class Container> inline int
mt_queue_c<T,Container>::dequeue (T &dest,
                                   std::string *ps_error /* = NULL */)
{
     const char *p_err = "";
     int ret_err;

     boost::unique_lock<boost::timed_mutex> lock(m_access_mtx);

     while (!m_c.size() || !m_queue_enabled) {
         m_data_cv.wait(lock);
     }

     if (m_c.empty()) {
         p_err = "Woke up but queue is empty.";
         ret_err = ENOENT; // closest existing match. Error no entity.
         goto dequeue_done;
     }

     dest = m_c.front();
     m_c.pop_front();

     ret_err = 0;

  dequeue_done:

     if (ret_err) {
         make_error_string(ps_error, p_err, ret_err);
     }

     return (ret_err);
}

template <class T, class Container> inline void
mt_queue_c<T,Container>::off ()
{
     boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);

     m_queue_enabled = false;
}

template <class T, class Container> inline void
mt_queue_c<T,Container>::on ()
{
     boost::lock_guard<boost::timed_mutex> lock(m_access_mtx);

     m_queue_enabled = true;

     if (m_c.size()) {
         m_data_cv.notify_all();
     }
}

//////////////////////////////////////////////////////////////////////
// We actually exchange much more complex data. I just simplify it to ints so
// that information is exchanged, and threads run.

typedef mt_queue_c<int> t_msg_queue;

//////////////////////////////////////////////////////////////////////

class thread_base_c {
public:

     thread_base_c(const std::string &name,
                   int prio,
                   int sched) :
         m_name(name),
         m_priority(prio),
         m_sched(sched),
         m_thread()
         {
             EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
         };

     virtual ~thread_base_c () {
         EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
     };

     // Functor. This is the API that the boost thread library will invoke as
     // the new thread's main.
     void operator()() {
         EE_LOG_MSG(EE_TRACE, "Thread %s is born", m_name.c_str());

         std::string s_err;

         // set my scheduling and priority levels
         if (!setscheduling()) {
             return;
         }

         // call the thread's init routine
         if (!thread_init()) {
             s_err = "Thread:";
             s_err += m_name + " thread_init() method failed.";
             EE_LOG_MSG(EE_ERROR, s_err.c_str());
             thread_shutdown();
             return;
         }

         // call the thread's main loop, watching for boost::interrupt which
         // means it's time for us to shut down
         try {
             init_done.wait();

             if (!thread_main()) {
                 s_err = "Thread:";
                 s_err += m_name + " thread_main() method failed.";
                 EE_LOG_MSG(EE_ERROR, s_err.c_str());
                 thread_shutdown();
                 return;
             }
         }
         catch (boost::thread_interrupted &e) {
             // We've received a boost thread interrupt from our parent thread,
             // it's time to shut down, invoke our shutdown routine and then
             // return.

             EE_LOG_MSG(EE_TRACE, "Thread %s(%p) RX shutdown interrupt.",
                        m_name.c_str(), this);

             thread_shutdown();
             return;
         }
     }

     void create_thread() {
         if (m_thread.get_id() != boost::thread::id()) {
             EE_LOG_MSG(EE_ERROR,
                        "Thread already exists for this object: %s (%p)",
                        m_name.c_str(), this);
             return;
         }

         // create a thread using our object and store it
         m_thread = boost::thread(boost::ref(*this));
     }

     void shutdown(const std::string &s_caller) {
         // We're already shutting down. So we don't need another
         // thread_interrupt to tell us to shut down (again).
         boost::this_thread::disable_interruption raii_interrupt_disable;

         EE_LOG_MSG(EE_TRACE, "%s shutting down %s",
                    s_caller.c_str(), m_name.c_str());

         ptime start_time(microsec_clock::local_time());

         m_thread.interrupt();

         EE_LOG_MSG(EE_TRACE, "%s waiting on %s thread",
                    s_caller.c_str(), m_name.c_str());

         m_thread.join();

         time_duration how_long(microsec_clock::local_time() - start_time);

         EE_LOG_MSG(EE_TRACE, "%s shutdown %s in time %s",
                    s_caller.c_str(), m_name.c_str(),
                    to_simple_string(how_long).c_str());
     }

protected:

     virtual bool thread_init() = 0;

     virtual bool thread_main() = 0;

     virtual void thread_shutdown() = 0;

     bool setscheduling() {
         std::stringstream ss_result;
         struct sched_param old_sched;
         struct sched_param new_sched;
         pthread_t tid;
         int errval;
         int policy;

         tid = pthread_self();

         errval = pthread_getschedparam(tid, &policy, &old_sched);
         if (errval) {
             EE_LOG_ECODE(EE_ERROR, errval,
                          "Cannot get scheduling/priority parameters.");
             return (false);
         }

         // initialize new_sched to what's established, and then change to our
         // new values.
         new_sched = old_sched;
         new_sched.sched_priority = m_priority;

         errval = pthread_setschedparam(tid, m_sched, &new_sched);
         if (errval) {
             EE_LOG_ECODE(EE_ERROR, errval,
                          "Cannot set the scheduling/priority parameters.");
             return (false);
         }

         ss_result << "Scheduling and priorities changed for thread '" << m_name
                   << "' from: (" << policy << ")" << get_sched_str(policy)
                   << "/" << old_sched.sched_priority
                   << " to: (" << m_sched << ")" << get_sched_str(m_sched)
                   << "/" << m_priority;
         EE_LOG_MSG(EE_DEBUG(0), ss_result.str().c_str());

         return (true);
     }

# define CASE_RETVAL(x) case x: return (#x)
     char const* get_sched_str(int sched_type)
         {
             switch (sched_type) {
                 CASE_RETVAL(SCHED_OTHER);
                 CASE_RETVAL(SCHED_FIFO);
                 CASE_RETVAL(SCHED_RR);
                 CASE_RETVAL(SCHED_BATCH);
             }
             return ("Unknown");
         }

     inline void check_for_shutdown () {
         boost::this_thread::interruption_point();
     }

     std::string m_name; // Name of the object instance. Used in
                                   // debugging and error messages.
     int m_priority; // Scheduling priority of the object thread.
     int m_sched; // Scheduling type of the object thread
                                   // see pthread_setschedparam() for further info.

     // Our Thread
     boost::thread m_thread;

private:
     // don't allow these, because boost::thread cannot be copied
     thread_base_c& operator=(const thread_base_c&);
};

//////////////////////////////////////////////////////////////////////

class worker_thread_c : public thread_base_c {

public:

     worker_thread_c(const std::string name,
                     int prio,
                     int sched,
                     t_msg_queue &sq) :
         thread_base_c(name, prio, sched),
         m_work_q(sq)
         {
             EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
         };

     virtual ~worker_thread_c() {
         EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
     }

     virtual bool thread_init() {
         return (true);
     }

     virtual bool thread_main() {
         while (1) {
             int qval;

             check_for_shutdown();

             m_work_q.dequeue(qval);
// EE_LOG_MSG(EE_TRACE, "RX: %d", qval);
         }

         // we should never break out and return. Returning false will ensure
         // the infra logs an error for this.
         return (false);
     }

     virtual void thread_shutdown() {};

protected:
     t_msg_queue &m_work_q;
};

//////////////////////////////////////////////////////////////////////

class lookup_thread_c : public thread_base_c {

public:

     lookup_thread_c (int prio,
                      int sched,
                      t_msg_queue &sq) :
         thread_base_c("LU", prio, sched),
         m_serial_q(sq),
         m_work_q(40000),
         v_work_thread_pointers()
         {
             EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
         };

     virtual ~lookup_thread_c() {
         EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);

         v_work_thread_pointers.clear();
     }

     virtual bool thread_init() {
         const int num_workers(10);
         std::stringstream ss_worker_name;
         int idx;

         try {
             // spawn the worker threads
             for (idx = 1; idx <= num_workers; ++idx) {

                 // create the thread's name
                 ss_worker_name.str("");
                 ss_worker_name << "Worker#" << idx;

                 P_SHR_WORKER_T
                     p_worker(new worker_thread_c(ss_worker_name.str(),
                                                  18, SCHED_RR, m_work_q));
                 p_worker->create_thread();
                 v_work_thread_pointers.push_back(p_worker);
             }

         }
         catch (std::exception &e) {
             EE_LOG_MSG(EE_ERROR,
                        "Failed to allocate/initialize data for look-Up "
                        "thread. Response is: '%s'", e.what());
             return (false);
         }

         return (true);
     }

     virtual bool thread_main() {
         while (1) {
             int sq_val;

             check_for_shutdown();

             m_serial_q.dequeue(sq_val);
// EE_LOG_MSG(EE_TRACE, "RX: %d", sq_val);

             m_work_q.enqueue(sq_val);
         }

         // we should never break out and return. Returning false will ensure
         // the infra logs an error for this.
         return (false);
     }

     virtual void thread_shutdown() {
         EE_LOG_MSG(EE_TRACE, "%s starts %s",m_name.c_str(), __FUNCTION__);

         // shut down the downstream queue
         m_work_q.off();

         // tell our children to shutdown
         BOOST_FOREACH(P_SHR_WORKER_T p_worker, v_work_thread_pointers) {
             p_worker->shutdown(m_name);
         }

         EE_LOG_MSG(EE_TRACE, "%s end %s",m_name.c_str(), __FUNCTION__);
     }

protected:

     t_msg_queue &m_serial_q;

     t_msg_queue m_work_q;

     typedef boost::shared_ptr<worker_thread_c> P_SHR_WORKER_T;

     std::vector<P_SHR_WORKER_T> v_work_thread_pointers;
};

//////////////////////////////////////////////////////////////////////

class audit_thread_c : public thread_base_c {

public:

     audit_thread_c(int prio,
                    int sched) :
         thread_base_c("Audit", prio, sched)
         {
             EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
         };

     virtual ~audit_thread_c() {
         EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
     }

     virtual bool thread_init() { return (true); };

     virtual bool thread_main() {
         while (1) {
             check_for_shutdown();

             sleep (1);
         }

         // we should never break out and return. Returning false will ensure
         // the infra logs an error for this.
         return (false);
     }

     virtual void thread_shutdown() {};
};

//////////////////////////////////////////////////////////////////////

class notify_thread_c : public thread_base_c {

public:
     notify_thread_c(int prio,
                         int sched) :
     thread_base_c("Notify", prio, sched)
         {
             EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
         };

     virtual ~notify_thread_c() {
         EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);

     }

     virtual bool thread_init() {return (true);};

     virtual bool thread_main() {
         while (1) {
             check_for_shutdown();

             sleep(1);
         }

         // we should never break out and return. Returning false will ensure
         // the infra logs an error for this.
         return (false);
     }

     virtual void thread_shutdown() {};
};

//////////////////////////////////////////////////////////////////////

class sq_thread_c : public thread_base_c {

public:

     sq_thread_c (int prio,
                  int sched,
                  t_msg_queue &serial_in) :
         thread_base_c("SQ", prio, sched),
         m_main_in(serial_in),
         m_serial_q(2000),
         p_audit(),
         p_notify(),
         p_lu()
         {
             EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
         };

     virtual ~sq_thread_c() {
         EE_LOG_MSG(EE_TRACE, "Thread %s (%p)", m_name.c_str(), this);
     }

     virtual bool thread_init() {
         try {
             p_audit.reset(new audit_thread_c(18, SCHED_RR));
             p_audit->create_thread();

             p_notify.reset(new notify_thread_c(18, SCHED_RR));
             p_notify->create_thread();

             p_lu.reset(new lookup_thread_c(18, SCHED_RR, m_serial_q));
             p_lu->create_thread();
         }
         catch (std::exception &ex) {
             EE_LOG_MSG(EE_ERROR,
                        "Failed to allocate/initialize data for SQ thread. "
                        "Response is: '%s'", ex.what());
             return (false);
         }

         return (true);
     }

     virtual bool thread_main() {
         while (1) {
             int sq_val;

             check_for_shutdown();

             m_main_in.dequeue(sq_val);

             m_serial_q.enqueue(sq_val);

// EE_LOG_MSG(EE_TRACE, "%s RX: %d", m_name.c_str(), sq_val);
         }

         // we should never break out and return. Returning false will ensure
         // the infra logs an error for this.
         return (false);
     }

     virtual void thread_shutdown() {
         EE_LOG_MSG(EE_TRACE, "%s starts %s",m_name.c_str(), __FUNCTION__);

         // shut down the downstream queue
         m_serial_q.off();

         // tell the subordinate threads to shut down
         if (p_lu) {
             p_lu->shutdown(m_name);
         } else {
             EE_LOG_MSG(EE_ERROR,"Unexpected null lookup thread pointer "
                        "during shutdown of %s", m_name.c_str());
         }

         if (p_notify) {
             p_notify->shutdown(m_name);
         } else {
             EE_LOG_MSG(EE_ERROR, "Unexpected null notify thread pointer "
                        "during shutdown of %s", m_name.c_str());
         }

         if (p_audit) {
             p_audit->shutdown(m_name);
         } else {
             EE_LOG_MSG(EE_ERROR, "Unexpected null audit thread "
                        "pointer during shutdown of %s", m_name.c_str());
         }

         EE_LOG_MSG(EE_TRACE, "%s end %s",m_name.c_str(), __FUNCTION__);
     }

protected:

     t_msg_queue &m_main_in;

     t_msg_queue m_serial_q;

     // Child thread management variables
     boost::shared_ptr<audit_thread_c> p_audit;

     boost::shared_ptr<notify_thread_c> p_notify;

     boost::shared_ptr<lookup_thread_c> p_lu;
};

int
main ()
{
     t_msg_queue main_out(2000);

     EE_LOG_MSG(EE_TRACE, "Concurrency Level: %d",
                boost::thread::hardware_concurrency());

     sq_thread_c serial_Q_Thread_Main(19, SCHED_RR, main_out);
     serial_Q_Thread_Main.create_thread();

     // the real thread startup has a more complex init synchronization. This is
     // a quick hack. Wait until all threads have reached the barrier, then have
     // main sleep a bit so that all child threads can actually progress past
     // the barrier.
     init_done.wait();
     usleep(10 * 1000);

     EE_LOG_MSG(EE_TRACE,
                "All threads are up, pumping 1000 entries into the queue.");

     for (int idx(0); idx < 1000; ++idx) {
         main_out.enqueue(idx);
     }

     serial_Q_Thread_Main.shutdown(__FUNCTION__);

     return (0);
}
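
One possible application-side mitigation, which is not something proposed or
tested in this message, is to bound each wait so that a lost notification
costs at most one polling interval. The sketch below uses C++11 standard
library types instead of Boost; polling_queue, request_stop() and the 50 ms
interval are hypothetical choices for illustration. request_stop()
deliberately does not notify, to mimic a broadcast that arrives too early.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical queue whose consumers re-check a stop flag on every timeout.
class polling_queue {
public:
    polling_queue() : stop_(false) {}

    void enqueue(int v) {
        {
            std::lock_guard<std::mutex> lk(m_);
            q_.push_back(v);
        }
        cv_.notify_one();
    }

    // Returns true and fills dest when an element is available.
    // Returns false once a stop has been requested and the queue is empty.
    bool dequeue(int &dest) {
        std::unique_lock<std::mutex> lk(m_);
        while (q_.empty()) {
            if (stop_) {
                return false;
            }
            // Bounded wait: even with no notification we wake up and
            // re-check stop_ after at most 50 ms.
            cv_.wait_for(lk, std::chrono::milliseconds(50));
        }
        dest = q_.front();
        q_.pop_front();
        return true;
    }

    // Sets the flag without notifying, to simulate a missed wakeup.
    void request_stop() {
        std::lock_guard<std::mutex> lk(m_);
        stop_ = true;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<int> q_;
    bool stop_;
};

// A consumer blocked in dequeue() still notices the stop request.
bool stop_is_noticed_without_notify() {
    polling_queue q;
    bool got_element = true;
    std::thread worker([&] { int v; got_element = q.dequeue(v); });
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    q.request_stop();
    worker.join();
    return !got_element;
}
```

The cost of this design is a periodic wakeup per idle consumer, traded against
a bounded shutdown delay.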

===============================================================================
Diffs of the changes that were made to the 1.43 version of the boost thread
library
===============================================================================
Index: boost-1.43.0/boost/thread/pthread/condition_variable.hpp
===================================================================
*** boost/thread/pthread/.CC/cache/condition_variable.hpp@@/main/server/1 Fri Mar 16 10:58:00 2012
--- boost/thread/pthread/condition_variable.hpp Fri Mar 16 08:32:13 2012
***************
*** 12,17 ****
--- 12,19 ----

   #include <boost/config/abi_prefix.hpp>

+ extern void jrr_debug_out(const char *);
+
   namespace boost
   {
       inline void condition_variable::wait(unique_lock<mutex>& m)
***************
*** 72,87 ****
--- 74,95 ----
               BOOST_VERIFY(!pthread_cond_destroy(&cond));
           }

+
           template<typename lock_type>
           void wait(lock_type& m)
           {
               int res=0;
               {
+ jrr_debug_out(" wait before checker");
                   detail::interruption_checker check_for_interruption(&cond);
+ jrr_debug_out(" wait after checker");
                   {
                       boost::pthread::pthread_mutex_scoped_lock internal_lock(&internal_mutex);
                       m.unlock();
+ jrr_debug_out(" wait before condvar wait");
                       res=pthread_cond_wait(&cond,&internal_mutex);
+ jrr_debug_out(" wait after condvar wait");
+
                   }
                   m.lock();
               }

Index: boost-1.43.0/boost/thread/thread.hpp
===================================================================
*** boost/thread/.CC/cache/thread.hpp@@/main/server/1 Fri Mar 16 10:58:00 2012
--- boost/thread/thread.hpp Thu Mar 15 17:45:08 2012
***************
*** 23,27 ****
--- 23,33 ----
   #include <boost/thread/detail/thread_interruption.hpp>
   #include <boost/thread/detail/thread_group.hpp>

+ extern void jrr_debug_out(const char *);
+
+
+ extern void jrr_debug_out(const char *buffer,
+ int err);
+

   #endif

Index: boost-1.43.0/libs/thread/src/pthread/thread.cpp
===================================================================
*** libs/thread/src/pthread/.CC/cache/thread.cpp@@/main/server/1 Fri Mar 16 10:58:00 2012
--- libs/thread/src/pthread/thread.cpp Fri Mar 16 08:04:42 2012
***************
*** 25,30 ****
--- 25,32 ----

   #include "timeconv.inl"

+
+
   namespace boost
   {
       namespace detail
***************
*** 408,423 ****
--- 410,430 ----
           }
       }

+
       void thread::interrupt()
       {
           detail::thread_data_ptr const local_thread_info=get_thread_info();
           if(local_thread_info)
           {
+ jrr_debug_out("interrupt before lock");
               lock_guard<mutex> lk(local_thread_info->data_mutex);
+ jrr_debug_out("interrupt after lock");
               local_thread_info->interrupt_requested=true;
               if(local_thread_info->current_cond)
               {
+ jrr_debug_out("interrupt before broadcast");
                   BOOST_VERIFY(!pthread_cond_broadcast(local_thread_info->current_cond));
+ jrr_debug_out("interrupt after broadcast");
               }
           }
       }
***************
*** 605,607 ****
--- 612,651 ----

   }
+
+
+
+
+ #include <boost/thread.hpp>
+ #include <boost/date_time/posix_time/posix_time.hpp>
+ #include <sys/syscall.h>
+ #include <iostream>
+
+ using boost::posix_time::time_duration;
+ using boost::posix_time::microsec_clock;
+ using boost::posix_time::to_simple_string;
+ using boost::posix_time::ptime;
+
+ boost::mutex output_ctl;
+
+
+
+ // get the thread id (as seen by 'top' or 'ps')
+ static pid_t gettid() {
+ return syscall(__NR_gettid);
+ }
+
+
+ void jrr_debug_out (const char *buffer) {
+ boost::lock_guard<boost::mutex> lock(output_ctl);
+ std::cout << to_simple_string(microsec_clock().local_time())
+ << " (" << gettid() << ") " << buffer << "\n";
+ }
+
+ void jrr_debug_out (const char *buffer,
+ int err) {
+ boost::lock_guard<boost::mutex> lock(output_ctl);
+ std::cout << to_simple_string(microsec_clock().local_time())
+ << " (" << gettid() << ") " << buffer << "Error=["
+ << strerror(err) << "]\n";
+ }

