Boost logo

Boost Users :

Subject: Re: [Boost-users] Adding a thread to a Singleton class
From: Stephen Torri (storri_at_[hidden])
Date: 2010-01-11 18:12:49


On Mon, 2010-01-11 at 12:32 -0500, Gottlob Frege wrote:
> First question, and this is always the first question:
> Have you actually *measured* where the performance problems are?

No. To set the stage I build this library with old form of logging done
on the same thread that did the processing. With debugging on it
naturally took longer than the release. The debug build was done without
optimizations therefore processing instructions took longer than their
optimized counterpart. The debug build also suffered an IO penalty from
having to write its data to disk.

> You create the thread in the constructor of Trace, which is called via
> the first call to Singleton<Trace>::Instance(), which I assume is for
> the first call to log something.

You are correct in assuming that the function Instance() is used to grab
a copy of the singleton for logging data to the file.

> Does this potentially happen before main() is called?
> Typically creating threads before main() isn't recommended. You might
> need to somehow delay that.

Since this is a library I do not believe it is called before main but I
cannot give you a reason why believe that. So since its a conjecture we
cannot go on that. Is there a way to do a trace of the code as it runs
to see if the constructor is called before main?

> >
> > template <typename T>
> > class Singleton : boost::noncopyable
> > {
> > public:
> >
> >
> > static void init ()
> > {
> > m_obj.reset ( new T () );
>
> If you ever want to log from the allocator, you will probably want to
> somehow avoid using 'new' here.
> ie reserve some memory for T using something like:
>
> template <typename T>
> struct storage_for
> {
> aligned_storage<sizeof(MyCriticalSection),
> alignment_of<MyCriticalSection>::value> data;
> };
>
> storage_for<T> storage;
>
> then in place new:
>
> m_obj.reset( new(&storage) T() );

Where is "MyCriticalSection" declared? Is that my internal class called
Trace_State which actually does the logging?

> Does this 'work' or do you need to know T?

I am not sure what you are asking here?

> > template <typename T> boost::scoped_ptr<T> myproject::trace::Singleton<T>::m_obj ( 0 );
> > template <typename T> boost::once_flag myproject::trace::Singleton<T>::m_flag = BOOST_ONCE_INIT;
> >
> >
> > void Trace::add_Trace ( boost::shared_ptr<interface::ITraceable> trace )
> > {
>
> is boost::shared_ptr<interface::ITraceable> == interface::ITraceable::ptr_t ?
>
> > // Add ITraceable object to the queue
>
> add *pointer to* ITraceable object!

Yes. I forgot to make the function definition to have
interface::ITraceable::ptr_t instead of the
boost::shared_ptr<interface::ITraceable>. In the ITraceable
declaration is a typedef for:

        typedef boost::shared_ptr<ITraceable> ptr_t;

> > void Trace::threadMain()
> > {
> > boost::unique_lock<boost::mutex> lock ( m_lock );
> >
> > while ( m_hasWork )
> > {
> > while ( m_queue.size() == 0 )
> > {
> > m_condition.wait ( lock );
> > }
> >
>
> *** note that the mutex is now locked here ***
> (condition.wait() unlocks while waiting, then relocks before returning)

Ok. I am used to multi-threaded programming in C# but not sure if I had
the semantics right here. What I was hoping to happen was:

        WHILE ( there is work to be done )
          {
                // I have work to do
                WHILE ( input queue is empty )
                 {
                        // wait for work
                 }

                 // process the work to be done
           }

Is that what I have done?

> > this->processTraces();
> > }
> > }
> >
> > void Trace::processTraces()
> > {
> > QueueType local_queue;
> >
> > // Lock queue and copy
>
> *** The queue is ALREADY locked (see above) ***

You are right. It is locked because we are still in the scope of
threadMain. Now this will cause a problem. I wanted m_queue to be open
for writing after making a copy while the internal thread handled
writing the contents of its copied queue to disk. Is there a better way
to write threadMain and processTraces?

> > {
> > boost::lock_guard<boost::mutex> lock ( m_lock );
> > std::copy ( m_queue.begin(), m_queue.end(), local_queue.begin() );
>
> *** don't you need to also empty m_queue? ***

Alas another lapse in thinking. Yes I do need to empty m_queue.

> And last but not least - I am concerned that ITraceable is *shared*.
> Typically when logging something, you capture a 'snapshot' of values
> in time, and log that out. With a shared snapshot, the caller may
> actually change values *after* 'logging' them. Maybe in an attempt to
> reuse the ITraceable. ie
>
> client code:
>
> do_something();
> tracer = new Traceable(....);
> log(tracer);
> do_something_else();
> tracer->foo = 12;
> log(tracer);
>
> So maybe your interface should *take* the pointer (via auto_ptr or
> unique_ptr or etc) instead of sharing.
> Alternatively, you could copy the ITraceable when placing it into the
> queue (maybe via a 'clone' method - I'm assuming ITraceable is
> polymorphic with various derivations possible? Otherwise just make it
> Traceable, no 'I'nterface, and pass by value...)

So change addTrace to something like after adding a clone method:

void Trace::addTrace ( interface::ITraceable::ptr_t trace )
{
        {
                boost::lock_guard<boost::mutex> lock ( m_lock );
                m_queue.push_back ( trace->clone() );
        }
}

> Oh, and the destructor:
>
> Trace::~Trace()
> {
> // stop thread
> m_hasWork = false;
> }
>
> m_hasWork = false is not protected by a lock, so other CPUs won't
> necessarily see it be set in the order you might be expecting.
> Oh wait, actually, you are not *waiting* for the other thread to end.
> So the destructor leaves, and the memory for Trace is deleted/reused,
> etc, and then at some point later the thread wakes up and tries to
> read it! BOOOM?
>
> And actually, the thread probably will NOT wake up - it will probably
> still be waiting on the condition.wait() call. So you need to signal
> that.
>
> Trace::~Trace()
> {
> // stop thread
> {
> boost::lock_guard<boost::mutex> lock ( m_lock );
>
> // this needs to be inside the lock (or at least set with
> an atomic_release memory barrier)
> // otherwise the thread might wake up from the condition
> notification,
> // and yet not see m_hasWork == false.
> m_hasWork = false;
> }
>
> m_condition.notify_one();
>
> // use join (whatever the syntax) to wait for thread to finish
> join(...);
> }

I appreciate the comments. I will keep appending this email with the
updated code so to further help our discussion. Is there a preferred way
to attach code? I was thinking that just adding it as an attachment
would not allow you to easily make comments.

Stephen

------------------------- Singleton_T.h -------------------------------
#ifndef SINGLETON_H
#define SINGELTON_H

#include <boost/utility.hpp>
#include <boost/thread/once.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/type_traits/aligned_storage.hpp>

namespace myproject { namespace trace {

// template <typename T>
// struct storage_for
// {
// boost::aligned_storage<sizeof(T), boost::alignment_of<T>::value> data;
// };

    template <typename T>
    class Singleton : boost::noncopyable
    {
    public:

      static T& Instance()
      {
        boost::call_once ( init, m_flag );
        return *m_obj;
      }

      static void init ()
      {
        //m_obj.reset ( new (&storage) T() );
        m_obj.reset ( new T () );
      }

    protected:

      ~Singleton(){}
      Singleton(){}

    private:

      static boost::scoped_ptr<T> m_obj;
      static boost::once_flag m_flag;
      //storage_for<T> storage;
    };

  } // namespace trace
} // namespace trace

template <typename T> boost::scoped_ptr<T> myproject::trace::Singleton<T>::m_obj ( 0 );
template <typename T> boost::once_flag myproject::trace::Singleton<T>::m_flag = BOOST_ONCE_INIT;

#endif // define SINGELTON_H

---------------------- Trace.h -----------------------
#ifndef TRACE_H
#define TRACE_H

#include <boost/cstdint.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>
#include <boost/utility.hpp>

#include <vector>

#include "NumberTraceable_T.h"
#include "Singleton_T.h"
#include "StringTraceable.h"
#include "Trace_State.h"

namespace myproject { namespace trace {

    //
    // Turn Trace into a singleton class that contains a worker thread
    // - Wait for data
    // - If data exists then process
    // - Else wait
    // Add method for adding a ITraceable object. This will lock the logging vector, add
    // element then signal the boost condition variable
    class Trace : public Singleton<Trace> {

      friend class Singleton<Trace>;

    public:

      void add_Trace ( interface::ITraceable::ptr_t trace );

    private:

      Trace();

      // Worker thread functions
      void threadMain();
      void processTraces();

      // Synchronization variables
      boost::mutex m_lock;
      boost::condition_variable m_condition;

      // Data variables
      typedef std::vector < myproject::interface::ITraceable::ptr_t > QueueType;
      QueueType m_queue;

      // Thread variables
      boost::shared_ptr<boost::thread> m_thread;
      volatile bool m_hasWork;

      // Logging
      Trace_State m_state;
    };

  } /* namespace trace */
} /* namespace myproject */

#endif /* TRACE_H */

--------------------- Trace.cpp -------------------------------
#include "Trace.h"
#include "myproject/errors/Internal_Exception.h"

#include <boost/bind.hpp>
#include <boost/format.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <sstream>

#ifndef WIN32
#include <unistd.h>
#else
#include <windows.h>
#endif /* WIN32 */

namespace myproject { namespace trace {

    Trace::Trace ()
      : m_hasWork ( true ),
        m_state ( "Trace", "Trace" )
    {
      // start thread
      m_thread = boost::shared_ptr<boost::thread> ( new boost::thread ( boost::bind ( &Trace::threadMain, this ) ) );
    }

    Trace::~Trace()
    {
      // stop thread
      {
        boost::lock_guard<boost::mutex> lock ( m_lock );
        m_hasWork = false;
      }

      m_condition.notify_one();

      m_thread.join();
    }

    void Trace::add_Trace ( interface::ITraceable::ptr_t trace )
    {
          // lock queue
          {
            boost::lock_guard<boost::mutex> lock ( m_lock );
            m_queue.push_back ( trace->clone() );
          }
            
          m_condition.notify_one();
    }

    void Trace::threadMain()
    {
      boost::unique_lock<boost::mutex> lock ( m_lock );

      while ( m_hasWork )
        {
          while ( m_queue.size() == 0 )
            {
              m_condition.wait ( lock );
            }

          this->processTraces();
        }
    }

    void Trace::processTraces()
    {
      QueueType local_queue;

      std::copy ( m_queue.begin(), m_queue.end(), local_queue.begin() );
      m_queue.clear();

      for ( QueueType::iterator pos = local_queue.begin();
            pos != local_queue.end();
            pos++ )
        {
          m_state.write_Message ( (*pos)->get_Level(), (*pos) );
        }
    }

  } /* namespace trace */
} /* namespace myproject */

------------------ ITraceable.h ---------------------------
#ifndef ITRACEABLE_H
#define ITRACEABLE_H

#include <boost/cstdint.hpp>
#include <boost/shared_ptr.hpp>

#include <string>

namespace myproject { namespace interface {

    class ITraceable {

    public:

      typedef boost::shared_ptr<ITraceable> ptr_t;

      virtual ITraceable::ptr_t clone () = 0;

      virtual std::string get_Formatted_String ( void ) = 0;
    };

  } // namespace trace
} // namespace myproject

#endif // ITRACEABLE_H


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net