|
Boost : |
From: Matthew Vogt (mvogt_at_[hidden])
Date: 2004-02-19 18:18:21
This is a reworking of the wrapper to make any object active (that is, to
execute all its methods in its own thread of execution).
Changes since the previous post:
1. Extensible - the proxy implementation method can now be extended to any
number of parameters.
2. Future values - where the client can, if desired, postpone the wait to
acquire the result of invoking a method on an active method by explicitly
capturing a handle to the result for later inspection.
3. Policy for void proxies - the implementer of the class which wraps a non-
active class, can specify whether a client must wait for a void method
to complete exceution or not. This would be useful if a wrapped method had
side effects that a client would expect to see after invoking the method.
Comments welcome.
Matt
(Sorry this is so long, I can only post in text mode via gmane)
// test_active_object.cpp --------------------------------------------------
#include "active_object.hpp"
using std::cin;
using std::endl;
using ActiveWrapper::active; // Active-object wrapper
using ActiveWrapper::future; // Future value encapsulator
using ActiveWrapper::output; // std::cout locking object
using ActiveWrapper::sleep; // 3-second pause to make effects observable
// An example class to test active behaviour
struct Object
{
int non_void_with_param(int i)
{
{ output() << "\tInside Object::non_void_with_param" << endl; }
sleep();
return (i*i);
}
const char* non_void_without_param(void)
{
{ output() << "\tInside Object::non_void_without_param" << endl; }
sleep();
return "Hello";
}
void void_with_param(double)
{
{ output() << "\tInside Object::void_with_param" << endl; }
sleep();
}
void void_without_param(void)
{
{ output() << "\tInside Object::void_without_param" << endl; }
sleep();
}
};
// An active version of the Object class
struct ActiveObject : private active<Object>
{
ActiveObject() :
object(),
non_void_with_param(&object, &Object::non_void_with_param, this),
non_void_without_param(&object, &Object::non_void_without_param, this),
void_with_param(&object, &Object::void_with_param, this),
void_without_param(&object, &Object::void_without_param, this)
{
}
// Proxy declarations for all methods of the active object
proxy<int (int)> non_void_with_param;
proxy<const char* (void)> non_void_without_param;
proxy<void (double), true> void_with_param;
proxy<void (void)> void_without_param;
private:
Object object;
};
int main(int, char**)
{
{
ActiveObject object;
{ output() << "Invoking methods from main thread" << endl; }
// Implicit wait until task is completed by acquiring result
int i = object.non_void_with_param(5);
{ output() << "Result of non_void_with_param: " << i << endl; }
// Explicitly bypass wait for result
future<const char*> c = object.non_void_without_param();
{ output() << "Returned from non_void_without_param" << endl; }
// Block until task completion by accessing result
{ output() << "Result of non_void_without_param: " << c << endl; }
// Block until task completion, due to proxy policy parameter
object.void_with_param(3.1415927);
{ output() << "Returned from void_with_param" << endl; }
// Explicitly bypass completion-wait policy
future<void> v = object.void_with_param(1e2);
{ output() << "Not waiting for void_with_param" << endl; }
// Non-blocking call due to default policy
object.void_without_param();
{ output() << "Returned from void_without_param" << endl; }
// Block until active object thread closes on destruction
{ output() << "Waiting for completion of all tasks" << endl; }
}
{ output() << "Press <Enter> to continue..." << endl; }
cin.get();
return 0;
}
// active_object.hpp -------------------------------------------------------
#include <iomanip>
#include <iostream>
#include <queue>
#include <boost/any.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/optional.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/xtime.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/utility.hpp>
namespace ActiveWrapper {
// Lock the output stream to prevent interleaving
struct output
{
output() :
lock(mutex) {}
static unsigned count;
private:
static boost::mutex mutex;
boost::mutex::scoped_lock lock;
};
unsigned output::count;
boost::mutex output::mutex;
template <typename T>
std::ostream& operator<< (const output& out, T t)
{
++out.count;
return std::cout << std::setw(2) << std::setfill(' ')
<< out.count << ": " << t;
}
// Pause to demonstrate effect
int sleep(void)
{
boost::xtime time;
boost::xtime_get(&time, boost::TIME_UTC);
time.sec += 3;
boost::thread::sleep(time);
}
namespace detail {
// A generic thread-safe queue
template <typename T>
struct safe_queue
{
safe_queue(void) :
terminated(false) { }
bool enqueue (T t)
{
if (terminated)
return false;
boost::mutex::scoped_lock lock(mutex);
queue.push(t);
not_empty.notify_one();
return true;
}
boost::optional<T> dequeue(void)
{
boost::mutex::scoped_lock lock(mutex);
if (terminated)
return boost::optional<T>();
while (queue.empty())
{
not_empty.wait(lock);
if (terminated)
return boost::optional<T>();
}
T t = queue.front();
queue.pop();
return t;
}
void terminate(void)
{
boost::mutex::scoped_lock lock(mutex);
terminated = true;
not_empty.notify_all();
}
private:
bool terminated;
std::queue<T> queue;
boost::mutex mutex;
boost::condition not_empty;
};
// Object used to wait for the completion of a task
struct synch_type
{
synch_type(boost::optional<boost::any*> a,
boost::mutex* m,
boost::condition* c) :
result(a),
mutex(m),
condition(c),
completed(false) {}
~synch_type()
{
if (result)
delete *result;
delete mutex;
delete condition;
}
void wait_for_completion(void)
{
boost::mutex::scoped_lock lock(*mutex);
if (completed == false)
{
if (result)
{
while ((*result)->empty())
(*condition).wait(lock);
}
else
{
(*condition).wait(lock);
}
completed = true;
}
}
void notify(void)
{
boost::mutex::scoped_lock lock(*mutex);
(*condition).notify_all();
completed = true;
}
boost::optional<boost::any*> get_result(void)
{
return result;
}
private:
boost::optional<boost::any*> result;
boost::mutex* mutex;
boost::condition* condition;
bool completed;
};
// A reference-counted pointer to a synch type
typedef boost::shared_ptr<synch_type> synch_ptr;
// A type to enforce waiting policy on synchronisation
struct waiter_type
{
waiter_type(synch_ptr s, bool force) :
synch(s), force_wait(force), waited_for_completion(false) {}
~waiter_type()
{
if (force_wait)
{
wait_for_completion();
}
}
void wait_for_completion(void)
{
if (waited_for_completion == false)
{
synch->wait_for_completion();
waited_for_completion = true;
}
}
boost::optional<boost::any*> result(void)
{
waited_for_completion = true;
return synch->get_result();
}
private:
synch_ptr synch;
bool force_wait;
bool waited_for_completion;
};
}
// An encapsulation of a result that might require waiting for
template <typename T>
struct future
{
future(boost::shared_ptr<detail::waiter_type> w) :
waiter(w) {}
operator T& (void)
{
if (!value)
{
waiter->wait_for_completion();
boost::optional<boost::any*> result = waiter->result();
value = boost::any_cast<T>(*(*result));
}
return *value;
}
protected:
boost::optional<T> value;
boost::shared_ptr<detail::waiter_type> waiter;
};
template <>
struct future<void>
{
future(boost::shared_ptr<detail::waiter_type> w) :
waiter(w) {};
protected:
boost::shared_ptr<detail::waiter_type> waiter;
};
// Forward reference required by proxy_impl
template <typename base>
class active;
namespace detail {
// The type of tasks that are executed by the active object wrapper
typedef boost::function<boost::any (void)> task_type;
// A task and associated notification machinery
typedef boost::tuple<task_type, boost::shared_ptr<synch_type> >
task_descriptor;
// A thead safe queue of tasks to be performed
typedef safe_queue<task_descriptor> task_queue;
// Dispatch to task queue, returning a facility to wait for the result
template <typename result_type>
future<result_type> invoke(task_queue* queue,
bool force_wait,
task_type task)
{
synch_ptr synch(new synch_type(new boost::any(),
new boost::mutex(),
new boost::condition()));
task_descriptor td(task, synch);
queue->enqueue(td);
boost::shared_ptr<waiter_type> waiter
(new waiter_type(synch, force_wait));
return future<result_type>(waiter);
}
template <>
future<void> invoke<void>(task_queue* queue,
bool force_wait,
task_type task)
{
synch_ptr synch(new synch_type(boost::optional<boost::any*>(),
new boost::mutex(),
new boost::condition()));
task_descriptor td(task, synch);
queue->enqueue(td);
boost::shared_ptr<waiter_type> waiter
(new waiter_type(synch, force_wait));
return future<void>(waiter);
}
template <typename base, typename signature, bool force_wait>
class proxy_impl;
// Handler for non-void functions with one parameter
template <typename base, typename R, typename T1, bool force_wait>
class proxy_impl<base, R (T1), force_wait>
{
typedef R result_type;
typedef T1 arg1_type;
typedef result_type (base::*method_type)(arg1_type);
typedef boost::function<result_type (arg1_type)> wrapper_type;
typedef boost::function<future<result_type> (task_type)>
invocation_type;
static boost::any wrap(wrapper_type task, arg1_type arg1)
{
return task(arg1);
}
invocation_type invocation;
wrapper_type task;
public:
proxy_impl(base* object, method_type method, active<base>* active)
{
task = boost::bind(method, object, _1);
invocation = boost::bind(&invoke<result_type>,
active->queue(),
force_wait,
_1);
}
future<result_type> operator()(arg1_type arg1)
{
return invocation(boost::bind(&wrap, task, arg1));
}
};
// Handler for void functions with one parameter
template <typename base, typename T1, bool force_wait>
class proxy_impl<base, void (T1), force_wait>
{
typedef void result_type;
typedef T1 arg1_type;
typedef result_type (base::*method_type)(arg1_type);
typedef boost::function<result_type (arg1_type)> wrapper_type;
typedef boost::function<future<result_type> (task_type)>
invocation_type;
static boost::any wrap(wrapper_type task, arg1_type arg1)
{
task(arg1);
return boost::any();
}
invocation_type invocation;
wrapper_type task;
public:
proxy_impl(base* object, method_type method, active<base>* active)
{
task = boost::bind(method, object, _1);
invocation = boost::bind(&invoke<result_type>,
active->queue(),
force_wait,
_1);
}
future<result_type> operator()(arg1_type arg1)
{
return invocation(boost::bind(&wrap, task, arg1));
}
};
// Handler for non-void functions with no parameters
template <typename base, typename R, bool force_wait>
class proxy_impl<base, R (void), force_wait>
{
typedef R result_type;
typedef result_type (base::*method_type)(void);
typedef boost::function<result_type (void)> wrapper_type;
typedef boost::function<future<result_type> (task_type)>
invocation_type;
static boost::any wrap(wrapper_type task)
{
return task();
}
invocation_type invocation;
wrapper_type task;
public:
proxy_impl(base* object, method_type method, active<base>* active)
{
task = boost::bind(method, object);
invocation = boost::bind(&invoke<result_type>,
active->queue(),
force_wait,
_1);
}
future<result_type> operator()(void)
{
return invocation(boost::bind(&wrap, task));
}
};
// Handler for void functions without parameters
template <typename base, bool force_wait>
class proxy_impl<base, void (void), force_wait>
{
typedef void result_type;
typedef result_type (base::*method_type)(void);
typedef boost::function<result_type (void)> wrapper_type;
typedef boost::function<future<result_type> (task_type)>
invocation_type;
static boost::any wrap(wrapper_type task)
{
task();
return boost::any();
}
invocation_type invocation;
wrapper_type task;
public:
proxy_impl(base* object, method_type method, active<base>* active)
{
task = boost::bind(method, object);
invocation = boost::bind(&invoke<result_type>,
active->queue(),
force_wait,
_1);
}
future<result_type> operator()(void)
{
return invocation(boost::bind(&wrap, task));
}
};
}
// Wrapper class to supply active behaviour to other objects
template <typename base>
class active
{
void thread_function(void)
{
while (true)
{
boost::optional<detail::task_descriptor> td = tasks.dequeue();
if (!td)
break;
detail::task_type task = td->template get<0>();
detail::synch_ptr synch = td->template get<1>();
boost::optional<boost::any*> result = synch->get_result();
if (result)
{
*(*result) = task();
}
else
{
task();
}
synch->notify();
}
}
detail::task_queue tasks;
boost::thread thread;
protected:
active(void) :
thread(boost::bind(&active<base>::thread_function, this))
{
}
~active()
{
tasks.terminate();
thread.join();
}
template <typename signature, bool force_wait = false>
struct proxy : public detail::proxy_impl<base, signature, force_wait>
{
typedef detail::proxy_impl<base, signature, force_wait> impl_type;
typedef typename impl_type::method_type method_type;
proxy(base* object, method_type method, active<base>* active) :
impl_type(object, method, active) {}
};
public:
detail::task_queue* queue(void) { return &tasks; }
};
};
Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk