Boost Users :

Subject: Re: [Boost-users] [bind] & [function] How do I store member functions with generalised arguments for later invocation?
From: John Dlugosz (JDlugosz_at_[hidden])
Date: 2010-06-04 17:31:56


Let the existing 'function' and 'bind' libraries do the work.

Just make a queue of 'function<sig>' instances, by value.
Use bind along with reference_wrapper and smart pointers for state, when you really want something to live outside the function instance.
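A minimal sketch of that suggestion follows. It uses the C++11 standard-library equivalents (std::function, std::bind, std::ref, std::shared_ptr) rather than Boost so that it compiles on its own. The Counter type and run_demo function are hypothetical names made up for illustration, and the queue has no locking.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>

// Hypothetical target object, for illustration only.
struct Counter {
    int hits = 0;
    void add(int n) { hits += n; }
};

typedef std::function<void()> Job;   // the 'function<sig>' stored by value

// Queues three jobs, runs them in order, and returns the visible total.
inline int run_demo() {
    std::queue<Job> jobs;

    // Shared state: the job holds a shared_ptr copy, so the Counter stays alive.
    std::shared_ptr<Counter> shared = std::make_shared<Counter>();
    jobs.push(std::bind(&Counter::add, shared, 1));

    // External state: std::ref stores a reference, so 'local' must outlive the job.
    Counter local;
    jobs.push(std::bind(&Counter::add, std::ref(local), 2));

    // Plain value: without std::ref, bind copies 'local' and the call updates the copy.
    jobs.push(std::bind(&Counter::add, local, 100));

    while (!jobs.empty()) {
        Job job = std::move(jobs.front());
        jobs.pop();
        job();
    }
    return shared->hits + local.hits;   // 1 + 2; the copy's 100 is not visible here
}
```

The third job shows why reference_wrapper matters: anything bound by value lives inside the function instance and disappears with it.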



From: boost-users-bounces_at_[hidden] On Behalf Of Steve Lorimer
Sent: Friday, June 04, 2010 3:34 AM
To: boost-users_at_[hidden]
Subject: Re: [Boost-users] [bind] & [function] How do I store member functions with generalised arguments for later invocation?

Hi Nat

I think we're on the same page here - but I'm looking for a more generalised form. Who owns "obj" or T that_ in my example below is unimportant - I guess we should enforce using a shared_ptr to "obj" so that it is unimportant who owns it.

So this is what I came up with before your reply... and although I think it's not really the right way to do it, I think it will serve to illustrate what I'm trying to do. (This code works in conjunction with the EventQueue code I pasted in my original question; see below.)

// all callbacks will be of the form void T::mem_fn(...)
// this one is for callbacks of the form T::mem_fn()
template <typename T>
class event_0
{
    EventQueue &queue_; // the event queue which will process the job
    boost::shared_ptr<T> that_; // the object whose member function the event queue will call

    typedef void (T::*CB_t)(); // the member function of class T which we will callback to

    boost::function<void()> job_;

public:
    event_0(boost::shared_ptr<T> that, CB_t cb, EventQueue &queue) :
        queue_(queue),
        that_(that),
        job_(boost::bind(cb, that)) // bind the class instance and its callback function together for later invocation
    {}

    // the function the user calls which posts the job to the queue. The thread associated with the queue picks up the
    // job off the queue and calls it (ergo - calls that_->CB_t(); )
    void post() { queue_.post(job_); }
};

// So in my application I have this class:

struct foo0
    : boost::enable_shared_from_this<foo0>
{
    event_0<foo0> ev;

    foo0(EventQueue &queue) : ev(boost::shared_ptr<foo0>(this), &foo0::bar, queue) { }

    void bar()
    {
        printf("%p foo::bar()\n", this);
    }

    ~foo0()
    {
        std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;
    }

};

// and I create an event which is associated with a queue. The event is most likely a class member, where the class is, e.g., the producer, and the queue passed to the event is the consumer's queue.

boost::shared_ptr<foo0> job_3(new foo0(queue));

// and then some time later, when the producer has created some data...

job_3->ev.post(); // resulting in void foo0::bar() function being called on the consumer thread
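A caveat on the foo0 constructor above: boost::shared_ptr<foo0>(this) creates an ownership group that is independent of the one held by job_3, so the same object can end up being deleted twice. One possible way around that, sketched here with the std:: equivalents of the Boost types, is to build the job only after the object is already owned by a shared_ptr, using shared_from_this(). The names Foo, make_job and owners_while_job_alive are hypothetical and not part of the code in this thread.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for foo0, for illustration only.
struct Foo : std::enable_shared_from_this<Foo> {
    int calls = 0;
    void bar() { ++calls; }

    // Called after construction, when the object is already owned by a shared_ptr,
    // so shared_from_this() joins the existing ownership group.
    std::function<void()> make_job() {
        return std::bind(&Foo::bar, shared_from_this());
    }
};

// Runs one job and reports how many owners exist while the job is still alive.
inline long owners_while_job_alive() {
    std::shared_ptr<Foo> foo = std::make_shared<Foo>();
    std::function<void()> job = foo->make_job();   // the job holds one more owner
    job();
    assert(foo->calls == 1);
    return foo.use_count();   // 'foo' plus the copy stored inside 'job'
}
```

Because the job carries its own shared_ptr copy, the object stays alive until the consumer has run the job, and it is destroyed exactly once.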

// so - now we want to be able to post a shared_ptr to the consumer - ie: the producer creates some data (maybe receives it from a socket) and wants to pass it to the consumer. So we want to be able to go job_4->ev.post(data);

// this one is for callbacks of the form T::mem_fn(A0)
template <typename T, typename A0>
class event_1
{
    EventQueue &queue_; // the event queue which will process the job
    boost::shared_ptr<T> that_; // the object whose mem_fun the event queue will call

    typedef void (T::*CB_t)(A0);
    typedef boost::function<void(A0)> Callback; // takes the data argument; post() binds it to make a nullary job
    Callback job_;

public:
    event_1(boost::shared_ptr<T> that, CB_t cb, EventQueue &queue) :
        queue_(queue), that_(that), job_(boost::bind(cb, that, _1))
    {}

    void post(A0 data) { queue_.post(boost::bind(job_, data)); }
};

In other words, I hide the details of the bind from the user; all he has to do is create some data and post it to the event, which sorts out the binding, queueing, etc. We could require that all classes which have a callback function used by the event be derived from enable_shared_from_this (or be held by shared_ptrs, or whatever), and that all data posted by events ( event_1::post(A0) ) be shared, to ensure thread safety.

Ideally I'd like to be able to generalise the above somehow, and be able to cater for post(A0), post(A0, A1), post(A0, A1, A2) etc.
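One way this generalisation could look, assuming a C++11 compiler with variadic templates (which postdates the Boost version discussed here), is sketched below. SimpleQueue is a hypothetical single-threaded stand-in for EventQueue, and Consumer and post_three_events are made-up names for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>
#include <string>

// Hypothetical stand-in for the thread-safe EventQueue; locking omitted.
struct SimpleQueue {
    typedef std::function<void()> Job;
    std::queue<Job> jobs;
    void post(Job job) { jobs.push(std::move(job)); }
    void run_all() {
        while (!jobs.empty()) {
            Job job = std::move(jobs.front());
            jobs.pop();
            job();
        }
    }
};

// One template covers T::mem_fn(), T::mem_fn(A0), T::mem_fn(A0, A1), ...
template <typename T, typename... Args>
class event {
    SimpleQueue& queue_;
    std::shared_ptr<T> that_;
    void (T::*cb_)(Args...);

public:
    event(std::shared_ptr<T> that, void (T::*cb)(Args...), SimpleQueue& queue)
        : queue_(queue), that_(std::move(that)), cb_(cb) {}

    // The arguments are stored inside the job, so they remain valid when it runs.
    void post(Args... args) {
        queue_.post(std::bind(cb_, that_, std::move(args)...));
    }
};

// Hypothetical consumer with callbacks of three different arities.
struct Consumer {
    int total = 0;
    std::string last;
    void on_tick() { ++total; }
    void on_value(int v) { total += v; }
    void on_tagged(int v, std::string tag) { total += v; last = tag; }
};

inline int post_three_events() {
    SimpleQueue queue;
    std::shared_ptr<Consumer> c = std::make_shared<Consumer>();
    event<Consumer> tick(c, &Consumer::on_tick, queue);
    event<Consumer, int> value(c, &Consumer::on_value, queue);
    event<Consumer, int, std::string> tagged(c, &Consumer::on_tagged, queue);

    tick.post();
    value.post(5);
    tagged.post(10, "x");
    queue.run_all();       // a consumer thread would do this in the real design
    return c->total;       // 1 + 5 + 10
}
```

Without variadic templates the same effect needs one class or overload per arity, written by hand or generated with a preprocessor library.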

Do you sorta see what I'm trying to do? If I can attempt to explain it better... I'm trying to create some library code which will handle inter-thread messaging in an elegant way, and hide the implementation details from the user. The idea is that the EventQueue class will actually be part of an EventThread class: you create an EventThread, and all it does is loop endlessly, picking up Event objects from its EventQueue and calling their functors (each of which wraps up an object and its member callback function, and holds a shared_ptr to some data which is the argument to that callback). So the creation of the thread, the creation of the queue, etc. is hidden; all the user does is create events and call event::post(data).

Do you think this is even a good design? Would you suggest a different design philosophy?

TIA
Steve

PS: My EventQueue code is here:

#include <boost/enable_shared_from_this.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <stdio.h>
#include <list>
#include <iostream>

// cross-thread event queue. multiple threads can post jobs, one or many threads can execute
// jobs.
class EventQueue
{
public:
    typedef boost::function<void()> Job;

private:

    boost::mutex mtx_;
    boost::condition cnd_;
    typedef std::list<Job> Jobs;
    Jobs jobs_;
    int stop_;

public:

    // puts a job into the queue
    void post(Job job)
    {
        boost::mutex::scoped_lock lock(mtx_);
        jobs_.push_back(job);
        cnd_.notify_one();
    }

    // pulls one job from the queue, returns false when stopped
    bool pull(Job* job)
    {
        boost::mutex::scoped_lock lock(mtx_);
        for(;;)
        { // handle spurious wake-ups
            while(!stop_ && jobs_.empty())
                cnd_.wait(lock);
            if(!jobs_.empty() && 2 != stop_)
            {
                job->swap(jobs_.front()); // move efficiently, avoiding *job = jobs.front()
                jobs_.pop_front();
                return true;
            }
            else if(stop_)
            {
                return false;
            }
        }
    }

    // make pull() return false
    void stop(bool cancel_jobs)
    {
        boost::mutex::scoped_lock lock(mtx_);
        stop_ = 1 + cancel_jobs; // 1 - complete jobs, 2 - cancel jobs
        cnd_.notify_all();
    }

    EventQueue() : stop_() {}
    ~EventQueue() { this->stop(true); }
};
//-----------------------------------------------------------------------

// all callbacks will be of the form void T::mem_fn(...)
// this one is for callbacks of the form T::mem_fn()
template <typename T>
class event_0
{
    EventQueue &queue_; // the event queue which will process the job
    boost::shared_ptr<T> that_; // the object whose mem_fun the event queue will call

    typedef void (T::*CB_t)();
    typedef boost::function<void()> Job;
    Job job_;

public:
    event_0(boost::shared_ptr<T> that, CB_t cb, EventQueue &queue) :
        queue_(queue), that_(that), job_(boost::bind(cb, that))
    {}

    void post() { queue_.post(job_); }
};
//-----------------------------------------------------------------------

struct foo0
    : boost::enable_shared_from_this<foo0>
{
    event_0<foo0> ev;

    foo0(EventQueue &queue) : ev(boost::shared_ptr<foo0>(this), &foo0::bar, queue) { }

    void bar()
    {
        printf("%p foo::bar()\n", this);
    }

    ~foo0()
    {
        std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;
    }

};
//-----------------------------------------------------------------------

void anotherThread(EventQueue* queue)
{
    std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;
    EventQueue::Job job;
    // wait and execute jobs till stopped
    while(queue->pull(&job))
        job(); // execute the job
}
//-----------------------------------------------------------------------

int main()
{
    std::cout << __func__ << " - Thread id is: " << boost::this_thread::get_id() << std::endl;

    EventQueue queue;
    // start another thread and pass an argument
    boost::thread another_thread(boost::bind(anotherThread, &queue));

    boost::shared_ptr<foo0> job_3(new foo0(queue));
    job_3->ev.post();

    // stop the queue and let it complete all jobs
    queue.stop(false);
    another_thread.join();

    return 0;
}
//-----------------------------------------------------------------------


On 3 June 2010 22:11, Nat Goodspeed <nat_at_[hidden]> wrote:

It's not really clear to me whether 'obj' is owned by the consumer thread or is in the hands of the producer object. Let's consider both cases.

==== Either way:

class Data { ... };
typedef boost::shared_ptr<Data> DataPtr;

==== If 'obj' is the ConsumerClass instance:

class ConsumerClass
{
public:
   ...
   void run();
   void doSomething(DataPtr data);
   void somethingElse(DataPtr data);
   ...
};

typedef void (ConsumerClass::*ConsumerMethod)(DataPtr);

typedef boost::function<void(ConsumerClass*)> QueueItem;
typedef YourFavoriteThreadSafeQueue<QueueItem> Queue;

void ConsumerClass::run()
{
   for (;;)
   {
       QueueItem item(queue.get());
       item(this);
   }
}

void ProducerClass::post(ConsumerMethod method, DataPtr data)
{
   queue.put(boost::bind(method, _1, data));
}
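
For reference, the consumer-owned case above can be condensed into a self-contained sketch. It assumes the std:: equivalents of the Boost types and a plain std::queue in place of YourFavoriteThreadSafeQueue, and run() drains the queue once instead of looping forever. The free function post and the helper run_consumer_demo are hypothetical names for illustration.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <queue>

struct Data { int value; };
typedef std::shared_ptr<Data> DataPtr;

class ConsumerClass;
typedef std::function<void(ConsumerClass*)> QueueItem;
typedef std::queue<QueueItem> Queue;   // assumption: not thread-safe, for illustration only

class ConsumerClass {
public:
    int sum = 0;
    void doSomething(DataPtr data) { sum += data->value; }
    void somethingElse(DataPtr data) { sum -= data->value; }

    // Each item receives the consumer as its argument, so the producer
    // never needs a pointer to the consumer object.
    void run(Queue& queue) {
        while (!queue.empty()) {
            QueueItem item = std::move(queue.front());
            queue.pop();
            item(this);
        }
    }
};

typedef void (ConsumerClass::*ConsumerMethod)(DataPtr);

// Producer side: bind the data now, leave the object (_1) open until run().
inline void post(Queue& queue, ConsumerMethod method, DataPtr data) {
    using namespace std::placeholders;
    queue.push(std::bind(method, _1, data));
}

inline int run_consumer_demo() {
    Queue queue;
    post(queue, &ConsumerClass::doSomething, std::make_shared<Data>(Data{7}));
    post(queue, &ConsumerClass::somethingElse, std::make_shared<Data>(Data{2}));

    ConsumerClass consumer;
    consumer.run(queue);
    return consumer.sum;   // 7 - 2
}
```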



