
From: Yigong Liu (yigongliu_at_[hidden])
Date: 2008-05-16 03:53:50


Hello,

Here are some thoughts I had while studying N2561 and the future proposals.
Please correct me where I am wrong.

Before diving into "future", let's clarify a little what Join is.
Join's core primitives (async/synch/chord/actor) are for synchronization of
threads and orchestration of async events. The executor (thread pool) is only
one (the first) application built on top of Join's primitives. It helps in
developing samples, but is not an intrinsic part of Join (Cw doesn't use a
thread pool; we could also use Asio's main threads as the executor, as
described in Join's design doc).

Now let's see what a (plain-vanilla) "future" is:
   1> in Braddock's doc: a special variable with an undefined value at
creation, which can be set later by another thread; any attempt to get the
future's value will block if it is not set yet.
   2> in terms of N2561 proposal: "... a kind of return buffer that takes a
value (or an exception) in one (sub-)thread and provides the value in
another (controlling) thread. This buffer provides essentially two
interfaces:
    * an interface to assign a value as class promise and
    * an interface to wait for, query and retrieve the value (or exception)
from the buffer as classes unique_future and shared_future."

From both documents, a future has 2 interfaces with different behaviour:
   * the promise<T> interface is "asynchronous": the thread which sets the
future's value will not wait for the future's reader; it just sets the value
and goes
   * the future<T> interface is "synchronous": the thread which gets/reads
the future's value will block and wait if the future is not set yet
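
For readers without Join at hand, the same two behaviours can be seen with
std::promise and std::future, the classes later standardized in C++11. This
is only a sketch: it does not use the N2561 names (unique_future,
shared_future) or Join, and promise_future_demo is a hypothetical helper name.

```cpp
#include <cassert>
#include <future>
#include <thread>
#include <utility>

// The setter side returns immediately; the reader side blocks until set.
int promise_future_demo() {
    std::promise<int> prom;                    // asynchronous setter interface
    std::future<int> fut = prom.get_future();  // synchronous reader interface
    assert(fut.valid());

    // The worker sets the value and goes; it never waits for a reader.
    std::thread worker([](std::promise<int> p) { p.set_value(42); },
                       std::move(prom));

    int v = fut.get();  // blocks until the worker has set the value
    worker.join();
    return v;
}
```

Calling promise_future_demo() returns the value set by the worker thread.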

We can define these 2 interfaces with Join as follows:
    template<typename T>
    class promise_intf {
    public:
       async<void(T)> set;
    };

    template <typename T>
    class future_intf {
    public:
       synch<T()> get;
    };

A future class and its synchronization between promise_intf and
future_intf can be defined as follows:
    template <typename T>
    class future : public promise_intf<T>,
                   public future_intf<T>,
                   public actor
    {
    public:
       future() {
          chord(get, set, &future::get_result);
       }
    private:
       T get_result(synch_o<T()> g, async_o<void(T)> s) {
          return s.arg1;
       }
    };

A manually crafted future is normally implemented using one mutex and one
condition variable. The rule for counting the (mutex, cond) pairs used by a
Join class is simple: each actor holds one mutex, which is shared by all code
in async/synch/actor; each synch<> method holds one condition variable; and
async<> methods hold nothing. So the above future class uses exactly one
mutex and one condition variable, the same as a manually crafted one.
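
For comparison, here is a minimal sketch of what "manually crafted" could
look like with the C++11 standard library. The class name simple_future and
the helper simple_future_demo are hypothetical, and the sketch assumes T is
default-constructible and copyable.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <utility>

// Hand-written future: exactly one mutex and one condition variable.
template <typename T>
class simple_future {
public:
    // Asynchronous side: store the value, wake readers, return at once.
    void set(T v) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!ready_);  // this sketch allows the value to be set once
            value_ = std::move(v);
            ready_ = true;
        }
        cond_.notify_all();
    }
    // Synchronous side: block until a value is available, then copy it out.
    T get() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return ready_; });
        return value_;
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    T value_{};
    bool ready_ = false;
};

int simple_future_demo() {
    simple_future<int> f;
    std::thread worker([&f] { f.set(7); });
    int v = f.get();  // blocks until the worker calls set()
    worker.join();
    return v;
}
```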

To transfer both a result value and an exception, we need to use a result
holder and a different promise interface:
    template <typename T>
    class result {
    public:
       shared_ptr<T> value;
       exception_ptr except;
       ...
    };

    template <typename T>
    class promise_intf {
    protected:
       async<void(result<T> r)> set_result;
    public:
       void set(T t) { set_result(result<T>(t)); }
       void fail(exception_ptr e) { set_result(result<T>(e)); }
    };

The future class is then modified as follows:
    template <typename T>
    class future : public promise_intf<T>,
                 public future_intf<T>,
           public actor
    {
    public:
       future() {
          chord(get, set_result, &future::get_result);
       }
    private:
       T get_result(synch_o<T()> g, async_o<void(result<T>)> res) {
          if (res.arg1.except)
             rethrow_exception(res.arg1.except);
          return *res.arg1.value;   // value is a shared_ptr<T>; dereference
       }
    };

Again, this class uses one mutex and one condition variable.
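
The value-or-exception holder can be tried out on its own with
std::exception_ptr from C++11. The following is a sketch, not Join code: the
constructors of result, its get() member, and the helpers capture and
result_demo are assumptions added for illustration.

```cpp
#include <cassert>
#include <exception>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Holds either a value or a captured exception, never both.
template <typename T>
struct result {
    std::shared_ptr<T> value;
    std::exception_ptr except;

    explicit result(T v) : value(std::make_shared<T>(std::move(v))) {}
    explicit result(std::exception_ptr e) : except(e) {}

    // Rethrow a stored exception, otherwise hand back a copy of the value.
    T get() const {
        if (except) std::rethrow_exception(except);
        assert(value);
        return *value;
    }
};

// Run fn and capture either its return value or the exception it throws.
template <typename T, typename F>
result<T> capture(F fn) {
    try {
        return result<T>(fn());
    } catch (...) {
        return result<T>(std::current_exception());
    }
}

std::string result_demo() {
    result<int> ok = capture<int>([] { return 5; });
    result<int> bad =
        capture<int>([]() -> int { throw std::runtime_error("boom"); });
    std::string out = std::to_string(ok.get());
    try {
        bad.get();  // rethrows the stored runtime_error
    } catch (const std::runtime_error& e) {
        out += std::string(":") + e.what();
    }
    return out;
}
```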

If we step back for now from the mental image of a "future" as "a special
kind of variable", we can treat the promise (future-setter) interface as a
message sending interface, and the future_intf (future-reader) interface as
a message receiving interface. Then a "future" becomes a message passing
channel between the thread which produces and sets the future's value and
the thread which consumes it. Message sending and promise have the same
behaviour (async, send and go); message receiving and future_intf have the
same behaviour (blocking wait).

Also, the first future definition above is exactly the same as the simplest
message queue defined in Join:
    template <typename T>
    class queue {
    public:
       async<void(T)> send;
       synch<T()> recv;
       queue() {
          chord(recv, send, &queue::forward);
       }
    private:
       T forward(synch_o<T()> r, async_o<void(T)> s) {
          return s.arg1;
       }
    };
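
A hand-written equivalent of such a queue, again only as a sketch with
hypothetical names (simple_queue, queue_demo), shows the send-and-go /
blocking-recv pairing. Unlike a single-value future, it buffers any number
of pending messages.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>

template <typename T>
class simple_queue {
public:
    // Asynchronous: enqueue the message and return without waiting.
    void send(T v) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(std::move(v));
        }
        cond_.notify_one();
    }
    // Synchronous: block until a message is available, then take it.
    T recv() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !items_.empty(); });
        assert(!items_.empty());
        T v = std::move(items_.front());
        items_.pop_front();
        return v;
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<T> items_;
};

int queue_demo() {
    simple_queue<int> q;
    std::thread producer([&q] {
        for (int i = 1; i <= 3; ++i) q.send(i);
    });
    int sum = 0;
    for (int i = 0; i < 3; ++i) sum += q.recv();  // receives 1, 2, 3
    producer.join();
    return sum;
}
```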

The following code is from Braddock's JobQueue:
    template <class T>
    future<T> schedule(boost::function<T (void)> const& fn) {
       boost::mutex::scoped_lock lck(mutex_);
       promise<T> prom; // create promise
       q_.push_back(future_wrapper<T>(fn, prom)); // queue the job
       condition_.notify_all(); // wake worker thread(s)
       return future<T>(prom); // return a future created from the promise
    }
This code presents a very interesting idiom: we first create the future
(inside promise<T>'s constructor), which is the message channel. Then we
bundle the work item together with prom, the sending interface of the
channel, and pass this bundle to a worker thread. (Asynchronously, the worker
thread will do the work and write the result to the sending interface of the
channel.) Finally we return future_intf, the receiving interface of the
channel, to the receiving thread. Here we have loose coupling: the worker
thread doesn't care where the request comes from, and the receiver doesn't
care who sends the result.
Bundling a response channel with a request message is the normal way
function calls or RPC calls are implemented in message passing systems. It
also shows another intrinsic property of message passing: using channels to
define interfaces between modules and threads to achieve loose coupling.
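
The "request bundled with its response channel" idiom can be sketched with
std::promise and std::future. This is not Braddock's JobQueue: the names
job, run_job and schedule_demo are hypothetical, and for brevity the sketch
starts one thread per job instead of feeding a shared queue.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <thread>
#include <utility>

// A request bundled with the sending end of its response channel.
struct job {
    std::function<int()> work;
    std::promise<int> reply;
};

// Worker side: do the work, then write the result (or the exception) to
// the reply channel. The worker does not know who will read it.
void run_job(job j) {
    try {
        j.reply.set_value(j.work());
    } catch (...) {
        j.reply.set_exception(std::current_exception());
    }
}

// Caller side: create the channel, ship its sending end with the request,
// and keep only the receiving end.
std::future<int> schedule(std::function<int()> fn, std::thread& worker) {
    job j{std::move(fn), std::promise<int>()};
    std::future<int> fut = j.reply.get_future();
    assert(fut.valid());
    worker = std::thread(run_job, std::move(j));
    return fut;
}

int schedule_demo() {
    std::thread worker;
    std::future<int> fut = schedule([] { return 6 * 7; }, worker);
    int v = fut.get();  // blocks until the worker replies
    worker.join();
    return v;
}
```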

If we can wrap our minds around thinking of a "future" as a form of message
passing, we gain much freedom from the traditional concept of "future", for
example:
   * we can simply bundle a reference to an async method (channel) with the
request; the worker thread will return the result by invoking the async
method when the work item is done.
   * the receiving thread can wait for/handle the result in different ways:
      . we can block the thread at the "getting" point, the same as the
current "future" implementation;
      . or, if the system is already message passing based (we have a
main loop to receive and process messages), and the work item request is
probably sent to the worker threads from a message handling function, then
we can simply create a new chord which adds the response channel (async
method) to the set of channels handled by the main loop, and go back to
blocking in and serving the main loop;
      . or, if the application is driven by a thread pool, we can create a
chord to dispatch the result from the response channel to the thread pool to
handle it asynchronously.
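
The main-loop variant can be illustrated without Join. In this sketch the
response channel is a plain std::function that the worker invokes; it posts
a handler to a hypothetical main_loop class, so the result is processed on
the loop's own thread instead of at a blocking "get" point. All names here
are assumptions, and serve() handles a fixed number of messages only to keep
the example finite.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// A queue of handlers served by a single thread.
class main_loop {
public:
    void post(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push_back(std::move(handler));
        }
        cond_.notify_one();
    }
    // Block for and run exactly `count` handlers, then return.
    void serve(int count) {
        for (int i = 0; i < count; ++i) {
            std::function<void()> handler;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return !pending_.empty(); });
                handler = std::move(pending_.front());
                pending_.pop_front();
            }
            assert(handler);
            handler();  // runs outside the lock, on the serving thread
        }
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<std::function<void()>> pending_;
};

int main_loop_demo() {
    main_loop loop;
    int total = 0;  // touched only by the thread that calls serve()

    // Response channel handed to the workers along with their requests.
    std::function<void(int)> reply = [&loop, &total](int r) {
        loop.post([&total, r] { total += r; });
    };

    std::thread w1([reply] { reply(10); });
    std::thread w2([reply] { reply(32); });
    loop.serve(2);  // handle both results on this thread
    w1.join();
    w2.join();
    return total;
}
```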

Treating a future as message passing also makes it easier to implement
wait_any_future and wait_all_future.
    template <typename T1, typename T2, typename T3, ...>
    class wait_all_future : public actor {
    public:
       async<void(T1)> promise1;
       async<void(T2)> promise2;
       async<void(T3)> promise3;
       ...
       synch<tuple<T1,T2,T3,...>()> wait;

       wait_all_future() {
          chord(wait, promise1, promise2, ..., &wait_all_future::proc);
       }
    private:
       tuple<T1,T2,T3,...> proc(synch_o<tuple<T1,T2,T3,...>()> w,
                                async_o<void(T1)> p1,
                                async_o<void(T2)> p2, ...) {
          return tuple<T1,T2,T3,...>(p1.arg1, p2.arg1, ...);
       }
    };
Applications can instantiate one wait_all_future instance, bundle its async
response channels (promise1, promise2, ...) with different requests to
different worker threads, and then call wait(), which blocks until all
results have come back.
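
A two-channel version of this wait-all pattern can be written by hand as a
sketch. The names wait_all2 and wait_all_demo are hypothetical; the class
assumes T1 and T2 are default-constructible and copyable, and that each
promise is set once.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <tuple>
#include <utility>

template <typename T1, typename T2>
class wait_all2 {
public:
    // Response channels: store the value and return immediately.
    void promise1(T1 v) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            v1_ = std::move(v);
            have1_ = true;
        }
        cond_.notify_all();
    }
    void promise2(T2 v) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            v2_ = std::move(v);
            have2_ = true;
        }
        cond_.notify_all();
    }
    // Block until both results have arrived, then return them together.
    std::tuple<T1, T2> wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return have1_ && have2_; });
        assert(have1_ && have2_);
        return std::make_tuple(v1_, v2_);
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    T1 v1_{};
    T2 v2_{};
    bool have1_ = false;
    bool have2_ = false;
};

std::string wait_all_demo() {
    wait_all2<int, std::string> all;
    std::thread w1([&all] { all.promise1(3); });
    std::thread w2([&all] { all.promise2("done"); });
    std::tuple<int, std::string> r = all.wait();
    w1.join();
    w2.join();
    return std::to_string(std::get<0>(r)) + ":" + std::get<1>(r);
}
```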

    template <typename T1, typename T2, typename T3, ...>
    class wait_any_future : public actor {
    public:
       async<void(T1)> promise1;
       async<void(T2)> promise2;
       async<void(T3)> promise3;
       ...
       synch<void()> wait;

       wait_any_future() {
          chord(wait, promise1, &wait_any_future::proc_future1);
          chord(wait, promise2, &wait_any_future::proc_future2);
          ...
       }
    private:
       void proc_future1(synch_o<void()> w, async_o<void(T1)> p1) {
          ... process p1.arg1 ...
       }
       void proc_future2(synch_o<void()> w, async_o<void(T2)> p2) {
          ... process p2.arg1 ...
       }
       ...
    };
Applications can instantiate one wait_any_future instance, bundle its async
response channels (promise1, promise2, ...) with different requests to
different worker threads, and then call wait() in a loop; results are
processed in the order in which they arrive.
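
A hand-written sketch of the wait-any pattern for two fixed result types
follows. The names wait_any2 and wait_any_demo are hypothetical. Each call
to wait() processes exactly one pending result; "processing" here just
appends to a log string. One simplification to note: if both channels have
pending results at the same moment, this sketch takes promise1 first, which
is a choice of the sketch and not a statement about Join's scheduling.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

class wait_any2 {
public:
    void promise1(int v) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ints_.push_back(v);
        }
        cond_.notify_one();
    }
    void promise2(std::string s) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            strs_.push_back(std::move(s));
        }
        cond_.notify_one();
    }
    // Block until any result is pending, then process exactly one.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock,
                   [this] { return !ints_.empty() || !strs_.empty(); });
        if (!ints_.empty()) {
            log_ += "int:" + std::to_string(ints_.front()) + ";";
            ints_.pop_front();
        } else {
            assert(!strs_.empty());
            log_ += "str:" + strs_.front() + ";";
            strs_.pop_front();
        }
    }
    std::string log() {
        std::lock_guard<std::mutex> lock(mutex_);
        return log_;
    }
private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<int> ints_;
    std::deque<std::string> strs_;
    std::string log_;
};

std::string wait_any_demo() {
    wait_any2 any;
    std::thread w2([&any] { any.promise2("x"); });
    w2.join();   // only the promise2 result is pending now
    any.wait();  // processes "x"
    std::thread w1([&any] { any.promise1(3); });
    any.wait();  // blocks until w1 delivers, then processes 3
    w1.join();
    return any.log();
}
```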

Sorry for the long email. Any comments or corrections?
Thanks
Yigong

