Boost logo

Boost :

From: Matthew Vogt (mvogt_at_[hidden])
Date: 2004-02-19 18:18:21


This is a reworking of the wrapper to make any object active (that is, to
execute all its methods in its own thread of execution).

Changes since the previous post:
1. Extensible - the proxy implementation method can now be extended to any
number of parameters.

2. Future values - the client can, if desired, postpone the wait to
acquire the result of invoking a method on an active object by explicitly
capturing a handle to the result for later inspection.

3. Policy for void proxies - the implementer of the class which wraps a non-
active class can specify whether a client must wait for a void method
to complete execution or not. This would be useful if a wrapped method had
side effects that a client would expect to see after invoking the method.

Comments welcome.

Matt

(Sorry this is so long, I can only post in text mode via gmane)

// test_active_object.cpp --------------------------------------------------

#include "active_object.hpp"

using std::cin;
using std::endl;

using ActiveWrapper::active; // Active-object wrapper
using ActiveWrapper::future; // Future value encapsulator
using ActiveWrapper::output; // std::cout locking object
using ActiveWrapper::sleep; // 3-second pause to make effects observable

// An example class to test active behaviour
struct Object
{
  int non_void_with_param(int i)
  {
    { output() << "\tInside Object::non_void_with_param" << endl; }
    sleep();
    return (i*i);
  }
  
  const char* non_void_without_param(void)
  {
    { output() << "\tInside Object::non_void_without_param" << endl; }
    sleep();
    return "Hello";
  }
  
  void void_with_param(double)
  {
    { output() << "\tInside Object::void_with_param" << endl; }
    sleep();
  }
  
  void void_without_param(void)
  {
    { output() << "\tInside Object::void_without_param" << endl; }
    sleep();
  }
};

// An active version of the Object class
struct ActiveObject : private active<Object>
{
  ActiveObject() :
    object(),
    non_void_with_param(&object, &Object::non_void_with_param, this),
    non_void_without_param(&object, &Object::non_void_without_param, this),
    void_with_param(&object, &Object::void_with_param, this),
    void_without_param(&object, &Object::void_without_param, this)
  {
  }

  // Proxy declarations for all methods of the active object
  proxy<int (int)> non_void_with_param;
  proxy<const char* (void)> non_void_without_param;
  proxy<void (double), true> void_with_param;
  proxy<void (void)> void_without_param;

private:
  Object object;
};

int main(int, char**)
{
  {
    ActiveObject object;
  
    { output() << "Invoking methods from main thread" << endl; }
    
    // Implicit wait until task is completed by acquiring result
    int i = object.non_void_with_param(5);
    { output() << "Result of non_void_with_param: " << i << endl; }
    
    // Explicitly bypass wait for result
    future<const char*> c = object.non_void_without_param();
    { output() << "Returned from non_void_without_param" << endl; }
    
    // Block until task completion by accessing result
    { output() << "Result of non_void_without_param: " << c << endl; }
    
    // Block until task completion, due to proxy policy parameter
    object.void_with_param(3.1415927);
    { output() << "Returned from void_with_param" << endl; }
    
    // Explicitly bypass completion-wait policy
    future<void> v = object.void_with_param(1e2);
    { output() << "Not waiting for void_with_param" << endl; }
    
    // Non-blocking call due to default policy
    object.void_without_param();
    { output() << "Returned from void_without_param" << endl; }
    
    // Block until active object thread closes on destruction
    { output() << "Waiting for completion of all tasks" << endl; }
  }
  
  { output() << "Press <Enter> to continue..." << endl; }
  cin.get();
  return 0;
}

// active_object.hpp -------------------------------------------------------

#include <iomanip>
#include <iostream>
#include <queue>

#include <boost/any.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/xtime.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/utility.hpp>

namespace ActiveWrapper {

  // Lock the output stream to prevent interleaving
  struct output
  {
    output() :
      lock(mutex) {}
      
    static unsigned count;
    
  private:
    static boost::mutex mutex;
    boost::mutex::scoped_lock lock;
  };

  unsigned output::count;
  boost::mutex output::mutex;
  
  template <typename T>
  std::ostream& operator<< (const output& out, T t)
  {
    ++out.count;
    return std::cout << std::setw(2) << std::setfill(' ')
                     << out.count << ": " << t;
  }
  
  // Pause to demonstrate effect
  int sleep(void)
  {
    boost::xtime time;
    boost::xtime_get(&time, boost::TIME_UTC);
    time.sec += 3;
    boost::thread::sleep(time);
  }

  namespace detail {
    
    // A generic thread-safe queue
    template <typename T>
    struct safe_queue
    {
      safe_queue(void) :
        terminated(false) { }
    
      bool enqueue (T t)
      {
        if (terminated)
          return false;
            
        boost::mutex::scoped_lock lock(mutex);
        
        queue.push(t);
        
        not_empty.notify_one();
        return true;
      }
      
      boost::optional<T> dequeue(void)
      {
        boost::mutex::scoped_lock lock(mutex);
        
        if (terminated)
          return boost::optional<T>();
          
        while (queue.empty())
        {
          not_empty.wait(lock);
          if (terminated)
            return boost::optional<T>();
        }
            
        T t = queue.front();
        queue.pop();
        return t;
      }
    
      void terminate(void)
      {
        boost::mutex::scoped_lock lock(mutex);
        
        terminated = true;
        not_empty.notify_all();
      }
      
    private:
      bool terminated;
      std::queue<T> queue;
      boost::mutex mutex;
      boost::condition not_empty;
    };
  
    // Object used to wait for the completion of a task
    struct synch_type
    {
      synch_type(boost::optional<boost::any*> a,
                 boost::mutex* m,
                 boost::condition* c) :
        result(a),
        mutex(m),
        condition(c),
        completed(false) {}
        
      ~synch_type()
      {
        if (result)
          delete *result;
          
        delete mutex;
        delete condition;
      }
      
      void wait_for_completion(void)
      {
        boost::mutex::scoped_lock lock(*mutex);
  
        if (completed == false)
        {
          if (result)
          {
            while ((*result)->empty())
              (*condition).wait(lock);
          }
          else
          {
            (*condition).wait(lock);
          }
        
          completed = true;
        }
      }
        
      void notify(void)
      {
        boost::mutex::scoped_lock lock(*mutex);
        (*condition).notify_all();
        
        completed = true;
      }
      
      boost::optional<boost::any*> get_result(void)
      {
        return result;
      }
      
    private:
      boost::optional<boost::any*> result;
      boost::mutex* mutex;
      boost::condition* condition;
      bool completed;
    };
    
    // A reference-counted pointer to a synch type
    typedef boost::shared_ptr<synch_type> synch_ptr;
    
    // A type to enforce waiting policy on synchronisation
    struct waiter_type
    {
      waiter_type(synch_ptr s, bool force) :
        synch(s), force_wait(force), waited_for_completion(false) {}
      
      ~waiter_type()
      {
        if (force_wait)
        {
          wait_for_completion();
        }
      }
      
      void wait_for_completion(void)
      {
        if (waited_for_completion == false)
        {
          synch->wait_for_completion();
          waited_for_completion = true;
        }
      }
  
      boost::optional<boost::any*> result(void)
      {
        waited_for_completion = true;
        return synch->get_result();
      }
      
    private:
      synch_ptr synch;
      bool force_wait;
      bool waited_for_completion;
    };
    
  }
  
  // An encapsulation of a result that might require waiting for
  template <typename T>
  struct future
  {
    future(boost::shared_ptr<detail::waiter_type> w) :
      waiter(w) {}
  
    operator T& (void)
    {
      if (!value)
      {
        waiter->wait_for_completion();
        
        boost::optional<boost::any*> result = waiter->result();
        value = boost::any_cast<T>(*(*result));
      }
      return *value;
    }
      
  protected:
    boost::optional<T> value;
    boost::shared_ptr<detail::waiter_type> waiter;
  };
  
  template <>
  struct future<void>
  {
    future(boost::shared_ptr<detail::waiter_type> w) :
      waiter(w) {};
      
  protected:
    boost::shared_ptr<detail::waiter_type> waiter;
  };
  
  // Forward reference required by proxy_impl
  template <typename base>
  class active;

  namespace detail {
  
    // The type of tasks that are executed by the active object wrapper
    typedef boost::function<boost::any (void)> task_type;
    
    // A task and associated notification machinery
    typedef boost::tuple<task_type, boost::shared_ptr<synch_type> >
      task_descriptor;
                         
    // A thead safe queue of tasks to be performed
    typedef safe_queue<task_descriptor> task_queue;

    // Dispatch to task queue, returning a facility to wait for the result
    template <typename result_type>
    future<result_type> invoke(task_queue* queue,
                               bool force_wait,
                               task_type task)
    {
      synch_ptr synch(new synch_type(new boost::any(),
                                     new boost::mutex(),
                                     new boost::condition()));
  
      task_descriptor td(task, synch);
                                           
      queue->enqueue(td);
  
      boost::shared_ptr<waiter_type> waiter
        (new waiter_type(synch, force_wait));
        
      return future<result_type>(waiter);
    }
  
    template <>
    future<void> invoke<void>(task_queue* queue,
                              bool force_wait,
                              task_type task)
    {
      synch_ptr synch(new synch_type(boost::optional<boost::any*>(),
                                     new boost::mutex(),
                                     new boost::condition()));
  
      task_descriptor td(task, synch);
                                           
      queue->enqueue(td);
  
      boost::shared_ptr<waiter_type> waiter
        (new waiter_type(synch, force_wait));
        
      return future<void>(waiter);
    }
  
    template <typename base, typename signature, bool force_wait>
    class proxy_impl;
    
    // Handler for non-void functions with one parameter
    template <typename base, typename R, typename T1, bool force_wait>
    class proxy_impl<base, R (T1), force_wait>
    {
      typedef R result_type;
      typedef T1 arg1_type;
          
      typedef result_type (base::*method_type)(arg1_type);
      typedef boost::function<result_type (arg1_type)> wrapper_type;

      typedef boost::function<future<result_type> (task_type)>
        invocation_type;
  
      static boost::any wrap(wrapper_type task, arg1_type arg1)
      {
        return task(arg1);
      }
  
      invocation_type invocation;
      wrapper_type task;
      
    public:
    
      proxy_impl(base* object, method_type method, active<base>* active)
      {
        task = boost::bind(method, object, _1);
        invocation = boost::bind(&invoke<result_type>,
                                 active->queue(),
                                 force_wait,
                                 _1);
      }
      
      future<result_type> operator()(arg1_type arg1)
      {
        return invocation(boost::bind(&wrap, task, arg1));
      }
    };
  
    // Handler for void functions with one parameter
    template <typename base, typename T1, bool force_wait>
    class proxy_impl<base, void (T1), force_wait>
    {
      typedef void result_type;
      typedef T1 arg1_type;
          
      typedef result_type (base::*method_type)(arg1_type);
      typedef boost::function<result_type (arg1_type)> wrapper_type;
  
      typedef boost::function<future<result_type> (task_type)>
        invocation_type;
  
      static boost::any wrap(wrapper_type task, arg1_type arg1)
      {
        task(arg1);
        return boost::any();
      }
  
      invocation_type invocation;
      wrapper_type task;
      
    public:
    
      proxy_impl(base* object, method_type method, active<base>* active)
      {
        task = boost::bind(method, object, _1);
        invocation = boost::bind(&invoke<result_type>,
                                 active->queue(),
                                 force_wait,
                                 _1);
      }
      
      future<result_type> operator()(arg1_type arg1)
      {
        return invocation(boost::bind(&wrap, task, arg1));
      }
    };
  
    // Handler for non-void functions with no parameters
    template <typename base, typename R, bool force_wait>
    class proxy_impl<base, R (void), force_wait>
    {
      typedef R result_type;
          
      typedef result_type (base::*method_type)(void);
      typedef boost::function<result_type (void)> wrapper_type;
      
      typedef boost::function<future<result_type> (task_type)>
        invocation_type;
  
      static boost::any wrap(wrapper_type task)
      {
        return task();
      }
  
      invocation_type invocation;
      wrapper_type task;
      
    public:
    
      proxy_impl(base* object, method_type method, active<base>* active)
      {
        task = boost::bind(method, object);
        invocation = boost::bind(&invoke<result_type>,
                                 active->queue(),
                                 force_wait,
                                 _1);
      }
      
      future<result_type> operator()(void)
      {
        return invocation(boost::bind(&wrap, task));
      }
    };
    
    // Handler for void functions without parameters
    template <typename base, bool force_wait>
    class proxy_impl<base, void (void), force_wait>
    {
      typedef void result_type;
          
      typedef result_type (base::*method_type)(void);
      typedef boost::function<result_type (void)> wrapper_type;
      
      typedef boost::function<future<result_type> (task_type)>
        invocation_type;
  
      static boost::any wrap(wrapper_type task)
      {
        task();
        return boost::any();
      }
  
      invocation_type invocation;
      wrapper_type task;
      
    public:
    
      proxy_impl(base* object, method_type method, active<base>* active)
      {
        task = boost::bind(method, object);
        invocation = boost::bind(&invoke<result_type>,
                                 active->queue(),
                                 force_wait,
                                 _1);
      }
      
      future<result_type> operator()(void)
      {
        return invocation(boost::bind(&wrap, task));
      }
    };
  
  }
  
  // Wrapper class to supply active behaviour to other objects
  template <typename base>
  class active
  {
    void thread_function(void)
    {
      while (true)
      {
        boost::optional<detail::task_descriptor> td = tasks.dequeue();
        if (!td)
          break;
        
        detail::task_type task = td->template get<0>();
        detail::synch_ptr synch = td->template get<1>();

        boost::optional<boost::any*> result = synch->get_result();
        
        if (result)
        {
          *(*result) = task();
        }
        else
        {
          task();
        }
        
        synch->notify();
      }
    }
    
    detail::task_queue tasks;
    boost::thread thread;
  
  protected:
  
    active(void) :
      thread(boost::bind(&active<base>::thread_function, this))
    {
    }
    
    ~active()
    {
      tasks.terminate();
      thread.join();
    }

    template <typename signature, bool force_wait = false>
    struct proxy : public detail::proxy_impl<base, signature, force_wait>
    {
      typedef detail::proxy_impl<base, signature, force_wait> impl_type;
      typedef typename impl_type::method_type method_type;
      
      proxy(base* object, method_type method, active<base>* active) :
        impl_type(object, method, active) {}
    };
    
  public:
    detail::task_queue* queue(void) { return &tasks; }
  };
  
};


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk