Boost logo

Boost :

From: Vaclav Vesely (vaclav.vesely_at_[hidden])
Date: 2006-03-21 07:11:30


Christopher Kohlhoff wrote:
> --- Vaclav Vesely <vaclav.vesely_at_[hidden]> wrote:
>> I'm
>> missing is a description (or even tutorial) about creating
>> custom asynchronous providers.
...
>
> Have a look at the example in src/examples/services, in
> particular the logger-related classes. The logger_service uses a
> background thread, with a private demuxer object to dispatch the
> work.
...

Thank you, it helped me much. I've written ASIO background_thread class
which calls arbitrary functions in a background thread. If you find it
useful, you can add it to examples. After some refining it may be even
added to the ASIO library.

Moreover background_thread has async_run_loop member function which
calls the work function as long as the complete handler returns true.
It's common usage of asynchronous classes (for example sockets, where
handler for async_read will probably start a new read operation). It
would be handy to cover this technique generally for all ASIO classes.

Regards,
Vaclav


#include <iostream>
#include "boost/asio.hpp"
#include "boost/bind.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
#include "boost/thread.hpp"

using namespace boost;
using namespace std;

//-----------------------------------------------------------------------------

template<typename Demuxer>
class background_thread
{

public:

    background_thread(Demuxer& demuxer):
        m_demuxer(demuxer),
        m_work_demuxer(),
        m_shutdown(false)
    {
    }

    ~background_thread()
    {
        m_shutdown = true;
    }

private:

    template<typename Function, typename Handler>
    class run_proc
    {

    public:
        
        run_proc(Demuxer& demuxer, Function function, Handler handler):
            m_demuxer(demuxer),
            m_function(function),
            m_handler(handler),
            m_work(m_demuxer)
        {
        }

        void operator()()
        {
            m_function();
            m_demuxer.post(m_handler);
        }

    private:

        Demuxer& m_demuxer;
        Function m_function;
        Handler m_handler;
        typename Demuxer::work m_work;
    
    };

public:

    template<typename Function, typename Handler>
    void async_run(Function function, Handler handler)
    {
        m_work_demuxer.post(run_proc<Function, Handler>(
            m_demuxer, function, handler));
        {
            mutex::scoped_lock lock(m_mutex);
            m_has_work = true;
            m_has_work_condition.notify_one();
        }
        start_work_thread();
    }

private:

    template<typename Function, typename Handler>
    class run_loop_handler
    {

    public:
        
        run_loop_handler(background_thread& thread, Function function,
                Handler handler):
            m_thread(thread),
            m_function(function),
            m_handler(handler)
        {
        }

        void operator()()
        {
            if(m_handler())
                    m_thread.async_run(m_function, *this);
        }

    private:

        background_thread& m_thread;
        Function m_function;
        Handler m_handler;

    };

public:

    template<typename Function, typename Handler>
    void async_run_loop(Function function, Handler handler)
    {
        async_run(function,
            run_loop_handler<Function, Handler>(*this, function, handler));
    }

private:

    void work_thread_proc()
    {
        while(!m_shutdown)
        {
            {
                mutex::scoped_lock lock(m_mutex);
                while(!m_has_work)
                    m_has_work_condition.wait(lock);
                m_has_work = false;
            }
            m_work_demuxer.run();
            m_work_demuxer.reset();
        }
    }

    void start_work_thread()
    {
        mutex::scoped_lock lock(m_mutex);
        if(!m_work_thread)
        {
            m_work_thread.reset(new thread(
                bind(&background_thread::work_thread_proc, this)));
        }
    }

private:

    Demuxer& m_demuxer;
    mutex m_mutex;
    asio::demuxer m_work_demuxer;
    scoped_ptr<thread> m_work_thread;
    bool m_has_work;
    condition m_has_work_condition;
    bool m_shutdown;

};

//-----------------------------------------------------------------------------

mutex cout_mutex;

int wait(int seconds)
{
    {
        mutex::scoped_lock lock(cout_mutex);
        cout << "Wait for " << seconds << " seconds" << endl;
    }
    xtime xt;
    xtime_get(&xt, boost::TIME_UTC);
    xt.sec += seconds;
    thread::sleep(xt);
    static int count = 0;
    return ++count;
}

void wait_finished()
{
    {
        mutex::scoped_lock lock(cout_mutex);
        cout << "Waiting finished." << endl;
    }
}

bool wait_three_times()
{
    static int count = 3;
    {
        mutex::scoped_lock lock(cout_mutex);
        cout << "Countdown " << count << endl;
    }
    return (--count > 0);
}

//-----------------------------------------------------------------------------

void main()
{
    asio::demuxer demuxer;
    background_thread<asio::demuxer> bg_thread(demuxer);
    bg_thread.async_run(bind(wait, 1), wait_finished);
    demuxer.run();

    bg_thread.async_run_loop(bind(wait, 1), wait_three_times);
    demuxer.reset();
    demuxer.run();
}

//-----------------------------------------------------------------------------


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk