Boost logo

Boost :

From: Matthew Vogt (mvogt_at_[hidden])
Date: 2004-02-19 18:18:21

This is a reworking of the wrapper to make any object active (that is, to
execute all its methods in its own thread of execution).

Changes since the previous post:
1. Extensible - the proxy implementation method can now be extended to any
number of parameters.

2. Future values - where the client can, if desired, postpone the wait to
acquire the result of invoking a method on an active object by explicitly
capturing a handle to the result for later inspection.

3. Policy for void proxies - the implementer of the class which wraps a non-
active class can specify whether a client must wait for a void method
to complete execution or not. This would be useful if a wrapped method had
side effects that a client would expect to see after invoking the method.

Comments welcome.


(Sorry this is so long, I can only post in text mode via gmane)

// test_active_object.cpp --------------------------------------------------

#include "active_object.hpp"

using std::cin;
using std::endl;

using ActiveWrapper::active; // Active-object wrapper
using ActiveWrapper::future; // Future value encapsulator
using ActiveWrapper::output; // std::cout locking object
using ActiveWrapper::sleep; // 3-second pause to make effects observable

// An example class to test active behaviour.
//
// A plain (non-active) class with one method for each combination of
// void / non-void return type and with / without a parameter. Every method
// logs its entry through output() with a leading tab.
// NOTE(review): the braces around the struct and around each method body do
// not appear in this listing - presumably lost when the post was archived.
struct Object
  // Logs entry, then returns the square of i.
  int non_void_with_param(int i)
    { output() << "\tInside Object::non_void_with_param" << endl; }
    return (i*i);
  // Logs entry, then returns the string literal "Hello".
  const char* non_void_without_param(void)
    { output() << "\tInside Object::non_void_without_param" << endl; }
    return "Hello";
  // Logs entry; the double argument is unnamed and unused.
  void void_with_param(double)
    { output() << "\tInside Object::void_with_param" << endl; }
  // Logs entry; takes nothing and returns nothing.
  void void_without_param(void)
    { output() << "\tInside Object::void_without_param" << endl; }

// An active version of the Object class.
//
// Inherits privately from active<Object> and exposes one public proxy member
// per Object method, each with the same name as the method it forwards to.
// `proxy` is the member template declared inside active<base>.
// NOTE(review): the struct braces and the (empty?) constructor body are not
// present in this listing - presumably lost when the post was archived.
struct ActiveObject : private active<Object>
  // Each proxy receives the wrapped instance, the member function to call on
  // it, and `this` (as the active<Object> that will run the call).
  // NOTE(review): the proxies are declared before `object`, so they are
  // initialised with &object before `object` itself has been constructed.
  ActiveObject() :
    non_void_with_param(&object, &Object::non_void_with_param, this),
    non_void_without_param(&object, &Object::non_void_without_param, this),
    void_with_param(&object, &Object::void_with_param, this),
    void_without_param(&object, &Object::void_without_param, this)

  // Proxy declarations for all methods of the active object.
  // The second template argument is the force_wait policy (default false);
  // only void_with_param sets it to true.
  proxy<int (int)> non_void_with_param;
  proxy<const char* (void)> non_void_without_param;
  proxy<void (double), true> void_with_param;
  proxy<void (void)> void_without_param;

  // The wrapped, non-active instance that the proxies call into.
  Object object;

// Test driver: invokes ActiveObject's proxies from the main thread and logs
// each step through output().
// NOTE(review): this listing has no braces and several statements implied by
// the comments below are absent (see the individual notes) - presumably lost
// when the post was archived. Confirm against the original before reuse.
int main(int, char**)
    ActiveObject object;
    { output() << "Invoking methods from main thread" << endl; }
    // Implicit wait until task is completed by acquiring result
    // (the returned future<int> is converted to int in the initialisation).
    int i = object.non_void_with_param(5);
    { output() << "Result of non_void_with_param: " << i << endl; }
    // Explicitly bypass wait for result
    future<const char*> c = object.non_void_without_param();
    { output() << "Returned from non_void_without_param" << endl; }
    // Block until task completion by accessing result
    { output() << "Result of non_void_without_param: " << c << endl; }
    // Block until task completion, due to proxy policy parameter
    // NOTE(review): no call to object.void_with_param appears before this
    // message in the listing - presumably a dropped line.
    { output() << "Returned from void_with_param" << endl; }
    // Explicitly bypass completion-wait policy
    future<void> v = object.void_with_param(1e2);
    { output() << "Not waiting for void_with_param" << endl; }
    // Non-blocking call due to default policy
    // NOTE(review): no call to object.void_without_param appears before this
    // message in the listing - presumably a dropped line.
    { output() << "Returned from void_without_param" << endl; }
    // Block until active object thread closes on destruction
    { output() << "Waiting for completion of all tasks" << endl; }
  // NOTE(review): the change of indentation suggests an inner scope closed
  // above this line; nothing that reads from cin is shown after the prompt.
  { output() << "Press <Enter> to continue..." << endl; }
  return 0;

// active_object.hpp -------------------------------------------------------

#include <iomanip>
#include <iostream>
#include <queue>

#include <boost/any.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/xtime.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/utility.hpp>

namespace ActiveWrapper {

  // Lock the output stream to prevent interleaving.
  //
  // Constructing an `output` acquires the class-wide static mutex through the
  // scoped_lock member; the lock is held for as long as the object exists.
  // NOTE(review): the struct braces are not present in this listing -
  // presumably lost when the post was archived.
  struct output
    output() :
      lock(mutex) {}
    // Value printed as a prefix by operator<< (const output&, T).
    // NOTE(review): nothing in this listing modifies it.
    static unsigned count;
    // Shared by every `output` instance.
    static boost::mutex mutex;
    // Holds `mutex` for the lifetime of this object.
    boost::mutex::scoped_lock lock;

  // Definitions of output's static members. `count` has static storage
  // duration and is therefore zero-initialised.
  // NOTE(review): these are non-inline definitions inside a header, so
  // including active_object.hpp from more than one translation unit would
  // define them more than once.
  unsigned output::count;
  boost::mutex output::mutex;
  // First insertion on an `output` object: writes out.count right-aligned in a
  // field of width 2 (space-filled), then ": ", then t, all to std::cout.
  // Returns std::cout itself, so any further << in the same expression is
  // applied to the stream directly rather than through this overload.
  // t is taken by value.
  // NOTE(review): the function-body braces are not present in this listing.
  template <typename T>
  std::ostream& operator<< (const output& out, T t)
    return std::cout << std::setw(2) << std::setfill(' ')
                     << out.count << ": " << t;
  // Pause to demonstrate effect.
  //
  // The visible statements read the current time (boost::TIME_UTC) into an
  // xtime and advance it by 3 seconds.
  // NOTE(review): the function is declared to return int, but neither the
  // call that actually sleeps until `time` nor a return statement appears in
  // this listing - presumably lost when the post was archived.
  int sleep(void)
    boost::xtime time;
    boost::xtime_get(&time, boost::TIME_UTC);
    time.sec += 3;

  namespace detail {
    // A generic thread-safe queue.
    //
    // Wraps a std::queue<T> together with a mutex, a condition (`not_empty`)
    // and a `terminated` flag that makes enqueue/dequeue report failure.
    // NOTE(review): this listing has no braces, and the statements that would
    // push to / pop from `queue` and wait on / signal `not_empty` do not
    // appear - presumably lost when the post was archived. Confirm against the
    // original before relying on the details below.
    template <typename T>
    struct safe_queue
      // Starts in the non-terminated state.
      safe_queue(void) :
        terminated(false) { }
      // Returns false if the queue has been terminated, true otherwise.
      // NOTE(review): `terminated` is read here before `mutex` is locked,
      // whereas terminate() writes it under the lock. The push of `t` is not
      // shown.
      bool enqueue (T t)
        if (terminated)
          return false;
        boost::mutex::scoped_lock lock(mutex);
        return true;
      // Returns an empty optional if the queue is (or becomes) terminated;
      // otherwise returns a copy of the front element.
      // NOTE(review): the body of the while loop presumably waits on
      // `not_empty`, and the front element is presumably popped before
      // returning; neither statement is shown.
      boost::optional<T> dequeue(void)
        boost::mutex::scoped_lock lock(mutex);
        if (terminated)
          return boost::optional<T>();
        while (queue.empty())
          if (terminated)
            return boost::optional<T>();
        T t = queue.front();
        return t;
      // Marks the queue as terminated, under the lock.
      // NOTE(review): no wake-up of a consumer blocked in dequeue() is shown.
      void terminate(void)
        boost::mutex::scoped_lock lock(mutex);
        terminated = true;
      bool terminated;
      std::queue<T> queue;
      boost::mutex mutex;
      boost::condition not_empty;
    // Object used to wait for the completion of a task.
    //
    // Holds an optional pointer to the boost::any result slot (absent for void
    // tasks, see invoke<void>), plus a mutex and a condition, all passed in as
    // raw pointers.
    // NOTE(review): this listing has no braces and several lines are missing -
    // presumably lost when the post was archived. See the notes below.
    struct synch_type
      // NOTE(review): only `completed` is initialised in the visible
      // initialiser list; the initialisers storing a, m and c into result,
      // mutex and condition are not shown.
      synch_type(boost::optional<boost::any*> a,
                 boost::mutex* m,
                 boost::condition* c) :
        completed(false) {}
        // NOTE(review): the following deletes presumably form the body of a
        // destructor whose header line is missing. They release the result
        // slot (if any), the mutex and the condition.
        if (result)
          delete *result;
        delete mutex;
        delete condition;
      // Takes the mutex and, if the task is not already marked completed,
      // loops while the result slot (when there is one) is still empty, then
      // marks the task completed.
      // NOTE(review): the wait on `condition` inside the loop, and whatever
      // the no-result (void) path waits on, are not shown.
      void wait_for_completion(void)
        boost::mutex::scoped_lock lock(*mutex);
        if (completed == false)
          if (result)
            while ((*result)->empty())
          completed = true;
      // Marks the task completed under the mutex.
      // NOTE(review): no signal of `condition` is shown.
      void notify(void)
        boost::mutex::scoped_lock lock(*mutex);
        completed = true;
      // Returns the optional result-slot pointer without locking or waiting.
      boost::optional<boost::any*> get_result(void)
        return result;
      boost::optional<boost::any*> result;
      boost::mutex* mutex;
      boost::condition* condition;
      bool completed;
    // A reference-counted pointer to a synch type. Stored both by waiter_type
    // and (as boost::shared_ptr<synch_type>) in the task tuple declared later
    // in this header.
    typedef boost::shared_ptr<synch_type> synch_ptr;
    // A type to enforce waiting policy on synchronisation.
    //
    // Pairs a synch_ptr with the force_wait policy flag and remembers whether
    // a wait has already been performed.
    // NOTE(review): this listing has no braces and several lines are missing -
    // presumably lost when the post was archived. See the notes below.
    struct waiter_type
      waiter_type(synch_ptr s, bool force) :
        synch(s), force_wait(force), waited_for_completion(false) {}
        // NOTE(review): this test presumably sits inside a destructor whose
        // header line and guarded statement are missing.
        if (force_wait)
      // Records that the wait has happened, at most once.
      // NOTE(review): the call that actually waits on `synch` is not shown.
      void wait_for_completion(void)
        if (waited_for_completion == false)
          waited_for_completion = true;
      // Marks the wait as done and returns synch's optional result pointer.
      // NOTE(review): no wait is visible before the result is fetched.
      boost::optional<boost::any*> result(void)
        waited_for_completion = true;
        return synch->get_result();
      synch_ptr synch;
      bool force_wait;
      bool waited_for_completion;
  // An encapsulation of a result that might require waiting for.
  //
  // Holds a shared waiter and a cached copy of the value. Converting the
  // future to T& fetches the result through the waiter on first use.
  // NOTE(review): the struct and function braces are not present in this
  // listing - presumably lost when the post was archived.
  template <typename T>
  struct future
    future(boost::shared_ptr<detail::waiter_type> w) :
      waiter(w) {}
    // On first use, obtains the result pointer from the waiter, extracts a T
    // from the boost::any with any_cast and caches it in `value`; later
    // conversions return the cached value.
    // NOTE(review): `result` is dereferenced without checking that the
    // optional is engaged.
    operator T& (void)
      if (!value)
        boost::optional<boost::any*> result = waiter->result();
        value = boost::any_cast<T>(*(*result));
      return *value;
    // Cached value; empty until the first conversion.
    boost::optional<T> value;
    boost::shared_ptr<detail::waiter_type> waiter;
  // Specialisation for void results: there is no value to retrieve, so the
  // future only keeps the shared waiter alive.
  // NOTE(review): the struct braces are not present in this listing; the ';'
  // after the constructor body is redundant.
  template <>
  struct future<void>
    future(boost::shared_ptr<detail::waiter_type> w) :
      waiter(w) {};
    boost::shared_ptr<detail::waiter_type> waiter;
  // Forward reference required by proxy_impl
  template <typename base>
  class active;

  namespace detail {
    // The type of tasks that are executed by the active object wrapper:
    // a nullary callable returning boost::any.
    typedef boost::function<boost::any (void)> task_type;
    // A task and associated notification machinery
    typedef boost::tuple<task_type, boost::shared_ptr<synch_type> >
    // NOTE(review): the name and terminating ';' of the typedef above are
    // missing from this listing; the following typedef and later code refer
    // to it as `task_descriptor`.
    // A thread-safe queue of tasks to be performed
    typedef safe_queue<task_descriptor> task_queue;

    // Dispatch to task queue, returning a facility to wait for the result.
    //
    // Allocates a fresh boost::any result slot, mutex and condition, wraps
    // them in a shared synch_type, pairs that with `task` in a task
    // descriptor, and returns a future built on a waiter that carries the
    // force_wait policy.
    // NOTE(review): the function braces are missing and `queue` is never used
    // in the visible statements - the line that enqueues `td` was presumably
    // lost when the post was archived.
    template <typename result_type>
    future<result_type> invoke(task_queue* queue,
                               bool force_wait,
                               task_type task)
      synch_ptr synch(new synch_type(new boost::any(),
                                     new boost::mutex(),
                                     new boost::condition()));
      task_descriptor td(task, synch);
      boost::shared_ptr<waiter_type> waiter
        (new waiter_type(synch, force_wait));
      return future<result_type>(waiter);
    // Specialisation of invoke for void tasks: identical to the primary
    // template except that the synch_type is given an empty optional instead
    // of a boost::any result slot.
    // NOTE(review): the function braces are missing and `queue` is never used
    // in the visible statements - the line that enqueues `td` was presumably
    // lost when the post was archived.
    template <>
    future<void> invoke<void>(task_queue* queue,
                              bool force_wait,
                              task_type task)
      synch_ptr synch(new synch_type(boost::optional<boost::any*>(),
                                     new boost::mutex(),
                                     new boost::condition()));
      task_descriptor td(task, synch);
      boost::shared_ptr<waiter_type> waiter
        (new waiter_type(synch, force_wait));
      return future<void>(waiter);
    template <typename base, typename signature, bool force_wait>
    class proxy_impl;
    // Handler for non-void functions with one parameter.
    //
    // Stores the bound member function (`task`) and a bound call to
    // invoke<R> (`invocation`); operator() packages the argument into a
    // nullary task and hands it to `invocation`, returning a future<R>.
    // NOTE(review): this listing has no braces or access specifiers, and some
    // lines are truncated - presumably lost when the post was archived. See
    // the notes below.
    template <typename base, typename R, typename T1, bool force_wait>
    class proxy_impl<base, R (T1), force_wait>
      typedef R result_type;
      typedef T1 arg1_type;
      typedef result_type (base::*method_type)(arg1_type);
      typedef boost::function<result_type (arg1_type)> wrapper_type;

      typedef boost::function<future<result_type> (task_type)>
      // NOTE(review): the typedef name above is missing; the member below
      // refers to it as `invocation_type`.
      // Calls task(arg1) and converts the result to boost::any.
      static boost::any wrap(wrapper_type task, arg1_type arg1)
        return task(arg1);
      invocation_type invocation;
      wrapper_type task;
      // Binds `method` to `object`, leaving the argument open (_1).
      // NOTE(review): the boost::bind call assigned to `invocation` is cut
      // off after its first argument; the remaining arguments are not shown.
      proxy_impl(base* object, method_type method, active<base>* active)
        task = boost::bind(method, object, _1);
        invocation = boost::bind(&invoke<result_type>,
      // Binds arg1 into a nullary task via wrap and dispatches it.
      future<result_type> operator()(arg1_type arg1)
        return invocation(boost::bind(&wrap, task, arg1));
    // Handler for void functions with one parameter.
    //
    // Same structure as the non-void one-parameter handler, except that wrap
    // returns an empty boost::any and operator() returns a future<void>.
    // NOTE(review): this listing has no braces or access specifiers, and some
    // lines are truncated - presumably lost when the post was archived. See
    // the notes below.
    template <typename base, typename T1, bool force_wait>
    class proxy_impl<base, void (T1), force_wait>
      typedef void result_type;
      typedef T1 arg1_type;
      typedef result_type (base::*method_type)(arg1_type);
      typedef boost::function<result_type (arg1_type)> wrapper_type;
      typedef boost::function<future<result_type> (task_type)>
      // NOTE(review): the typedef name above is missing; the member below
      // refers to it as `invocation_type`.
      // Returns an empty boost::any.
      // NOTE(review): the call task(arg1) is not shown before the return.
      static boost::any wrap(wrapper_type task, arg1_type arg1)
        return boost::any();
      invocation_type invocation;
      wrapper_type task;
      // Binds `method` to `object`, leaving the argument open (_1).
      // NOTE(review): the boost::bind call assigned to `invocation` is cut
      // off after its first argument; the remaining arguments are not shown.
      proxy_impl(base* object, method_type method, active<base>* active)
        task = boost::bind(method, object, _1);
        invocation = boost::bind(&invoke<result_type>,
      // Binds arg1 into a nullary task via wrap and dispatches it.
      future<result_type> operator()(arg1_type arg1)
        return invocation(boost::bind(&wrap, task, arg1));
    // Handler for non-void functions with no parameters.
    //
    // Stores the bound member function (`task`) and a bound call to
    // invoke<R> (`invocation`); operator() wraps the task and hands it to
    // `invocation`, returning a future<R>.
    // NOTE(review): this listing has no braces or access specifiers, and some
    // lines are truncated - presumably lost when the post was archived. See
    // the notes below.
    template <typename base, typename R, bool force_wait>
    class proxy_impl<base, R (void), force_wait>
      typedef R result_type;
      typedef result_type (base::*method_type)(void);
      typedef boost::function<result_type (void)> wrapper_type;
      typedef boost::function<future<result_type> (task_type)>
      // NOTE(review): the typedef name above is missing; the member below
      // refers to it as `invocation_type`.
      // Calls task() and converts the result to boost::any.
      static boost::any wrap(wrapper_type task)
        return task();
      invocation_type invocation;
      wrapper_type task;
      // Binds `method` to `object`.
      // NOTE(review): the boost::bind call assigned to `invocation` is cut
      // off after its first argument; the remaining arguments are not shown.
      proxy_impl(base* object, method_type method, active<base>* active)
        task = boost::bind(method, object);
        invocation = boost::bind(&invoke<result_type>,
      // Wraps the stored task as a task_type and dispatches it.
      future<result_type> operator()(void)
        return invocation(boost::bind(&wrap, task));
    // Handler for void functions without parameters.
    //
    // Same structure as the non-void no-parameter handler, except that wrap
    // returns an empty boost::any and operator() returns a future<void>.
    // NOTE(review): this listing has no braces or access specifiers, and some
    // lines are truncated - presumably lost when the post was archived. See
    // the notes below.
    template <typename base, bool force_wait>
    class proxy_impl<base, void (void), force_wait>
      typedef void result_type;
      typedef result_type (base::*method_type)(void);
      typedef boost::function<result_type (void)> wrapper_type;
      typedef boost::function<future<result_type> (task_type)>
      // NOTE(review): the typedef name above is missing; the member below
      // refers to it as `invocation_type`.
      // Returns an empty boost::any.
      // NOTE(review): the call task() is not shown before the return.
      static boost::any wrap(wrapper_type task)
        return boost::any();
      invocation_type invocation;
      wrapper_type task;
      // Binds `method` to `object`.
      // NOTE(review): the boost::bind call assigned to `invocation` is cut
      // off after its first argument; the remaining arguments are not shown.
      proxy_impl(base* object, method_type method, active<base>* active)
        task = boost::bind(method, object);
        invocation = boost::bind(&invoke<result_type>,
      // Wraps the stored task as a task_type and dispatches it.
      future<result_type> operator()(void)
        return invocation(boost::bind(&wrap, task));
  // Wrapper class to supply active behaviour to other objects.
  //
  // Owns a task queue and a thread that runs thread_function(), which pulls
  // task descriptors off the queue and executes them. Derived classes declare
  // `proxy` members to route method calls through the queue.
  // NOTE(review): this listing has no braces or access specifiers and several
  // statements are missing - presumably lost when the post was archived. See
  // the notes below.
  template <typename base>
  class active
    // Worker loop: repeatedly dequeues a descriptor, unpacks its task and
    // synch object, and, when the synch object has a result slot, stores the
    // task's return value into it.
    // NOTE(review): the statement guarded by `if (!td)` (presumably leaving
    // the loop), the execution path for tasks without a result slot, and any
    // notification of `synch` are not shown.
    void thread_function(void)
      while (true)
        boost::optional<detail::task_descriptor> td = tasks.dequeue();
        if (!td)
        detail::task_type task = td->template get<0>();
        detail::synch_ptr synch = td->template get<1>();

        boost::optional<boost::any*> result = synch->get_result();
        if (result)
          *(*result) = task();
    // `tasks` is declared before `thread`, so the queue is constructed before
    // the thread that reads from it is started.
    detail::task_queue tasks;
    boost::thread thread;
    // Starts the worker thread on this object's thread_function.
    // NOTE(review): the constructor body and any destructor (the test driver
    // mentions the thread closing on destruction) are not shown.
    active(void) :
      thread(boost::bind(&active<base>::thread_function, this))

    // Per-method proxy: a thin public wrapper over the proxy_impl
    // specialisation selected by `signature`. force_wait defaults to false.
    template <typename signature, bool force_wait = false>
    struct proxy : public detail::proxy_impl<base, signature, force_wait>
      typedef detail::proxy_impl<base, signature, force_wait> impl_type;
      typedef typename impl_type::method_type method_type;
      proxy(base* object, method_type method, active<base>* active) :
        impl_type(object, method, active) {}
    // Exposes a pointer to the task queue.
    detail::task_queue* queue(void) { return &tasks; }

Boost list run by bdawes at, gregod at, cpdaniel at, john at