Boost logo

Boost :

From: Matthew Vogt (mvogt_at_[hidden])
Date: 2004-02-16 17:51:18


Jeff Garland <jeff <at> crystalclearsoftware.com> writes:

> I believe you'all are talking about 'Active Objects'. Doug Schmidt, primary
> author of ACE, has written extensively on this subject. Take a look at this
> paper:
>
> http://citeseer.nj.nec.com/lavender96active.html
>
> ACE has direct support for the paradigm. It's been a couple of years since I
> looked at it, but as I recall, using a bunch of template magic, object methods
> are morphed into stubs that queue messages for the object, thus disconnecting
> the requesting thread from the execution of the method.

This is an interesting idea, using the boundaries of objects as the interaction
points between threads. I have no experience with this sort of threading,
but your comments led me to sketch out a wrapper which could be used to give
'active object' behaviour to existing classes, which might provide a simple
pattern for concurrency in some apps...

Anyway:

// Code ----------------------------------------------------
#include <cstddef>
#include <iostream>
#include <queue>
#include <string>
#include <vector>

#include <boost/any.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/utility.hpp>

// Pinched from the boost::threads example...
template <typename T, size_t size = 5>
struct bounded_buffer
{
  bounded_buffer(void) :
    begin(0), end(0), buffered(0), circular_buffer(size) { }

  void enqueue (T t)
  {
    boost::mutex::scoped_lock lock(mutex);
    
    while (buffered == circular_buffer.size())
      buffer_not_full.wait(lock);
        
    circular_buffer[end] = t;
    end = (end+1) % circular_buffer.size();
    ++buffered;
    
    buffer_not_empty.notify_one();
  }
  
  T dequeue(void)
  {
    boost::mutex::scoped_lock lock(mutex);
    
    while (buffered == 0)
      buffer_not_empty.wait(lock);
        
    T t = circular_buffer[begin];
    begin = (begin+1) % circular_buffer.size();
    --buffered;
    
    buffer_not_full.notify_one();
    return t;
  }

private:
  int begin, end, buffered;
  std::vector<T> circular_buffer;
  boost::condition buffer_not_full, buffer_not_empty;
  boost::mutex mutex;
};

// Wrapper class to supply active behaviour to other objects
template <typename Base>
class Active
{
  typedef boost::function<boost::any (void)> task_type;
  typedef boost::tuple<task_type,
                       boost::any*,
                       boost::condition*,
                       boost::mutex*> task_descriptor;
  
  void thread_function(void)
  {
    while (true)
    {
      task_descriptor td = tasks.dequeue();
      
      task_type task = td.template get<0>();
      boost::any* result = td.template get<1>();
      boost::condition* condition = td.template get<2>();
      boost::mutex* mutex = td.template get<3>();
      
      if (result == NULL)
        break;
      
      *result = task();
      
      boost::mutex::scoped_lock lock(*mutex);
      (*condition).notify_all();
    }
  }
  
  static boost::any end_function(void)
  {
    std::string ("Hideous hack to end thread for Active<Object>");
  }
  
  bounded_buffer<task_descriptor> tasks;
  task_descriptor end_task;
  boost::thread thread;

protected:

  Active(void) :
    thread(boost::bind(&Active<Base>::thread_function, this)),
    end_task(end_function, NULL, NULL, NULL)
  {
  }
  
  ~Active()
  {
    tasks.enqueue(end_task);
    thread.join();
  }
  
  template <typename result_type>
  result_type invoke(task_type task,
                     boost::condition& condition,
                     boost::mutex& mutex)
  {
    boost::any result;
    
    task_descriptor td(task, &result, &condition, &mutex);
    
    boost::mutex::scoped_lock lock(mutex);

    tasks.enqueue(td);
    while (result.empty())
      condition.wait(lock);
      
    return boost::any_cast<result_type>(result);
  }

  // Only handles non-void functions with one parameter, currently
  template <typename Signature>
  class Proxy
  {
    typedef typename boost::function_traits<Signature>::result_type result_type;
    typedef typename boost::function_traits<Signature>::arg1_type arg1_type;
        
    typedef result_type (Base::*method_type)(arg1_type);
    typedef boost::function<result_type (arg1_type)> wrapper_type;
    
    typedef boost::function<result_type (task_type,
                                         boost::condition&,
                                         boost::mutex&)> invocation_type;

    static boost::any wrap(wrapper_type task, arg1_type arg1)
    {
      return task(arg1);
    }

    invocation_type invocation;
    wrapper_type task;
    
  public:
  
    Proxy(Base* object, method_type method, Active<Base>* active)
    {
      task = boost::bind(method, object, _1);
      invocation = boost::bind(&Active<Base>::invoke<result_type>,
                               active, _1, _2, _3);
    }
    
    result_type operator()(arg1_type arg1)
    {
      boost::mutex mutex;
      boost::condition condition;
      
      return invocation(boost::bind(&wrap, task, arg1), condition, mutex);
    }
  };
};

// User Code -----------------------------------------------------------
using std::cout;
using std::cin;
using std::endl;

// Sample class whose methods are used to exercise the active-object wrapper.
struct Object
{
  // Reports the call on stdout and returns i incremented by one.
  int method(int i)
  {
    cout << "In Object::method" << endl;
    const int successor = i + 1;
    return successor;
  }

  // Reports the call on stdout and returns f multiplied by itself.
  float another(float f)
  {
    cout << "In Object::another" << endl;
    const float squared = f * f;
    return squared;
  }
};

// Active counterpart of Object: holds an Object and exposes each of its
// methods as a Proxy that forwards the call through the Active<Object> base.
struct ActiveObject : private Active<Object>
{
  // The wrapped instance; the proxies below keep a pointer to it.
  Object object;

  // One proxy per Object method, named after the method it forwards to.
  Proxy<int (int)> method;
  Proxy<float (float)> another;

  // Binds every proxy to the embedded Object and to this object's
  // Active<Object> base.
  ActiveObject() :
    object(),
    method(&object, &Object::method, this),
    another(&object, &Object::another, this)
  {
  }
};

int main(int, char**)
{
  ActiveObject object;
 
  // Is there any way to yield a printable thread id with boost::thread,
  // to demonstrate that the control passes to another thread?
  cout << "Invoking methods from main thread" << endl;

  int i = object.method(5);
  cout << "Result of method: " << i << endl;

  float f = object.another(3.1415927f);
  cout << "Result of another: " << f << endl;

  cout << "\nPress <Enter> to continue..." << endl;
  cin.get();
  return 0;
}

// Done ------------------------------------------------

It may be of some utility, especially if it can be reworked to allow for
interface inheritance from the wrapped object. If nothing else, it shows
how lots of boost libraries can be combined in a short space of lines...

I'm sure it can stand a lot of refinement, but I can't be bothered working on
it any more!

Matt


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk