Expanding on my previous post, I thought a way to create a generalised version of my "event" class, would be to have a single "event" class which has a boost::function to store a functor created using boost::bind on my object and member funciton. I'd then use templatised helper functions for the general forms of using boost::bind. See below:

// typedef generalised callback types
template<class T>
struct cb0
{
    typedef void (T::*type)(); // define a type for member functions of the form 'void T::fn()'
};

template<class T, typename A0>
struct cb1
{
    typedef void (T::*type)(boost::shared_ptr<A0>); // define a type for member functions of the form 'void T::fn(shared_ptr<A0>)'
};
//-----------------------------------------------------------------------

// the event class which keeps track of our job and the queue we want to post the job to
// all private so we can only use our friend helper functions
class event
{
    EventQueue &queue_;                     // the event queue which will process the job

    typedef boost::function<void()> Job;    // the generalised job we will post to the queue
    Job job_;

    event(EventQueue &queue, Job &job) : queue_(queue), job_(job) { }

    // event creation friends
    template<typename T>
    friend boost::shared_ptr<event> create_event(boost::shared_ptr<T> that, cb0<T>::type cb, EventQueue &queue);
    template<typename T, typename A0>
    friend boost::shared_ptr<event> create_event(boost::shared_ptr<T> that, cb1<T, A0>::type cb, EventQueue &queue);

    // event post friends
    friend void post_to_event(boost::shared_ptr<event> &ev);
    template<typename A0>
    friend void post_to_event(boost::shared_ptr<event> &ev, boost::shared_ptr<A0> &data);
};
//-----------------------------------------------------------------------

// helper functions which create new events
template<typename T>
boost::shared_ptr<event> create_event(boost::shared_ptr<T> that, cb0<T>::type cb, EventQueue &queue)
{
    // encapsulate an object and it's member function using bind
    return boost::shared_ptr<event>(new event(queue, boost::bind(cb, that)));
}

template<typename T, typename A0>
boost::shared_ptr<event> create_event(boost::shared_ptr<T> that, cb1<T, A0>::type cb, EventQueue &queue)
{
    // encapsulate an object and it's member function using bind, placeholder for argument to come
    return boost::shared_ptr<event>(new event(queue, boost::bind(cb, that, _1)));
}
//-----------------------------------------------------------------------

// helper functions which post events onto the event's queue
void post_to_event(boost::shared_ptr<event> &ev)
{
    ev->queue_.post(ev->job_);
}

template<typename A0>
void post_to_event(boost::shared_ptr<event> &ev, boost::shared_ptr<A0> &data)
{
    // bind the data to the job
    ev->queue_.post(boost::bind(ev->job_, data));
}
//-----------------------------------------------------------------------

However, this doesn't compile, g++ comes back with 'cb0::type' is not a type and 'cb1::type' is not a type... any ideas on how I could fix this?

TIA 
Steve

On 4 June 2010 09:33, Steve Lorimer <steve.lorimer@gmail.com> wrote:
Hi Nat

I think we're on the same page here - but I'm looking for a more generalised form. Who owns "obj" or T that_ in my example below is unimportant - I guess we should enforce using a shared_ptr to "obj" so that it is unimportant who owns it.

So this is what I came up with before your reply... and although I think it's not really the right way to do it, I think it will serve to illustrate what I'm trying to do. (this code works in conjunction with the EventQueue code I pasted in my original question (see below)

// all callbacks will be of the form void T::mem_fn(...)
// this for the callback of form T::mem_fn()
template <typename T>
class event_0
{
    EventQueue &queue_;         // the event queue which will process the job
    boost::shared_ptr<T> that_; // the object whose member function the event queue will call

    typedef void (T::*CB_t)();  // the member function of class T which we will callback to
    
    boost::function<void()> job_;

public:
    event_0(boost::shared_ptr<T> that, CB_t cb, EventQueue &queue) :
        queue_(queue), 
        that_(that), 
        job_(boost::bind(cb, that)) // bind the class instance and its callback function together for later invocation
    {}

    // the function the user calls which posts the job to the queue. The thread associated with the queue picks up the 
    // job off the queue and calls it (ergo - calls that_->CB_t(); )
    void post() { queue_.post(job_); }
};

// So in my application I have this class:

struct foo0
    : boost::enable_shared_from_this<foo0>
{
    event_0<foo0> ev;

    foo0(EventQueue &queue) : ev(boost::shared_ptr<foo0>(this), &foo0::bar, queue) { }

    void bar()
    {
        printf("%p foo::bar()\n", this);
    }

    ~foo0()
    {
        std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;
    }

};

// and I create an event which is associated with a queue. This is most likely a class member, where the class is, for eg, the producer, and the queue being passed to the event, the queue for the consumer.

boost::shared_ptr<foo0> job_3(new foo0(queue)); 

// and then some time later, when the producer has created some data...

job_3->ev.post(); // resulting in void foo0::bar() function being called on the consumer thread

// so - now we want to be able to post a shared_ptr to the consumer - ie: the producer creates some data (maybe receives it from a socket) and wants to pass it to the consumer. So we want to be able to go job_4->ev.post(data);

// this for the callback of form T::mem_fn(A0)
template <typename T, typename A0>
class event_1
{
    EventQueue &queue_;         // the event queue which will process the job
    boost::shared_ptr<T> that_; // the object whose mem_fun the event queue will call

    typedef void (T::*CB_t)(A0);
    typedef boost::function<void()> Job;
    Job job_;

public:
    event_1(boost::shared_ptr<T> that, CB_t cb, EventQueue &queue) :
        queue_(queue), that_(that), job_(boost::bind(cb, that, _1))
    {}

    void post(A0 data) { queue_.post(boost::bind(job_, data)); }
};

In other words, I hide the details of the bind from the user, and all he has to do is create some data and post it to the event, which sorts out the binding, queueing, etc - We could make it requisite that all classes which have a callback function used by the event must be derived from shared_ptr (or be shared_ptrs, or whatever), and all data posted by events ( event_1::post(A0) ) must be shared - to ensure thread safety. 

Ideally I'd like to be able to generalise the above somehow, and be able to cater for post(A0), post(A0, A1), post(A0, A1, A2) etc.

Do you sorta see what I'm trying to do? If I can attempt to explain it better... I'm trying to create some library code which will handle inter-thread messaging in an elegant way, and hide the implementation details from the user. The idea is the EventQueue class will actually be part of an EventThread class - you create an EventThread, and all it does is endlessly loop, picking up Event objects from its EventQueue, and calling their functors (which in fact wrap up a class and member callback function and has a shared_ptr to some data which is the argument to the member callback function) - So the creation of the thread, the creation of the queue etc is hidden - all the user does is create events and call event::post(data).

Do you think this is even a good design? Would you suggest a different design philosophy?

TIA 
Steve

PS: My EventQueue code is here:

#include <boost/enable_shared_from_this.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <stdio.h>
#include <list>
#include <iostream>

// cross-thread event queue. multiple threads can post jobs, one or many threads can execute
// jobs.
class EventQueue
{
public:
    typedef boost::function<void()> Job;

private:

    boost::mutex mtx_;
    boost::condition cnd_;
    typedef std::list<Job> Jobs;
    Jobs jobs_;
    int stop_;

public:

    // puts a jon into the queue
    void post(Job job)
    {
        boost::mutex::scoped_lock lock(mtx_);
        jobs_.push_back(job);
        cnd_.notify_one();
    }

    // pulls one job from the queue, returns false when stopped
    bool pull(Job* job)
    {
        boost::mutex::scoped_lock lock(mtx_);
        for(;;)
        { // handle spurious wake-ups
            while(!stop_ && jobs_.empty())
                cnd_.wait(lock);
            if(!jobs_.empty() && 2 != stop_)
            {
                job->swap(jobs_.front()); // move efficiently, avoiding *job = jobs.front()
                jobs_.pop_front();
                return true;
            }
            else if(stop_)
            {
                return false;
            }
        }
    }

    // make pull() return false
    void stop(bool cancel_jobs)
    {
        boost::mutex::scoped_lock lock(mtx_);
        stop_ = 1 + cancel_jobs; // 1 - complete jobs, 2 - cancel jobs
        cnd_.notify_all();
    }

    EventQueue() : stop_() {}
    ~EventQueue() { this->stop(true); }
};
//-----------------------------------------------------------------------

// all callbacks will be of the form void T::mem_fn(...)
// this for the callback of form T::mem_fn()
template <typename T>
class event_0
{
    EventQueue &queue_;         // the event queue which will process the job
    boost::shared_ptr<T> that_; // the object whose mem_fun the event queue will call

    typedef void (T::*CB_t)();
    typedef boost::function<void()> Job;
    Job job_;

public:
    event_0(boost::shared_ptr<T> that, CB_t cb, EventQueue &queue) :
        queue_(queue), that_(that), job_(boost::bind(cb, that))
    {}

    void post() { queue_.post(job_); }
};
//-----------------------------------------------------------------------

struct foo0
    : boost::enable_shared_from_this<foo0>
{
    event_0<foo0> ev;

    foo0(EventQueue &queue) : ev(boost::shared_ptr<foo0>(this), &foo0::bar, queue) { }

    void bar()
    {
        printf("%p foo::bar()\n", this);
    }

    ~foo0()
    {
        std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;
    }

};
//-----------------------------------------------------------------------

void anotherThread(EventQueue* queue)
{
    std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;
    EventQueue::Job job;
    // wait and execute jobs till stopped
    while(queue->pull(&job))
        job(); // execute the job
}
//-----------------------------------------------------------------------

int main()
{
    std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;

    EventQueue queue;
    // start another thread and pass an argument
    boost::thread another_thread(boost::bind(anotherThread, &queue));

    boost::shared_ptr<foo0> job_3(new foo0(queue));
    job_3->ev.post();

    // stop the queue and let it complete all jobs
    queue.stop(false);
    another_thread.join();

    return 0;
}
//-----------------------------------------------------------------------


On 3 June 2010 22:11, Nat Goodspeed <nat@lindenlab.com> wrote:

it's not really clear to me whether 'obj' is owned by the consumer thread or is in the hands of the producer object. Let's consider both cases.

==== Either way:

class Data { ... };
typedef boost::shared_ptr<Data> DataPtr;

==== If 'obj' is the ConsumerClass instance:

class ConsumerClass
{
public:
   ...
   void run();
   void doSomething(DataPtr data);
   void somethingElse(DataPtr data);
   ...
};

typedef (ConsumerClass::*ConsumerMethod)(DataPtr);

typedef boost::function<void(ConsumerClass*)> QueueItem;
typedef YourFavoriteThreadSafeQueue<QueueItem> Queue;

void ConsumerClass::run()
{
   for (;;)
   {
       QueueItem item(queue.get());
       item(this);
   }
}

void ProducerClass::post(ConsumerMethod method, DataPtr data)
{
   queue.put(boost::bind(method, _1, data));
}