Subject: [boost] Is there any interest in Asynchronous Executors and Asynchronous Completion Token library
From: vicente.botet (vicente.botet_at_[hidden])
Date: 2009-01-02 11:48:35


Hi,

I'm playing with some asynchronous executors and would like to know whether there is interest in a library that works with Asynchronous Executors (AE) and Asynchronous Completion Tokens (ACT) in a generic way (or at least that was my intention). The library can be seen as a front-end for several asynchronous execution models, making it possible to share common algorithms and easier to switch from one asynchronous executor to another.

For the moment I have included it in my Boost.InterThreads library (bith = boost::interthreads), but it is independent and could be moved to a separate library (any suggestions for a name and a namespace?).

The motivation for the library, and how it handles these concepts, follows. Of course, any comments are welcome.

In N1833 - Preliminary Threading Library Proposal for TR2, Kevlin Henney introduces the concept of a threader and a function thread that evaluates a function asynchronously and returns a joiner handle.
In N2185 - Proposed Text for Parallel Task Execution, Peter Dimov introduces a fork function that evaluates a function asynchronously and returns a future handle.
In N2276 - Thread Pools and Futures, Anthony Williams introduces the launch_in_thread and launch_in_pool function templates, which evaluate a function asynchronously either in a specific thread or in a thread pool and return a unique_future handle.
In Boost.ThreadPool, Oliver Kowalke proposes a complete implementation of a thread pool with a submit function that evaluates a function asynchronously and returns a task handle.

Behind all these proposals there are the concepts of an asynchronous executor, a fork/launch-like function, and an asynchronous completion token handle.

Name              executor     fork-like           ACT
Boost.Thread      ??           thread constructor  thread
Boost.ThreadPool  tp::pool     submit              tp::task
N2276             thread       launch_in_thread    unique_future<T>
N2276             thread_pool  launch_in_pool      unique_future<T>
N2185             ??           fork                future<T>
N1833             threader     thread              joiner<T>

The asynchronous completion token models can follow two interfaces: the thread interface and the unique_future interface.
Some asynchronous completion token handles allow recovering the result of the evaluation of the function (futures), others allow managing the underlying thread of execution (thread), and some do both (joiner and tp::task).

It seems natural to provide a generic fork function that evaluates a function asynchronously with respect to the calling thread and returns an ACT handle. The following metafunction associates an ACT handle with an asynchronous executor.

template <typename AE, typename T>
struct asynchronous_completion_token {
    typedef typename AE::template handle<T>::type type;
};
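
As a minimal sketch of how this metafunction resolves, the following uses two hypothetical executors (toy_thread_executor and toy_future_executor, names invented for this illustration) and standard-library types in place of the Boost ones. Note the template keyword, which is needed when naming a nested template of a dependent type.

```cpp
#include <future>
#include <thread>
#include <type_traits>

// Hypothetical executor whose ACT is always a plain thread handle.
struct toy_thread_executor {
    template <typename T>
    struct handle { typedef std::thread type; };
};

// Hypothetical executor whose ACT is a future carrying the result.
struct toy_future_executor {
    template <typename T>
    struct handle { typedef std::future<T> type; };
};

// Maps an executor AE and a result type T to the executor's ACT handle.
template <typename AE, typename T>
struct asynchronous_completion_token {
    typedef typename AE::template handle<T>::type type;
};

// Thread-like executors ignore T; future-like executors carry it.
static_assert(std::is_same<
    asynchronous_completion_token<toy_thread_executor, double>::type,
    std::thread>::value, "thread handle expected");

static_assert(std::is_same<
    asynchronous_completion_token<toy_future_executor, double>::type,
    std::future<double> >::value, "future<double> handle expected");
```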

The result of forking a nullary function by an asynchronous executor is given by the following metafunction:

namespace result_of {
    template <typename AE,typename F>
    struct fork {
        typedef typename asynchronous_completion_token<AE, typename boost::result_of<F()>::type>::type type;
    };
}

The default implementation of fork delegates to the fork member function of the asynchronous executor.

template< typename AE, typename F >
typename result_of::fork<AE, F>::type fork( AE& ae, F fn ) {
    return ae.fork(fn);
}

Forking n-ary functions relies on the nullary version and bind.

template< typename AE, typename F, typename A1, ..., typename An >
typename asynchronous_completion_token<AE, typename result_of<F(A1,..., An)>::type >::type
fork( AE& ae, F fn, A1 a1, ..., An an ) {
    return fork( ae, bind( fn, a1, ..., an ) );
}

We can define a basic_threader which just returns a new thread as follows:

class basic_threader {
public:
    template <typename T>
    struct handle {
        typedef boost::thread type;
    };

    template <typename F>
    boost::thread fork(F f) {
        boost::thread th(f);
        return boost::move(th);
    }
};

The library includes a launcher class that creates a thread and returns a unique_future when forking.

class launcher {
public:
    template <typename T>
    struct handle {
        typedef unique_future<T> type;
    };
    template <typename F>
    unique_future<typename result_of<F()>::type>
    fork(F f) {
        typedef typename result_of<F()>::type result_type;
        packaged_task<result_type> tsk(f);
        unique_future<result_type> res = tsk.get_future();
        thread th(boost::move(tsk));
        return res;
    }
};
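
For readers without the Boost.InterThreads sources at hand, the launcher and the generic fork functions can be approximated with standard C++11 facilities. This is only a sketch under assumptions: std::future, std::packaged_task, std::thread and std::bind stand in for the Boost types, the namespace sketch and the variadic fork overload are illustrative, and the worker thread is detached rather than managed.

```cpp
#include <cassert>
#include <cmath>
#include <functional>
#include <future>
#include <thread>
#include <utility>

namespace sketch {

// Stand-in for the launcher: one new thread per forked function, with a
// std::future as the asynchronous completion token.
class launcher {
public:
    template <typename T>
    struct handle { typedef std::future<T> type; };

    template <typename F>
    auto fork(F f) -> std::future<decltype(f())> {
        typedef decltype(f()) result_type;
        std::packaged_task<result_type()> tsk(std::move(f));
        std::future<result_type> res = tsk.get_future();
        // The thread handle is dropped right away; only the future is
        // handed back to the caller.
        std::thread(std::move(tsk)).detach();
        return res;
    }
};

// Nullary fork: delegate to the executor's member function.
template <typename AE, typename F>
auto fork(AE& ae, F fn) -> decltype(ae.fork(fn)) {
    return ae.fork(fn);
}

// N-ary fork: bind the arguments, then reuse the nullary overload.
template <typename AE, typename F, typename A1, typename... An>
auto fork(AE& ae, F fn, A1 a1, An... an)
    -> decltype(sketch::fork(ae, std::bind(fn, a1, an...))) {
    return sketch::fork(ae, std::bind(fn, a1, an...));
}

// Workload in the style of the N2185 example: sum of (1/i)^a for i in 1..n.
inline double f(double a, int n) {
    assert(n >= 0);
    double r = 0.0;
    for (int i = 1; i <= n; ++i) r += std::pow(1.0 / i, a);
    return r;
}

}  // namespace sketch
```

With these definitions, sketch::fork(l, sketch::f, 2.2, 1000000) on a sketch::launcher l yields a std::future<double> whose get() blocks until the forked call has finished.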

Given the sequential example (taken from N2185)

double f( double a, int n )
{
    double r = 0.0;

    for( int i = 1; i <= n; ++i )
    {
        double x = 1.0 / i;
        r += std::pow( x, a );
    }

    return r;
}

int main()
{
    double m1 = f( 1.0, 1000000 );
    double m2 = f( 1.0, 5000000 );
    double m3 = f( 2.2, 1000000 );
    double m4 = f( 2.2, 5000000 );

    std::cout << m2 - m1 + m3 - m4 << std::endl;
}

this library allows a programmer to switch to parallel execution as follows:

int main()
{
    launcher l;
    boost::unique_future<double> fm1 = bith::fork( l, f, 1.0, 1000000 );
    boost::unique_future<double> fm2 = bith::fork( l, f, 1.0, 5000000 );
    boost::unique_future<double> fm3 = bith::fork( l, f, 2.2, 1000000 );
    boost::unique_future<double> fm4 = bith::fork( l, f, 2.2, 5000000 );

    std::cout << fm2.get() - fm1.get() + fm3.get() - fm4.get() << std::endl;
}

The library also includes a threader class based on Kevlin Henney's proposal:

class threader {
public:
    template <typename T>
    struct handle {
        typedef joiner<T> type;
    };
    template <typename F>
    joiner<typename result_of<F()>::type>
    fork(F f) {
        typedef typename result_of<F()>::type result_type;
        return joiner<result_type>(f);
    }

};

The question now is how to adapt this to an existing asynchronous executor such as the one in the Boost.ThreadPool library. We need to specialize the class template asynchronous_completion_token

template <typename Channel, typename T>
struct asynchronous_completion_token<boost::tp::pool<Channel>,T> {
    typedef boost::tp::task<T> type;
};

and overload the fork function (function templates cannot be partially specialized)

template< typename Channel, typename F >
typename result_of::fork<boost::tp::pool<Channel>, F>::type
fork( boost::tp::pool<Channel>& ae, F fn ) {
    return ae.submit(fn);
}

Note that the only fork function that needs to be overloaded is the one taking a nullary function as parameter.
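
To illustrate this adaptation pattern without depending on Boost.ThreadPool, here is a sketch with a hypothetical legacy_pool whose only entry point is submit. The class legacy_pool, the helpers add and sum_via, and the namespace sketch are assumptions made for the example, and the "pool" simply runs each task through std::async to keep the code short.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <utility>

namespace sketch {

// Hypothetical third-party executor: it offers submit(), not fork().
class legacy_pool {
public:
    template <typename F>
    auto submit(F f) -> std::future<decltype(f())> {
        return std::async(std::launch::async, std::move(f));
    }
};

// Default nullary fork: only viable for executors that have fork().
template <typename AE, typename F>
auto fork(AE& ae, F fn) -> decltype(ae.fork(fn)) {
    return ae.fork(fn);
}

// Adapter: an additional overload for legacy_pool forwards to submit().
template <typename F>
auto fork(legacy_pool& ae, F fn) -> decltype(ae.submit(fn)) {
    return ae.submit(fn);
}

// N-ary fork is written once and reaches the adapter through the
// nullary overloads above.
template <typename AE, typename F, typename A1, typename... An>
auto fork(AE& ae, F fn, A1 a1, An... an)
    -> decltype(sketch::fork(ae, std::bind(fn, a1, an...))) {
    return sketch::fork(ae, std::bind(fn, a1, an...));
}

inline int add(int a, int b) { return a + b; }

// Generic client code: it does not know which executor it is given.
template <typename AE>
int sum_via(AE& ae) {
    auto act = sketch::fork(ae, add, 2, 3);
    assert(act.valid());
    return act.get();
}

}  // namespace sketch
```

Only the two-argument overload mentions legacy_pool; the n-ary fork and the client function sum_via are unchanged when another executor is added.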

We can write the preceding main function in a more generic way:

template < typename AE>
void doit(AE& ae)
{
    // f returns double, so the handle type is the executor's ACT for double.
    typedef typename bith::asynchronous_completion_token<AE, double>::type handle_type;
    handle_type fm1 = bith::fork(ae, f, 1.0, 1000000 );
    handle_type fm2 = bith::fork(ae, f, 1.0, 5000000 );
    handle_type fm3 = bith::fork(ae, f, 2.2, 1000000 );
    handle_type fm4 = bith::fork(ae, f, 2.2, 5000000 );

    std::cout << fm2.get() - fm1.get() + fm3.get() - fm4.get() << std::endl;
}

int main()
{
    bith::launcher ae;
    doit(ae);
}

and we can switch from the launcher to the tp::pool just by changing one line:
int main()
{
    boost::tp::pool<> ae(boost::tp::poolsize(6));
    doit(ae);
}

The library also allows forking several functions at the same time:

template< typename AE, typename F1, ..., typename Fn>
typename result_of::fork_all<AE, fusion::tuple<F1,..., Fn> >::type
fork_all( AE& ae, F1 f1, ..., Fn fn ) {
    typedef typename result_of::fork_all<AE, fusion::tuple<F1,..., Fn> >::type type;
    return type(fork(ae,f1),..., fork(ae,fn));
}
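
With C++0x variadic templates the "..." in the pseudo-code above can be spelled out. The following is a sketch under assumptions: std::tuple replaces the Boost.Fusion tuple, async_executor is a hypothetical executor built on std::async, and the functions one, ten, half and demo exist only for the illustration.

```cpp
#include <cassert>
#include <future>
#include <tuple>
#include <utility>

namespace sketch {

// Hypothetical executor used only for this example.
struct async_executor {
    template <typename F>
    auto fork(F f) -> std::future<decltype(f())> {
        return std::async(std::launch::async, std::move(f));
    }
};

template <typename AE, typename F>
auto fork(AE& ae, F fn) -> decltype(ae.fork(fn)) {
    return ae.fork(fn);
}

// Fork every function and collect the handles in a tuple, in order.
template <typename AE, typename... Fs>
auto fork_all(AE& ae, Fs... fs)
    -> std::tuple<decltype(sketch::fork(ae, fs))...> {
    // Braced initialization evaluates its elements left to right, so the
    // functions are forked in the order they were passed.
    return std::tuple<decltype(sketch::fork(ae, fs))...>{sketch::fork(ae, fs)...};
}

inline int one() { return 1; }
inline int ten() { return 10; }
inline double half() { return 0.5; }

// Usage sketch: three functions with different result types.
inline double demo() {
    async_executor ae;
    auto handles = fork_all(ae, one, ten, half);
    static_assert(std::tuple_size<decltype(handles)>::value == 3,
                  "one handle per forked function");
    assert(std::get<0>(handles).valid());
    return std::get<1>(handles).get() - std::get<0>(handles).get()
         + std::get<2>(handles).get();
}

}  // namespace sketch
```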

And use it like:

bith::result_of::fork_all<AE, f, g, h>::type handles = bith::fork_all(ae, f, g, h);
std::cout << get<1>(handles).get() - get<0>(handles).get() + get<2>(handles).get() << std::endl;

The result of the fork_all operation is a Boost.Fusion tuple of asynchronous completion token handles. The user can apply any Boost.Fusion algorithm to this tuple, for example:

bool b = boost::fusion::none(handles, fct::interruption_requested());

To make common tasks with ACTs easier, the library provides some ACT-related functors in the namespace fct.
For the thread interface:
* fct::join
* fct::join_until
* fct::join_for
* fct::detach
* fct::interrupt
* fct::interrupt_requested

and for the future interface:
* fct::get
* fct::wait
* fct::wait_until
* fct::wait_for
* fct::is_ready
* fct::has_value
* fct::has_exception

Here is an example for get:
namespace fct {
    struct get {
        template<typename ACT>
        typename ACT::result_type operator()(ACT& t) const {
            return t.get();
        }
    };
}

In addition, the library provides some non-member functions that are the result of applying these functors to the tuple using a Fusion algorithm:
* join_all
* join_all_until
* join_all_for
* detach_all
* interrupt_all
* interrupt_requested_on_all
* get_all
* get_all_until
* get_all_for
* wait_all
* wait_all_until
* wait_all_for
* are_all_ready
* have_all_value
* have_all_exception

get_all is defined as follows:

template <typename Sequence>
typename result_of::get_all<Sequence>::type
get_all(Sequence& t) {
    return fusion::transform(t, fct::get());
}
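
The same idea can be sketched without Boost.Fusion, assuming C++14's std::index_sequence and a std::tuple of std::future handles. The helper detail::transform and the function demo are hypothetical names. Unlike fusion::transform, which returns a view, this version collects the values eagerly, and it does not handle std::future<void>.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <tuple>
#include <utility>

namespace sketch {

namespace fct {
// Functor form of ACT::get(), restricted here to std::future.
struct get {
    template <typename T>
    T operator()(std::future<T>& act) const { return act.get(); }
};
}  // namespace fct

namespace detail {
// Apply f to each tuple element and store the results in a new tuple.
template <typename F, typename... ACTs, std::size_t... I>
auto transform(std::tuple<ACTs...>& t, F f, std::index_sequence<I...>)
    -> std::tuple<decltype(f(std::get<I>(t)))...> {
    return std::tuple<decltype(f(std::get<I>(t)))...>{f(std::get<I>(t))...};
}
}  // namespace detail

// Wait for every handle and return a tuple of the results.
template <typename... ACTs>
auto get_all(std::tuple<ACTs...>& handles)
    -> decltype(detail::transform(handles, fct::get(),
                                  std::index_sequence_for<ACTs...>{})) {
    return detail::transform(handles, fct::get(),
                             std::index_sequence_for<ACTs...>{});
}

// Usage sketch: two handles with different result types.
inline double demo() {
    auto handles = std::make_tuple(
        std::async(std::launch::async, [] { return 2; }),
        std::async(std::launch::async, [] { return 0.25; }));
    std::tuple<int, double> values = get_all(handles);
    assert(!std::get<0>(handles).valid());  // get() released the shared state
    return std::get<0>(values) + std::get<1>(values);
}

}  // namespace sketch
```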

The library systematically defines the result type of a function as a metafunction with the same name as the function, in the namespace result_of, as the Boost.Fusion library does.

namespace result_of {
    template <typename Sequence>
    struct get_all {
        typedef typename fusion::result_of::transform<Sequence, fct::get>::type type;
    };
}

So the user can do the following:

bith::result_of::fork_all<AE, f, g, h>::type handles = bith::fork_all(ae, f, g, h);
bith::result_of::get_all<bith::result_of::fork_all<AE, f, g, h>::type>::type values = bith::get_all(handles);

or using a typedef

typedef bith::result_of::fork_all<AE, f, g, h>::type handles_type;
handles_type handles = bith::fork_all(ae, f, g, h);
bith::result_of::get_all<handles_type>::type values= bith::get_all(handles);

Note that the C++0x auto feature is very useful in these cases.

auto handles = bith::fork_all(ae, f, g, h);
auto values = bith::get_all(handles);

Last but not least, the library also provides some convenience functions such as wait_for_all, which forks the functions and waits for their results.

bith::result_of::wait_for_all<AE, f, g, h>::type res = bith::wait_for_all(ae, f, g, h);
std::cout << get<1>(res) - get<0>(res) + get<2>(res) << std::endl;

and wait_for_any, which works with functions that return the same type or types convertible to a common type.

bith::result_of::wait_for_any<AE, f, g, h>::type res = bith::wait_for_any(ae, f, g, h);
std::cout << "function " << res.first << " finished first with result=" << res.second << std::endl;
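
The waiting half of wait_for_any can be sketched by polling, assuming handles of type std::future<T> that are not deferred. This is not how the library implements it; the names first_ready and demo are hypothetical, and the forking step is left out. The demo uses promises instead of threads so that which handle becomes ready first is deterministic.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>
#include <utility>
#include <vector>

namespace sketch {

// Poll same-typed futures and return (index, value) for the first one
// observed ready. Preconditions: handles is non-empty and no future is
// deferred (a deferred future never reports ready from wait_for).
template <typename T>
std::pair<std::size_t, T> first_ready(std::vector<std::future<T> >& handles) {
    assert(!handles.empty());
    for (;;) {
        for (std::size_t i = 0; i < handles.size(); ++i) {
            if (handles[i].wait_for(std::chrono::seconds(0)) ==
                std::future_status::ready) {
                return std::make_pair(i, handles[i].get());
            }
        }
        // Nothing ready yet: back off briefly before polling again.
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
}

// Usage sketch: only the second handle has a value.
inline std::pair<std::size_t, int> demo() {
    std::promise<int> slow, fast;
    std::vector<std::future<int> > handles;
    handles.push_back(slow.get_future());
    handles.push_back(fast.get_future());
    fast.set_value(7);
    return first_ready(handles);
}

}  // namespace sketch
```

As in the snippet above, the first member of the returned pair is the position of the function that finished first and the second member is its result.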

Greetings,
Vicente Juan Botet Escribá

