Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r52496 - in sandbox/join: boost boost/join boost/join/base boost/join/idioms libs libs/join libs/join/doc libs/join/examples
From: yigongliu_at_[hidden]
Date: 2009-04-19 20:00:41


Author: yigongliu
Date: 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
New Revision: 52496
URL: http://svn.boost.org/trac/boost/changeset/52496

Log:
update files to Join r6
Added:
   sandbox/join/boost/
   sandbox/join/boost/join/
   sandbox/join/boost/join/base/
   sandbox/join/boost/join/base/exceptions.hpp (contents, props changed)
   sandbox/join/boost/join/base/join_base.hpp (contents, props changed)
   sandbox/join/boost/join/base/joint.hpp (contents, props changed)
   sandbox/join/boost/join/base/port.hpp (contents, props changed)
   sandbox/join/boost/join/base/utils.hpp (contents, props changed)
   sandbox/join/boost/join/idioms/
   sandbox/join/boost/join/idioms/asio_executor.hpp (contents, props changed)
   sandbox/join/boost/join/idioms/executor.hpp (contents, props changed)
   sandbox/join/boost/join/idioms/rr_executor.hpp (contents, props changed)
   sandbox/join/boost/join/join.hpp (contents, props changed)
   sandbox/join/libs/
   sandbox/join/libs/join/
   sandbox/join/libs/join/doc/
   sandbox/join/libs/join/doc/boost_join_design.html (contents, props changed)
   sandbox/join/libs/join/doc/chords_joints.html (contents, props changed)
   sandbox/join/libs/join/doc/internals.html (contents, props changed)
   sandbox/join/libs/join/doc/references.html (contents, props changed)
   sandbox/join/libs/join/doc/samples.html (contents, props changed)
   sandbox/join/libs/join/doc/synopsis_port.html (contents, props changed)
   sandbox/join/libs/join/doc/tutorials.html (contents, props changed)
   sandbox/join/libs/join/examples/
   sandbox/join/libs/join/examples/Jamfile.v2 (contents, props changed)
   sandbox/join/libs/join/examples/async_call_ret.cpp (contents, props changed)
   sandbox/join/libs/join/examples/bounded_buffer.cpp (contents, props changed)
   sandbox/join/libs/join/examples/buffer.cpp (contents, props changed)
   sandbox/join/libs/join/examples/buffer_lambda.cpp (contents, props changed)
   sandbox/join/libs/join/examples/buffer_phoenix.cpp (contents, props changed)
   sandbox/join/libs/join/examples/chain.cpp (contents, props changed)
   sandbox/join/libs/join/examples/flows.cpp (contents, props changed)
   sandbox/join/libs/join/examples/join_many.cpp (contents, props changed)
   sandbox/join/libs/join/examples/joint_lifetime.cpp (contents, props changed)
   sandbox/join/libs/join/examples/prime_sieve.cpp (contents, props changed)
   sandbox/join/libs/join/examples/producer_consumer.cpp (contents, props changed)

Added: sandbox/join/boost/join/base/exceptions.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/exceptions.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,110 @@
+//
+// boost/join/exceptions.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_EXCEPTIONS_HPP
+#define BOOST_JOIN_EXCEPTIONS_HPP
+
+namespace boost {
+ namespace join {
+
+ class join_exception : public std::exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join-related exception";
+ }
+ };
+ class not_in_chord_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: async<>/synch<> channels not in chord exception";
+ }
+ };
+ class double_association_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: async<>/synch<> channels associated with more than one joint exception";
+ }
+ };
+ class queue_overflow_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: queue overflow exception";
+ }
+ };
+ class missing_result_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: synch<> channel missing result exception";
+ }
+ };
+ class no_executor_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: no executor exception";
+ }
+ };
+ class hidden_chord_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: hidden chord exception";
+ }
+ };
+ class too_many_ports_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: Too many channels defined in a joint";
+ }
+ };
+ class chord_override_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: Chord override failure, chord not found";
+ }
+ };
+ class chord_remove_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: Chord remove failure, chord not found";
+ }
+ };
+ class executor_missing_rr_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: executor's dynamic task queues are used, however non round robin scheduling is defined";
+ }
+ };
+ class synch_not_1st_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: synchronous channel is used not as the first method of chord";
+ }
+ };
+ class single_synch_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: more than one synchronous channels are used";
+ }
+ };
+ class port_chord_reset_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: channels or joint are reset unexpectedly";
+ }
+ };
+ class synch_time_out_exception : public join_exception {
+ public:
+ virtual const char *what() const throw () {
+ return "Join: synchronous ports time out unexpectedly";
+ }
+ };
+
+ }
+}
+
+#endif

Added: sandbox/join/boost/join/base/join_base.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/join_base.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,118 @@
+//
+// boost/join/join_base.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_BASE_HPP
+#define BOOST_JOIN_BASE_HPP
+
+#include <string>
+#include <vector>
+#include <boost/thread.hpp>
+#include <boost/function.hpp>
+#include <boost/utility.hpp>
+#include <boost/join/base/utils.hpp>
+#include <boost/join/base/exceptions.hpp>
+
+namespace boost {
+ namespace join {
+ namespace detail {
+
+ class port;
+
+ class joint_base {
+ public:
+ typedef boost::function0<void> callable;
+ const char * name_;
+ logger log;
+ boost::mutex mutex_; // protects joint global status including ports and chords
+ joint_base(const char *n=0) : name_(n), log(n) {}
+ virtual ~joint_base() {}
+ virtual void reset() = 0;
+ virtual bool check_heartbeat() = 0;
+ virtual bool has_spawn(void) = 0;
+ virtual void spawn(callable) = 0;
+ virtual bool port_invoked(int ind) = 0;
+ virtual void port_revoked(int ind) = 0;
+ virtual void port_remove(int ind) = 0;
+ };
+
+ template <typename R>
+ class chord_base {
+ public:
+ virtual void capture_arguments(boost::function0<R> &cb) = 0;
+ virtual ~chord_base() {}
+ };
+
+ class port {
+ public:
+ enum port_type {
+ async,
+ synch
+ } ;
+ port_type type_;
+ boost::shared_ptr<joint_base> joint_; //pointer to owning joint
+ int index_; //my index in joint
+ unsigned int num_pending_; //pending calls/msgs at this port
+
+ port(port_type t, boost::shared_ptr<joint_base> j) :
+ type_(t), joint_(j), index_(-1), num_pending_(0) {
+ }
+
+ virtual ~port() {
+ detach();
+ }
+
+ //called from client, detach port from joint
+ void detach() {
+ if (joint_ != 0) {
+ joint_->log.stream() << "a port is being destructed..." << joint_.use_count() << logger::endl;
+ joint_->port_remove(index_);
+ joint_.reset();
+ }
+ }
+
+ //when detached from joint, need reset
+ //caled from joint destructor
+ virtual void reset(void) {
+ if (joint_ != 0)
+ joint_->log.msg("a port is reset ...");
+ index_ = -1;
+ num_pending_ = 0;
+ }
+
+ //called from async/synch interface, need hold a shared_ptr to joint so its destructor will not be called right away
+ void reset_joint() {
+ boost::shared_ptr<joint_base> my_joint(joint_);
+ if (my_joint != 0)
+ my_joint->reset();
+ }
+
+ //detach port from joint, only can be done by chord_remove
+ //when all its chords are removed, a port is reset & detached
+ //called from joint
+ void detach_joint(void) {
+ if (joint_ !=0) {
+ joint_->log.stream() << "a port is detached ... joint remaining count: " << joint_.use_count()-1 << logger::endl;
+ joint_.reset();
+ }
+ }
+
+ bool test_chord_fire(void) {
+ return joint_->port_invoked(index_);
+ }
+
+ void port_revoked(void) {
+ joint_->port_revoked(index_);
+ }
+ };
+
+ }
+ }
+}
+
+#endif

Added: sandbox/join/boost/join/base/joint.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/joint.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,1143 @@
+//
+// boost/join/joint.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_JOINT_HPP
+#define BOOST_JOIN_JOINT_HPP
+
+#include <vector>
+#include <bitset>
+#include <algorithm>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/ref.hpp>
+#include <boost/mpl/if.hpp>
+#include <boost/join/base/join_base.hpp>
+#include <boost/join/base/exceptions.hpp>
+
+namespace boost {
+ namespace join {
+
+ namespace detail {
+
+ template <size_t size>
+ struct bitmap {
+ enum {
+ max_size = size
+ };
+ typedef typename boost::mpl::if_c<(size<=32), unsigned int, std::bitset<size> >::type bitset_type;
+ bitset_type bitmap_;
+ bitmap(unsigned int val=0) : bitmap_(val) {}
+ void set(bitmap &t) { bitmap_ |= t.bitmap_; }
+ void clear(bitmap &t) { bitmap_ &= ~t.bitmap_; }
+ bool test(bitmap &t) { return (bitmap_ & t.bitmap_) != 0; }
+ bool match(bitmap &b) { return (~bitmap_ & b.bitmap_) == 0; }
+ bool operator==(bitmap &t) { return bitmap_ == t.bitmap_; }
+ size_t num_of_ones(void) {
+ size_t num = 0;
+ for(int i = 0; i<max_size; i++)
+ if (bitmap_ & (1<<i)) num++;
+ return num;
+ }
+ };
+
+ template <typename PortT>
+ typename PortT::argument_type
+ top(PortT *p) {
+ return p->top();
+ }
+
+ template <typename PortT>
+ std::vector<typename PortT::argument_type>
+ top(std::vector<PortT*> &vp) {
+ std::vector<typename PortT::argument_type> vv;
+ for(size_t i=0; i<vp.size(); i++)
+ vv.push_back(vp[i]->top());
+ return vv;
+ }
+
+ template <typename PortT>
+ void pop_top(PortT *p) {
+ p->pop_top();
+ }
+
+ template <typename PortT>
+ void pop_top(std::vector<PortT*> &vp) {
+ for(size_t i=0; i<vp.size(); i++)
+ vp[i]->pop_top();
+ }
+
+ template <typename P> struct arg_type {};
+ template <typename PortT>
+ struct arg_type<PortT*> {
+ typedef typename PortT::argument_type result;
+ };
+
+ template <typename PortT>
+ struct arg_type<std::vector<PortT*> > {
+ typedef std::vector<typename PortT::argument_type> result;
+ };
+
+ template <typename P> struct res_type {};
+ template <typename PortT>
+ struct res_type<PortT*> {
+ typedef typename PortT::result_type result;
+ };
+
+ template <typename PortT>
+ struct res_type<std::vector<PortT*> > {
+ typedef void result;
+ };
+
+ template <typename PortT>
+ struct impl_ptr {
+ typedef typename PortT::ImplT* result;
+ };
+
+ template <typename PortT>
+ struct impl_ptr<std::vector<PortT> > {
+ typedef std::vector<typename PortT::ImplT*> result;
+ };
+
+ template <typename PortT>
+ bool is_synch_port(PortT *p) {
+ return p->type_ == port::synch;
+ }
+ template <typename PortT>
+ bool is_synch_port(std::vector<PortT*> &vp) {
+ if (vp[0]->type_ == port::synch)
+ throw single_synch_exception();
+ return false;
+ }
+
+
+ //------- chord definitions ------
+
+ template <size_t max_size>
+ class chord_common {
+ public:
+ bitmap<max_size> mask_;// ports of this chord
+ size_t num_ports_; //for scheduling policy fire_as_much_as_possible
+ int priority_; //sched priority of chord: 0 - max, the bigger the lower
+
+ chord_common(bitmap<max_size> &m, int pri) :
+ mask_(m), priority_(pri) {
+ num_ports_ = m.num_of_ones();
+ }
+ virtual ~chord_common() {}
+ virtual bool fire(port *caller_port) = 0;
+ };
+
+ //for chords which have a return type (so it must have a synch port)
+ template <typename ResT>
+ class chord_fire : public chord_base<ResT> {
+ public:
+ joint_base *joint_;
+ synch_port<ResT> *synch_p_;
+ chord_fire(joint_base *a, port *p) : joint_(a), synch_p_(0) {
+ synch_p_ = static_cast<synch_port<ResT>*>(p);
+ }
+ bool fire_chord(port *caller_port) {
+ joint_->log.msg("a chord fired");
+ if(caller_port->type_ == port::synch) {
+ //caller is sync, caller_port and synch_p_ should be the same
+ //invoke callback later in the same caller thread
+ synch_p_->add_chord_fired(this);
+ return true;
+ }
+ else {//caller is async_p
+ synch_p_->transfer(this);
+ return false;
+ }
+ }
+ };
+
+ template <>
+ class chord_fire<void> : public chord_base<void> {
+ public:
+ joint_base *joint_;
+ synch_port<void> *synch_p_;
+ chord_fire(joint_base *a, port *p) : joint_(a), synch_p_(0) {
+ if (is_synch_port(p)) synch_p_ = static_cast<synch_port<void>*>(p);
+ }
+ bool fire_chord(port *caller_port) {
+ joint_->log.msg("a chord fired");
+ if(caller_port->type_ == port::synch) {
+ //caller is sync, caller_port and synch_p_ should be the same
+ //invoke callback later in the same caller thread
+ synch_p_->add_chord_fired(this);
+ return true;
+ }
+ else {//caller is async_p
+ if (synch_p_) {
+ synch_p_->transfer(this);
+ return false;
+ }
+ else { //no sync ports in chord
+ if(joint_->has_spawn()) {
+ //spawn the callback later
+ static_cast<async_port*>(caller_port)->add_chord_fired(this);
+ return true;
+ } else
+ throw no_executor_exception();
+ }
+ }
+ return false;
+ }
+ };
+
+ template <size_t max_size, typename PortPtrT, typename CallT>
+ class chord1 : public chord_common<max_size>, public chord_fire<typename res_type<PortPtrT>::result> {
+ typedef typename res_type<PortPtrT>::result result_type;
+ typedef typename arg_type<PortPtrT>::result argument_type;
+ public:
+ PortPtrT port_;
+ boost::function1<result_type, argument_type> call_;
+ chord1(bitmap<max_size> &m, joint_base* a, PortPtrT p, CallT c, int pri) :
+ chord_common<max_size>(m,pri),
+ chord_fire<result_type>(a,p),
+ port_(p), call_(c) {}
+ void capture_arguments(boost::function0<result_type> &cb) {
+ cb = boost::bind(call_, top(port_));
+ pop_top(port_);
+ }
+ bool fire(port *caller_port) {
+ return chord_fire<result_type>::fire_chord(caller_port);
+ }
+ };
+
+ template <size_t max_size, typename PortPtrT1, typename PortPtrT2, typename CallT>
+ class chord2 : public chord_common<max_size>, public chord_fire<typename res_type<PortPtrT1>::result> {
+ typedef typename res_type<PortPtrT1>::result result_type;
+ typedef typename arg_type<PortPtrT1>::result argument1_type;
+ typedef typename arg_type<PortPtrT2>::result argument2_type;
+ public:
+ PortPtrT1 port1_;
+ PortPtrT2 port2_;
+ boost::function2<result_type, argument1_type, argument2_type> call_;
+ chord2(bitmap<max_size> &m, joint_base* a, PortPtrT1 p1, PortPtrT2 p2, CallT c, int pri) :
+ chord_common<max_size>(m,pri),
+ chord_fire<result_type>(a,p1),
+ port1_(p1), port2_(p2), call_(c) {}
+ void capture_arguments(boost::function0<result_type> &cb) {
+ cb = boost::bind(call_, top(port1_), top(port2_));
+ pop_top(port1_);
+ pop_top(port2_);
+ }
+ bool fire(port *caller_port) {
+ return chord_fire<result_type>::fire_chord(caller_port);
+ }
+ };
+
+ template <size_t max_size, typename PortPtrT1, typename PortPtrT2, typename PortPtrT3, typename CallT>
+ class chord3 : public chord_common<max_size>, public chord_fire<typename res_type<PortPtrT1>::result> {
+ typedef typename res_type<PortPtrT1>::result result_type;
+ typedef typename arg_type<PortPtrT1>::result argument1_type;
+ typedef typename arg_type<PortPtrT2>::result argument2_type;
+ typedef typename arg_type<PortPtrT3>::result argument3_type;
+ public:
+ PortPtrT1 port1_;
+ PortPtrT2 port2_;
+ PortPtrT3 port3_;
+ boost::function3<result_type, argument1_type, argument2_type, argument3_type> call_;
+ chord3(bitmap<max_size> &m, joint_base* a, PortPtrT1 p1, PortPtrT2 p2, PortPtrT3 p3, CallT c, int pri) :
+ chord_common<max_size>(m,pri),
+ chord_fire<result_type>(a,p1),
+ port1_(p1), port2_(p2), port3_(p3), call_(c) {}
+ void capture_arguments(boost::function0<result_type> &cb) {
+ cb = boost::bind(call_, top(port1_), top(port2_), top(port3_));
+ pop_top(port1_);
+ pop_top(port2_);
+ pop_top(port3_);
+ }
+ bool fire(port *caller_port) {
+ return chord_fire<result_type>::fire_chord(caller_port);
+ }
+ };
+
+ }
+
+ //------ schedulers ------
+
+ enum schedule_policy {
+ schedule_first_match, //(first match will fire)
+ schedule_longest_match, //(longest match will fire)
+ schedule_round_robin //(round robin)
+ };
+
+ namespace detail {
+
+ template <size_t max_size>
+ struct sched_data_base {
+ port *port_;
+ bitmap<max_size> mask_;
+ sched_data_base() :
+ port_(0), mask_(0) {
+ }
+ };
+
+ //basic schedulers
+ template <size_t max_size>
+ struct sched_data : public sched_data_base<max_size> {
+ typedef chord_common<max_size> chord_type;
+ std::vector<chord_type*> chords_; //chords this port participates in
+ sched_data() : sched_data_base<max_size>() {}
+ std::vector<chord_type*> get_chords() { return chords_; }
+ void port_add_chord(chord_type *c, int) {
+ chords_.push_back(c);
+ }
+ void port_del_chord(chord_type *c, int p, bool can_detach) {
+ typename std::vector<chord_type*>::iterator iter;
+ if ((iter = std::find(chords_.begin(), chords_.end(), c)) != chords_.end())
+ chords_.erase(iter);
+ if(chords_.size() == 0) {
+ this->port_->reset();
+ if (can_detach) this->port_->detach_joint();
+ this->port_ = 0;
+ }
+ }
+ };
+
+
+ //simple priority based schedulers
+ template <size_t max_size>
+ struct sched_pri_data : public sched_data_base<max_size> {
+ typedef chord_common<max_size> chord_type;
+ std::vector<std::vector<chord_type*> > chords_; //chords this port participates in
+ sched_pri_data() : sched_data_base<max_size>() {
+ }
+ std::vector<chord_type*> get_chords() {
+ std::vector<chord_type*> chds;
+ for(size_t i=0; i<chords_.size(); i++)
+ for(size_t j=0; j<chords_[i].size(); j++)
+ chds.push_back(chords_[i][j]);
+ return chds;
+ }
+ void port_add_chord(chord_type *c, int priority) {
+ if(chords_.size() < ((size_t)priority+1))
+ for(size_t i=chords_.size(); i<((size_t)priority+1); i++) {
+ chords_.push_back(std::vector<chord_type*>());
+ }
+ chords_[priority].push_back(c);
+ }
+ void port_del_chord(chord_type *c, int priority, bool can_detach) {
+ typename std::vector<chord_type*>::iterator iter;
+ if ((iter = std::find(chords_[priority].begin(), chords_[priority].end(), c)) != chords_[priority].end())
+ chords_[priority].erase(iter);
+ bool bound = false;
+ for(size_t i=0; i<chords_.size() && !bound; i++)
+ if(chords_[i].size() > 0) bound = true;
+ if (!bound) {
+ this->port_->reset();
+ if (can_detach) this->port_->detach_joint();
+ this->port_ = 0;
+ }
+ }
+ };
+
+ }
+
+ template <size_t max_size>
+ struct sched_first_match : public detail::sched_data<max_size> {
+ typedef detail::bitmap<max_size> bitmap_t;
+ typedef detail::chord_common<max_size> chord_type;
+ enum { policy = schedule_first_match };
+ chord_type * scan_chords(bitmap_t status_) {
+ chord_type *chord_ready= 0;
+ for (size_t i=0; i<this->chords_.size() && chord_ready == 0; i++) {
+ if (status_.match(this->chords_[i]->mask_))
+ chord_ready = this->chords_[i];
+ }
+ return chord_ready;
+ }
+ };
+
+ template <size_t max_size>
+ struct sched_longest_match : public detail::sched_data<max_size> {
+ enum { policy = schedule_longest_match };
+ typedef detail::bitmap<max_size> bitmap_t;
+ typedef detail::chord_common<max_size> chord_type;
+ chord_type * scan_chords(bitmap_t status_) {
+ chord_type *chord_ready= 0;
+ size_t chord_size = 0;
+ for(size_t i=0; i<this->chords_.size(); i++) {
+ if(this->chords_[i]->num_ports_ > chord_size && status_.match(this->chords_[i]->mask_)) {
+ chord_ready = this->chords_[i];
+ chord_size = chord_ready->num_ports_;
+ }
+ }
+ return chord_ready;
+ }
+ };
+
+ template <size_t max_size>
+ struct sched_round_robin : public detail::sched_data<max_size> {
+ enum { policy = schedule_round_robin };
+ typedef detail::bitmap<max_size> bitmap_t;
+ typedef detail::chord_common<max_size> chord_type;
+ int last_chord_fired_;
+ sched_round_robin() : detail::sched_data<max_size>(), last_chord_fired_(-1) {}
+ chord_type * scan_chords(bitmap_t status_) {
+ chord_type *chord_ready= 0;
+ size_t min = last_chord_fired_+1;
+ for(size_t j=min; j<this->chords_.size() && chord_ready == 0; j++) {
+ if(status_.match(this->chords_[j]->mask_)) {
+ chord_ready = this->chords_[j];
+ last_chord_fired_ = (int)j;
+ }
+ }
+ if (chord_ready == 0 && min > 0)
+ for(size_t j=0; j<min && chord_ready == 0; j++) {
+ if(status_.match(this->chords_[j]->mask_)) {
+ chord_ready = this->chords_[j];
+ last_chord_fired_ = (int)j;
+ }
+ }
+ return chord_ready;
+ }
+ };
+
+ template <size_t max_size>
+ struct sched_pri_first_match : public detail::sched_pri_data<max_size> {
+ enum { policy = schedule_first_match };
+ typedef detail::bitmap<max_size> bitmap_t;
+ typedef detail::chord_common<max_size> chord_type;
+ chord_type * scan_chords(bitmap_t status_) {
+ chord_type *chord_ready= 0;
+ for(size_t i=0; i<this->chords_.size() && chord_ready == 0; i++)
+ for(size_t j=0; j<this->chords_[i].size() && chord_ready == 0; j++) {
+ if(status_.match(this->chords_[i][j]->mask_))
+ chord_ready = this->chords_[i][j];
+ }
+ return chord_ready;
+ }
+ };
+
+ template <size_t max_size>
+ struct sched_pri_longest_match : public detail::sched_pri_data<max_size> {
+ enum { policy = schedule_longest_match };
+ typedef detail::bitmap<max_size> bitmap_t;
+ typedef detail::chord_common<max_size> chord_type;
+ chord_type * scan_chords(bitmap_t status_) {
+ chord_type *chord_ready= 0;
+ for(size_t i=0; i<this->chords_.size() && chord_ready == 0; i++) {
+ size_t chord_size = 0;
+ for(size_t j=0; j<this->chords_[i].size(); j++) {
+ if(this->chords_[i][j]->num_ports_ > chord_size) {
+ if(status_.match(this->chords_[i][j]->mask_)) {
+ chord_ready = this->chords_[i][j];
+ chord_size = chord_ready->num_ports_;
+ }
+ }
+ }
+ }
+ return chord_ready;
+ }
+ };
+
+ template <size_t max_size>
+ struct sched_pri_round_robin : public detail::sched_pri_data<max_size> {
+ enum { policy = schedule_round_robin };
+ typedef detail::bitmap<max_size> bitmap_t;
+ typedef detail::chord_common<max_size> chord_type;
+ std::vector<int> last_chord_fired_; //for roundrobin dispatching
+ sched_pri_round_robin() : detail::sched_pri_data<max_size>() {
+ last_chord_fired_.push_back(-1);
+ }
+ void port_add_chord(chord_type *c, int priority) {
+ if(this->chords_.size() < ((size_t)priority+1))
+ for(size_t i=this->chords_.size(); i<((size_t)priority+1); i++) {
+ this->chords_.push_back(std::vector<chord_type*>());
+ last_chord_fired_.push_back(-1);
+ }
+ this->chords_[priority].push_back(c);
+ }
+ void port_del_chord(chord_type *c, int priority, bool can_detach) {
+ typename std::vector<chord_type*>::iterator iter;
+ if ((iter = std::find(this->chords_[priority].begin(), this->chords_[priority].end(), c)) != this->chords_[priority].end()) {
+ this->chords_[priority].erase(iter);
+ last_chord_fired_[priority] = -1;
+ }
+ bool bound = false;
+ for(size_t i=0; i<this->chords_.size() && !bound; i++)
+ if(this->chords_[i].size() > 0) bound = true;
+ if (!bound) {
+ this->port_->reset();
+ if(can_detach) this->port_->detach_joint();
+ this->port_ = 0;
+ }
+ }
+ chord_type * scan_chords(bitmap_t status_) {
+ chord_type *chord_ready= 0;
+ for(size_t i=0; i<this->chords_.size() && chord_ready == 0; i++)
+ if(this->chords_[i].size()>0) {
+ size_t min = last_chord_fired_[i]+1;
+ for(size_t j=min; j<this->chords_[i].size() && chord_ready == 0; j++) {
+ if(status_.match(this->chords_[i][j]->mask_)) {
+ chord_ready = this->chords_[i][j];
+ last_chord_fired_[i] = (int)j;
+ }
+ }
+ if (chord_ready == 0 && min > 0)
+ for(size_t j=0; j<min && chord_ready == 0; j++) {
+ if(status_.match(this->chords_[i][j]->mask_)) {
+ chord_ready = this->chords_[i][j];
+ last_chord_fired_[i] = (int)j;
+ }
+ }
+ }
+ return chord_ready;
+ }
+ };
+
+ //------ joint_impl definition ------
+
+ namespace detail {
+
+ template <
+ template <size_t> class scheduler=sched_first_match,
+ size_t max_size=32
+ >
+ class joint_impl : public joint_base, private boost::noncopyable {
+ template <size_t> friend class chord_common;
+ friend class port;
+
+ public:
+ typedef bitmap<max_size> bitmap_t;
+ typedef scheduler<max_size> sched_type;
+ typedef typename joint_base::callable callable;
+ typedef function1<void, typename joint_base::callable> spawn_type;
+ typedef chord_common<max_size> chord_type;
+ typedef sched_type port_data;
+
+ private:
+
+ spawn_type spawn_;
+ bitmap_t status_; //bitmap marking which port is active
+ int heartbeat_;
+ std::vector<port_data> ports_; //all unique ports in this joint, joint not owning
+ //them and will not destroy them, joint will destroy port_data
+ std::vector<boost::shared_ptr<chord_type> > chords_; //joint owns chords_ and will destroy them
+
+ public:
+ joint_impl(spawn_type s = 0,
+ int hb = 0, //forever
+ const char *name = 0) :
+ joint_base(name), spawn_(s), status_(0), heartbeat_(hb) {}
+
+ ~joint_impl() {
+ log.msg("joint is being destructed...");
+ reset();
+ log.msg("joint is dead now...");
+ }
+
+ void reset() {
+ log.msg("joint reset...");
+ boost::mutex::scoped_lock lock(mutex_);
+ internal_reset();
+ }
+
+ bool has_spawn(void) { return spawn_ != 0; }
+ void spawn(callable c) {
+ spawn_(c);
+ }
+
+ schedule_policy my_scheduler(void) { return static_cast<schedule_policy>(sched_type::policy); }
+
+ bool check_heartbeat() {
+ if (heartbeat_ > 0) {
+ heartbeat_--;
+ if (heartbeat_ == 0) {
+ log.msg("heartbeat expires, joint reset...");
+ internal_reset();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private:
+ void internal_reset() {
+ chords_remove(); //remove all remaining chords
+ ports_.clear();
+ chords_.clear();
+ status_ = bitmap_t(0);
+ }
+
+ void port_remove(int ind) {
+ boost::mutex::scoped_lock lock(mutex_);
+ log.stream() << "port_remove: port [" << ind << "]" << logger::endl;
+ port_data &pd(ports_[ind]);
+ std::vector<chord_type*> chds = pd.get_chords();
+ for(size_t i=0; i<chds.size(); i++) {
+ chord_type *chd = chds[i];
+ for(size_t j=0; j<ports_.size(); j++) {
+ if(ports_[j].port_ != 0 && chd->mask_.test(ports_[j].mask_)) {
+ if((size_t)ind == j)
+ ports_[j].port_del_chord(chd, chd->priority_, false);
+ else
+ ports_[j].port_del_chord(chd, chd->priority_, true);
+ }
+ }
+ del_chord(chd->mask_);
+ }
+ }
+
+ void chords_remove() {
+ typename std::vector<boost::shared_ptr<chord_type> >::iterator iter;
+ for(iter = chords_.begin(); iter != chords_.end(); ) {
+ boost::shared_ptr<chord_type> chd = *iter;
+ for(size_t j=0; j<ports_.size(); j++) {
+ if (ports_[j].port_ != 0 && chd->mask_.test(ports_[j].mask_)) {
+ ports_[j].port_del_chord(chd.get(), chd->priority_, true);
+ }
+ }
+ log.msg("one chord removed");
+ iter = chords_.erase(iter);
+ }
+ }
+
+ //ports call this to notify that a new msg comes
+ //return: true - chord fired, false - chord not fired
+ bool port_invoked(int ind) {
+ port_data &pd(ports_[ind]);
+ pd.port_->num_pending_++;
+ if(status_.test(pd.mask_)) {
+ //already set
+ //log.msg("port_invoked: a port add more pending calls");
+ log.stream() << "port_invoked: a port [" << ind << "] add more pending calls" << logger::endl;
+ //do nothing
+ } else {
+ status_.set(pd.mask_);
+ //log.msg("port_invoked: a empty port get a call");
+ log.stream() << "port_invoked: a empty port [" << ind << "] get a call" << logger::endl;
+ chord_type *c = pd.scan_chords(status_); //find chord to fire based on dispatch_policy
+ if(c != 0) {
+ //update msg arrival status first, mark msgs to be consumed as
+ //"unavailable", kinds of "reserve" these msgs
+ //so that thread-scheduling will not interfere: another thread may come in
+ //between try to take some of these msgs
+ update_status(c->mask_);
+ return c->fire(pd.port_);
+ }
+ }
+ return false; //no chord fired
+ }
+
+ //one msg will be taken from each port in mask
+ //query these ports to see if any msg remaininf and
+ //if its bit in status_ should be flipped
+ void update_status(bitmap_t &chord_mask) {
+ for(size_t i=0; i<ports_.size(); i++)
+ if(chord_mask.test(ports_[i].mask_)) {
+ ports_[i].port_->num_pending_--;
+ if(ports_[i].port_->num_pending_ == 0)
+ status_.clear(ports_[i].mask_);
+ }
+ }
+
+ void port_revoked(int ind) {
+ port_data &pd(ports_[ind]);
+ pd.port_->num_pending_--;
+ if(pd.port_->num_pending_ == 0)
+ status_.clear(pd.mask_);
+ }
+
+ //
+ // utils methods for creating chords
+ //
+ template <typename PortT>
+ void add_port(PortT *pp, bitmap_t &bmap) {
+ port_data pd;
+ int ind = -1;
+ int empty = -1;
+ for(size_t i=0; i<ports_.size() && ind == -1;i++)
+ if(pp == ports_[i].port_) {
+ ind = (int)i;
+ pd = ports_[i];
+ }
+ else if (ports_[i].port_ == 0 && empty == -1)
+ empty = i;
+ if (ind == -1) {
+ if (empty >= 0) ind = empty;
+ else ind = (int)ports_.size();
+ if((size_t)ind >= max_size) {
+ log.msg("too_many_ports_exception thrown");
+ throw too_many_ports_exception();
+ }
+ pd.port_ = pp;
+ pd.mask_ = bitmap_t(1<<ind);
+ ports_.push_back(pd);
+ pp->index_ = ind;
+ pp->num_pending_ = 0;
+ }
+ bmap.set(pd.mask_);
+ }
+ template <typename PortT>
+ void add_port(std::vector<PortT*> &vp, bitmap_t &bmap) {
+ for(size_t i=0; i<vp.size(); i++)
+ add_port(vp[i], bmap);
+ }
+ template <typename PortT>
+ void find_port(PortT *p, bitmap_t &bmap) {
+ bmap.set(ports_[p->index_].mask_);
+ }
+ template <typename PortT>
+ void find_port(std::vector<PortT*> &vp, bitmap_t &bmap) {
+ for(size_t i=0; i<vp.size(); i++)
+ find_port(vp[i], bmap);
+ }
+ template <typename PortT>
+ void port_add_chord(PortT *p, chord_type *c, int priority) {
+ port_data &pd(ports_[p->index_]);
+ pd.port_add_chord(c, priority);
+ }
+ template <typename PortT>
+ void port_add_chord(std::vector<PortT*> &vp, chord_type *c, int priority) {
+ for(size_t i=0; i<vp.size(); i++)
+ port_add_chord(vp[i], c, priority);
+ }
+ template <typename PortT>
+ void port_del_chord(PortT *p, chord_type *c, int priority) {
+ port_data &pd(ports_[p->index_]);
+ pd.port_del_chord(c, priority, true);
+ }
+ template <typename PortT>
+ void port_del_chord(std::vector<PortT*> &vp, chord_type *c, int priority) {
+ for(size_t i=0; i<vp.size(); i++)
+ port_del_chord(vp[i], c, priority);
+ }
+ bool find_chord(bitmap_t &bmap, boost::shared_ptr<chord_type> &cd) {
+ for(size_t i=0; i<chords_.size(); i++) {
+ if (chords_[i]->mask_ == bmap) {
+ cd = chords_[i];
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void del_chord(bitmap_t &bmap) {
+ log.msg("del_chord...");
+ typename std::vector<boost::shared_ptr<chord_type> >::iterator iter;
+ for(iter = chords_.begin(); iter != chords_.end(); iter++) {
+ if ((*iter)->mask_ == bmap) {
+ chords_.erase(iter);
+ return;
+ }
+ }
+ }
+
+ bool del_chord(bitmap_t &bmap, boost::shared_ptr<chord_type> &cd) {
+ typename std::vector<boost::shared_ptr<chord_type> >::iterator iter;
+ for(iter = chords_.begin(); iter != chords_.end(); iter++) {
+ if ((*iter)->mask_ == bmap) {
+ cd = (*iter);
+ chords_.erase(iter);
+ return true;
+ }
+ }
+ return false;
+ }
+ void chord_data_validation(bitmap_t &bmap) {
+ if (static_cast<schedule_policy>(sched_type::policy) == schedule_first_match) {
+ //check_hidden_chord(bitmap_t &bmap)
+ for(size_t i=0; i<chords_.size(); i++) {
+ if (chords_[i]->mask_.match(bmap) || bmap.match(chords_[i]->mask_))
+ throw hidden_chord_exception();
+ }
+ }
+ boost::shared_ptr<chord_type> cd;
+ if(find_chord(bmap, cd))
+ throw hidden_chord_exception();
+ }
+
+ public:
+
+ //
+ // ***** "factory" methods to create chords ******
+ //
+ //--- chord with 1 port ---
+ template <typename PortPtrT, typename CallT>
+ void chord(PortPtrT p, CallT c, int priority) {
+ bitmap_t bmap;
+ add_port(p, bmap);
+ is_synch_port(p); // check against vector of synch
+ chord_data_validation(bmap);
+ boost::shared_ptr<chord_type> cd(new chord1<max_size,PortPtrT,CallT>(bmap, this, p, c, priority));
+ chords_.push_back(cd);
+ port_add_chord(p,cd.get(),priority);
+ }
+
+ template <typename PortPtrT>
+ void chord_remove(PortPtrT p) {
+ bitmap_t bmap;
+ find_port(p, bmap);
+ boost::shared_ptr<chord_type> cd;
+ if(!del_chord(bmap,cd))
+ throw chord_remove_exception();
+ port_del_chord(p, cd.get(), cd->priority_);
+ }
+
+ template <typename PortPtrT, typename CallT>
+ void chord_override(PortPtrT p, CallT c, int priority) {
+ bitmap_t bmap;
+ find_port(p, bmap);
+ boost::shared_ptr<chord_type> cd;
+ if(find_chord(bmap, cd)) { //override chord
+ chord1<max_size,PortPtrT,CallT> *cdp = static_cast<chord1<max_size,PortPtrT,CallT>*>(cd.get());
+ cdp->call_ = c;
+ if (cdp->priority_ != priority) {
+ port_del_chord(p, cdp, cdp->priority_);
+ port_add_chord(p, cdp, priority);
+ cdp->priority_ = priority;
+ }
+ }
+ else
+ throw chord_override_exception();
+ }
+
+ // ---- chord with 2 ports ----
+ template <typename PortPtrT1, typename PortPtrT2, typename CallT>
+ void chord(PortPtrT1 p1, PortPtrT2 p2, CallT c, int priority)
+ {
+ log.msg("joint_impl::chord 2 ports");
+ bitmap_t bmap;
+ is_synch_port(p1); // check against vector of synch
+ if (is_synch_port(p2))
+ throw synch_not_1st_exception();
+ add_port(p1, bmap);
+ add_port(p2, bmap);
+ chord_data_validation(bmap);
+ boost::shared_ptr<chord_type> cd(new chord2<max_size,PortPtrT1,PortPtrT2,CallT>(bmap, this, p1, p2, c, priority));
+ chords_.push_back(cd);
+ port_add_chord(p1,cd.get(),priority);
+ port_add_chord(p2,cd.get(),priority);
+ }
+
+ template <typename PortPtrT1, typename PortPtrT2>
+ void chord_remove(PortPtrT1 p1, PortPtrT2 p2) {
+ bitmap_t bmap;
+ find_port(p1, bmap);
+ find_port(p2, bmap);
+ boost::shared_ptr<chord_type> cd;
+ if(!del_chord(bmap, cd))
+ throw chord_remove_exception();
+ port_del_chord(p1, cd.get(), cd->priority_);
+ port_del_chord(p2, cd.get(), cd->priority_);
+ }
+
+ template <typename PortPtrT1, typename PortPtrT2, typename CallT>
+ void chord_override(PortPtrT1 p1, PortPtrT2 p2, CallT c, int priority) {
+ bitmap_t bmap;
+ find_port(p1, bmap);
+ find_port(p2, bmap);
+ boost::shared_ptr<chord_type> cd;
+ if(find_chord(bmap, cd)) { //override chord
+ chord2<max_size,PortPtrT1,PortPtrT2,CallT>* cdp = static_cast<chord2<max_size,PortPtrT1,PortPtrT2,CallT>*>(cd.get());
+ cdp->call_ = c;
+ if (cdp->priority_ != priority) {
+ port_del_chord(p1, cdp, cdp->priority_);
+ port_del_chord(p2, cdp, cdp->priority_);
+ port_add_chord(p1, cdp, priority);
+ port_add_chord(p2, cdp, priority);
+ cdp->priority_ = priority;
+ }
+ }
+ else
+ throw chord_override_exception();
+ }
+
+ //---- chord with 3 ports ---
+
+ template <typename PortPtrT1, typename PortPtrT2, typename PortPtrT3, typename CallT>
+ void chord(PortPtrT1 p1, PortPtrT2 p2, PortPtrT3 p3, CallT c, int priority)
+ {
+ bitmap_t bmap;
+ is_synch_port(p1); // check against vector of synch
+ if (is_synch_port(p2) || is_synch_port(p3))
+ throw synch_not_1st_exception();
+ add_port(p1, bmap);
+ add_port(p2, bmap);
+ add_port(p3, bmap);
+ chord_data_validation(bmap);
+ boost::shared_ptr<chord_type> cd(new chord3<max_size,PortPtrT1,PortPtrT2,PortPtrT3,CallT>(bmap, this, p1, p2, p3, c, priority));
+ chords_.push_back(cd);
+ port_add_chord(p1,cd.get(),priority);
+ port_add_chord(p2,cd.get(),priority);
+ port_add_chord(p3,cd.get(),priority);
+ }
+
+ template <typename PortPtrT1, typename PortPtrT2, typename PortPtrT3>
+ void chord_remove(PortPtrT1 &p1, PortPtrT2 &p2, PortPtrT3 &p3) {
+ bitmap_t bmap;
+ find_port(p1, bmap);
+ find_port(p2, bmap);
+ find_port(p3, bmap);
+ boost::shared_ptr<chord_type> cd;
+ if(!del_chord(bmap, cd))
+ throw chord_remove_exception();
+ port_del_chord(p1, cd.get(), cd->priority_);
+ port_del_chord(p2, cd.get(), cd->priority_);
+ port_del_chord(p3, cd.get(), cd->priority_);
+ }
+
+ template <typename PortPtrT1, typename PortPtrT2, typename PortPtrT3, typename CallT>
+ void chord_override(PortPtrT1 p1, PortPtrT2 p2, PortPtrT3 p3, CallT c, int priority) {
+ bitmap_t bmap;
+ find_port(p1, bmap);
+ find_port(p2, bmap);
+ find_port(p3, bmap);
+ boost::shared_ptr<chord_type> cd;
+ if(find_chord(bmap, cd)) { //override chord
+ chord3<max_size,PortPtrT1,PortPtrT2,PortPtrT3,CallT>* cdp = static_cast<chord3<max_size,PortPtrT1,PortPtrT2,PortPtrT3,CallT>*>(cd.get());
+ cdp->call_ = c;
+ if (cdp->priority_ != priority) {
+ port_del_chord(p1, cdp, cdp->priority_);
+ port_del_chord(p2, cdp, cdp->priority_);
+ port_del_chord(p3, cdp, cdp->priority_);
+ port_add_chord(p1, cdp, priority);
+ port_add_chord(p2, cdp, priority);
+ port_add_chord(p3, cdp, priority);
+ cdp->priority_ = priority;
+ }
+ }
+ else
+ throw chord_override_exception();
+ }
+
+ };
+
+ }
+
+ template <
+ template <size_t> class scheduler=sched_first_match,
+ size_t max_size=32
+ >
+ class joint_t {
+ public:
+ typedef detail::joint_impl<scheduler, max_size> ImplT;
+ typedef typename ImplT::sched_type sched_type;
+ typedef typename ImplT::callable callable;
+ typedef typename ImplT::spawn_type spawn_type;
+
+ private:
+ boost::shared_ptr<ImplT> pimpl_;
+
+ template <typename PortT>
+ typename PortT::ImplT* get_impl_ptr(PortT &p) {
+ if (p.pimpl_ == 0 || p.pimpl_->joint_ == 0) {
+ log->msg("a new port impl is created");
+ p.pimpl_.reset(new typename PortT::ImplT(p.max_size_, pimpl_));
+ }
+ if(p.pimpl_->joint_ != 0 && p.pimpl_->joint_ != pimpl_)
+ throw double_association_exception();
+ return p.pimpl_.get();
+ }
+
+ template <typename PortT>
+ std::vector<typename PortT::ImplT*> get_impl_ptr(std::vector<PortT> &vp) {
+ if (vp[0].pimpl_ == 0 || vp[0].pimpl_->joint_ == 0) {
+ for(size_t i=0; i<vp.size(); i++)
+ vp[i].pimpl_.reset(new typename PortT::ImplT(vp[i].max_size_, pimpl_));
+ }
+ if(vp[0].pimpl_->joint_ != 0 && vp[0].pimpl_->joint_ != pimpl_)
+ throw double_association_exception();
+ std::vector<typename PortT::ImplT*> vimpl;
+ for(size_t i=0; i<vp.size(); i++)
+ vimpl.push_back(vp[i].pimpl_.get());
+ return vimpl;
+ }
+
+ template <typename PortT, typename exception_type>
+ typename PortT::ImplT* check_port(PortT &p) {
+ if(p.pimpl_ == 0 || p.pimpl_->joint_ == 0 || p.pimpl_->joint_ != pimpl_)
+ throw new exception_type();
+ return p.pimpl_.get();
+ }
+
+ template <typename PortT, typename exception_type>
+ std::vector<typename PortT::ImplT*> check_port(std::vector<PortT> &vp) {
+ std::vector<typename PortT::ImplT*> vimpl;
+ for(size_t i=0; i<vp.size(); i++)
+ vimpl.push_back(check_port<PortT, exception_type>(vp[i]));
+ return vimpl;
+ }
+
+ public:
+ logger *log;
+
+ joint_t(spawn_type s = 0,
+ int hb = 0, //forever
+ const char *name = 0) : pimpl_(new ImplT(s, hb, name)), log(&(pimpl_->log)) {
+ log->msg("joint_t constructor");
+ }
+
+ schedule_policy my_scheduler(void) {
+ return pimpl_->my_scheduler();
+ }
+
+ void reset() { pimpl_->reset(); }
+
+ //
+ // ***** "factory" methods to create chords ******
+ //
+ //--- chord with 1 port ---
+ template <typename PortT, typename CallT>
+ joint_t& chord(PortT &p, CallT c, int priority=0) {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT>::result pp = get_impl_ptr(p);
+ pimpl_->chord(pp, c, priority);
+ return *this;
+ }
+
+ template <typename PortT>
+ joint_t& chord_remove(PortT &p) {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT>::result pp = check_port<PortT,chord_remove_exception>(p);
+ pimpl_->chord_remove(pp);
+ return *this;
+ }
+
+ template <typename PortT, typename CallT>
+ joint_t& chord_override(PortT &p, CallT c, int priority=0) {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT>::result pp = check_port<PortT,chord_override_exception>(p);
+ pimpl_->chord_override(pp, c, priority);
+ return *this;
+ }
+
+ //wrappers for pointers to member methods as chord body
+ template <typename PortT, typename ActorT, typename R, typename ArgT>
+ joint_t& chord(PortT &p,
+ R (ActorT::*c)(ArgT),
+ int priority=0) {
+ return chord(p, boost::bind(c, static_cast<ActorT*>(this), ::_1), priority);
+ }
+ template <typename PortT, typename ActorT, typename R, typename ArgT>
+ joint_t& chord_override(PortT &p,
+ R (ActorT::*c)(ArgT),
+ int priority=0) {
+ return chord_override(p, boost::bind(c, static_cast<ActorT*>(this), ::_1), priority);
+ }
+
+
+ // ---- chord with 2 ports ----
+ template <typename PortT1, typename PortT2, typename CallT>
+ joint_t& chord(PortT1 &p1, PortT2 &p2, CallT c, int priority=0)
+ {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT1>::result pp1 = get_impl_ptr(p1);
+ typename detail::impl_ptr<PortT2>::result pp2 = get_impl_ptr(p2);
+ pimpl_->chord(pp1, pp2, c, priority);
+ return *this;
+ }
+
+ template <typename PortT1, typename PortT2>
+ joint_t& chord_remove(PortT1 &p1, PortT2 &p2) {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_remove_exception>(p1);
+ typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_remove_exception>(p2);
+ pimpl_->chord_remove(pp1, pp2);
+ return *this;
+ }
+
+ template <typename PortT1, typename PortT2, typename CallT>
+ joint_t& chord_override(PortT1 &p1, PortT2 &p2, CallT c, int priority=0) {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_override_exception>(p1);
+ typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_override_exception>(p2);
+ pimpl_->chord_override(pp1, pp2, c, priority);
+ return *this;
+ }
+
+ //wrappers for pointers to member methods as chord body
+ template <typename PortT1, typename PortT2, typename ActorT, typename R, typename ArgT1, typename ArgT2>
+ joint_t& chord(PortT1 &p1, PortT2 &p2,
+ R (ActorT::*c)(ArgT1, ArgT2), int priority=0) {
+ return chord(p1, p2, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2), priority);
+ }
+ template <typename PortT1, typename PortT2, typename ActorT, typename R, typename ArgT1, typename ArgT2>
+ joint_t& chord_override(PortT1 &p1, PortT2 &p2,
+ R (ActorT::*c)(ArgT1, ArgT2), int priority=0) {
+ return chord_override(p1, p2, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2), priority);
+ }
+
+ //---- chord with 3 ports ---
+
+ template <typename PortT1, typename PortT2, typename PortT3, typename CallT>
+ joint_t& chord(PortT1 &p1, PortT2 &p2, PortT3 &p3, CallT c, int priority=0)
+ {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT1>::result pp1 = get_impl_ptr(p1);
+ typename detail::impl_ptr<PortT2>::result pp2 = get_impl_ptr(p2);
+ typename detail::impl_ptr<PortT3>::result pp3 = get_impl_ptr(p3);
+ pimpl_->chord(pp1, pp2, pp3, c, priority);
+ return *this;
+ }
+
+ template <typename PortT1, typename PortT2, typename PortT3>
+ joint_t& chord_remove(PortT1 &p1, PortT2 &p2, PortT3 &p3) {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_remove_exception>(p1);
+ typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_remove_exception>(p2);
+ typename detail::impl_ptr<PortT3>::result pp3 = check_port<PortT3,chord_remove_exception>(p3);
+ pimpl_->chord_remove(pp1, pp2, pp3);
+ return *this;
+ }
+
+ template <typename PortT1, typename PortT2, typename PortT3, typename CallT>
+ joint_t& chord_override(PortT1 &p1, PortT2 &p2, PortT3 &p3, CallT c, int priority=0) {
+ boost::mutex::scoped_lock lock(pimpl_->mutex_);
+ typename detail::impl_ptr<PortT1>::result pp1 = check_port<PortT1,chord_override_exception>(p1);
+ typename detail::impl_ptr<PortT2>::result pp2 = check_port<PortT2,chord_override_exception>(p2);
+ typename detail::impl_ptr<PortT3>::result pp3 = check_port<PortT3,chord_override_exception>(p3);
+ pimpl_->chord_override(pp1, pp2, pp3, c, priority);
+ return *this;
+ }
+
+ //wrappers for pointers to member methods as chord body
+ template <typename PortT1, typename PortT2, typename PortT3, typename ActorT, typename R, typename ArgT1, typename ArgT2, typename ArgT3>
+ joint_t& chord(PortT1 &p1, PortT2 &p2, PortT3 &p3,
+ R (ActorT::*c)(ArgT1, ArgT2, ArgT3), int priority=0) {
+ return chord(p1, p2, p3, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2, ::_3), priority);
+ }
+ template <typename PortT1, typename PortT2, typename PortT3, typename ActorT, typename R, typename ArgT1, typename ArgT2, typename ArgT3>
+ joint_t& chord_override(PortT1 &p1, PortT2 &p2, PortT3 &p3,
+ R (ActorT::*c)(ArgT1, ArgT2, ArgT3), int priority=0) {
+ return chord_override(p1, p2, p3, boost::bind(c, static_cast<ActorT*>(this), ::_1, ::_2, ::_3)), priority;
+ }
+
+ };
+
+ class joint : public joint_t<> {
+ public:
+ joint(spawn_type s = 0,
+ int hb = 0,
+ const char *name = 0) :
+ joint_t<>(s,hb,name) {}
+ };
+
+ //utility function to define large/wide joint
+ template <
+ template <size_t> class scheduler,
+ size_t max_size
+ >
+ joint_t<scheduler, max_size> joins_t(joint::spawn_type s = 0, int hb = 0, const char *name = 0) {
+ return joint_t<scheduler, max_size>(s,hb,name);
+ }
+
+ //utility function to define small/default joint
+ joint joins(joint::spawn_type s = 0, int hb = 0, const char *name = 0) {
+ return joint(s,hb,name);
+ }
+
+ }
+}
+
+#endif

Added: sandbox/join/boost/join/base/port.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/port.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,706 @@
+//
+// Boost/join/port.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_PORT_HPP
+#define BOOST_JOIN_PORT_HPP
+
+#include <exception>
+#include <stdexcept>
+#include <deque>
+#include <iostream>
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/join/base/join_base.hpp>
+#include <boost/join/base/exceptions.hpp>
+
+namespace boost {
+ namespace join {
+
+ struct void_t {};
+
+ namespace detail {
+
+ class joint_base;
+ template <typename ResT> class chord_base;
+
+ class async_port : public port {
+ public:
+ async_port(shared_ptr<joint_base> j) : port(port::async, j), chord_fired_(0) {}
+ void add_chord_fired(chord_base<void> *c) { chord_fired_ = c; }
+ protected:
+ chord_base<void> *chord_fired_;
+ };
+
+ //QueT supports push_back()/pop_front()/front()
+ template <typename ArgT, typename QueT>
+ class async_impl1 : public async_port, private boost::noncopyable {
+ public:
+ typedef ArgT argument_type;
+ typedef void result_type;
+
+ async_impl1(size_t sz, shared_ptr<joint_base> j) : async_port(j), max_size_(sz), arg1_(0) {//sz==0 means unlimited
+ }
+
+ void operator()(ArgT t) {
+ boost::shared_ptr<joint_base> my_joint(joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ my_joint->log.msg("async_impl1::operator(arg) enter");
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ if(test_chord_fire()) { //notify joint about a new msg
+ arg1_ = &t; //setup my arg
+ boost::function0<void> cb;
+ chord_fired_->capture_arguments(cb);
+ chord_fired_ = 0;
+ my_joint->log.msg("a new async task spawned");
+ my_joint->check_heartbeat();
+ lock.unlock();
+ my_joint->spawn(cb); //"spawn" (from executor) has its own lock protection
+ return;
+ }
+ else {
+ my_joint->log.msg("async_impl1::operator(arg) arg queued");
+ arg_.push_back(t);
+ }
+ }
+ void put(ArgT t) {
+ operator()(t);
+ }
+ ArgT &top(void) {
+ if(arg1_ != 0)
+ return *arg1_;
+ else
+ return arg_.front();
+ }
+ void pop_top(void) {
+ if (arg1_ != 0)
+ arg1_ = 0;
+ else
+ arg_.pop_front();
+ }
+ void reset(void) {
+ port::reset();
+ arg_.clear();
+ arg1_ = 0;
+ }
+ private:
+ size_t max_size_;
+ QueT arg_;
+ ArgT *arg1_;
+ };
+
+ class async_impl0 : public async_port, private boost::noncopyable {
+ public:
+ typedef void_t argument_type;
+ typedef void result_type;
+
+ async_impl0(size_t sz, shared_ptr<joint_base> j) : async_port(j), max_size_(sz) {
+ }
+
+ void operator()(void) {
+ boost::shared_ptr<joint_base> my_joint(joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ my_joint->log.msg("async_impl0::operator(void) enter");
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ if(test_chord_fire()) {
+ boost::function0<void> cb;
+ chord_fired_->capture_arguments(cb);
+ chord_fired_ = 0;
+ my_joint->log.msg("a new async task spawned");
+ my_joint->check_heartbeat();
+ lock.unlock();
+ my_joint->spawn(cb);
+ return;
+ }
+ my_joint->log.msg("async_impl0::operator(void) exit");
+ }
+ void put(void) {
+ operator()();
+ }
+ void pop_top(void) {
+ }
+ void_t top(void) {
+ return void_t();
+ }
+
+ private:
+ size_t max_size_;
+ };
+
+ template <typename ResT>
+ class synch_port : public port {
+ public:
+ synch_port(shared_ptr<joint_base> j) : port(port::synch, j), num_waiting_(0), reset_(false), chord_fired_(0) {}
+
+ void add_chord_fired(chord_base<ResT>* c) {
+ if (chord_fired_ == 0)
+ chord_fired_ = c;
+ else
+ chords_fired_.push_back(c);
+ }
+
+ //async call will call this to transfer control
+ void transfer(chord_base<ResT>* c) {
+ add_chord_fired(c);
+ if (num_waiting_ > 0)
+ cond_.notify_one();
+ joint_->log.msg("transfer");
+ }
+ chord_base<ResT> * get_chord_fired(void) {
+ chord_base<ResT> *cd = 0;
+ if (chord_fired_ != 0) {
+ cd = chord_fired_;
+ chord_fired_ = 0;
+ }
+ else {
+ if (!chords_fired_.empty()) { //assigned to complete chord callback
+ cd = chords_fired_.front();
+ chords_fired_.pop_front();
+ }
+ }
+ return cd;
+ }
+
+ void wait_fired(boost::mutex::scoped_lock &lock, chord_base<ResT> *&chd) {
+ num_waiting_++;
+ cond_.wait(lock);
+ num_waiting_--;
+ if (reset_) {
+ if (num_waiting_ == 0)
+ reset_ = false;
+ throw port_chord_reset_exception();
+ }
+ chd = get_chord_fired();
+ }
+
+ void timed_wait_fired(boost::mutex::scoped_lock &lock, chord_base<ResT> *&chd, const boost::xtime& timeout) {
+ num_waiting_++;
+ if(!cond_.timed_wait(lock, timeout) && ((chd = get_chord_fired()) == 0)) {
+ port_revoked();
+ num_waiting_--;
+ throw synch_time_out_exception();
+ }
+ num_waiting_--;
+ if (reset_) {
+ if (num_waiting_ == 0)
+ reset_ = false;
+ throw port_chord_reset_exception();
+ }
+ if (chd == 0)
+ chd = get_chord_fired();
+ }
+
+ void reset(void) {
+ port::reset();
+ chord_fired_ = 0;
+ chords_fired_.clear();
+ reset_ = true;
+ //tell all blocking threads to exit
+ if (num_waiting_ > 0)
+ cond_.notify_all();
+ //joint_->log.msg("reset");
+ }
+
+ protected:
+ boost::condition cond_;
+ size_t num_waiting_;
+ bool reset_;
+ chord_base<ResT> *chord_fired_;
+ std::deque<chord_base<ResT>*> chords_fired_;
+ };
+
+ template <typename ResT, typename ArgT>
+ class synch_impl : public synch_port<ResT>, private boost::noncopyable {
+ public:
+ typedef ArgT argument_type;
+ typedef ResT result_type;
+
+ synch_impl(size_t sz, shared_ptr<joint_base> j) : synch_port<ResT>(j), max_size_(sz), arg_(0) {//sz==0 means unlimited
+ }
+
+ ResT operator()(ArgT t) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<ResT, ArgT>::operator(t) enter");
+
+ chord_base<ResT> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<resT,argT> wait");
+ this->wait_fired(lock, chd);
+ }
+ my_joint->log.msg("synch<resT,argT> fired");
+ arg_ = &t; //setup my arg
+ boost::function0<ResT> cb;
+ chd->capture_arguments(cb);
+ my_joint->check_heartbeat();
+ lock.unlock();
+ return cb(); //invoke callback in my thread
+ }
+
+ ResT put(ArgT t) {
+ return operator()(t);
+ }
+
+ ResT operator()(ArgT t, const boost::xtime& timeout) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<ResT, ArgT>::operator(t, timeout) enter");
+
+ chord_base<ResT> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<resT,argT> timed wait");
+ this->timed_wait_fired(lock, chd, timeout);
+ }
+ my_joint->log.msg("synch<resT,argT> fired");
+ arg_ = &t; //setup my arg
+ boost::function0<ResT> cb;
+ chd->capture_arguments(cb);
+ my_joint->check_heartbeat();
+ lock.unlock();
+ return cb(); //invoke callback in my thread
+ }
+
+ ResT put(ArgT t, const boost::xtime& w) {
+ return operator()(t, w);
+ }
+
+ ArgT &top(void) {
+ return *arg_;
+ }
+ void pop_top(void) {
+ }
+ private:
+ size_t max_size_;
+ ArgT *arg_;
+ };
+
+
+ template <typename ArgT>
+ class synch_impl<void, ArgT> : public synch_port<void>, private boost::noncopyable {
+ public:
+ typedef ArgT argument_type;
+ typedef void result_type;
+
+ synch_impl(size_t sz, shared_ptr<joint_base> j) : synch_port<void>(j), max_size_(sz), arg_(0) {//sz==0 means unlimited
+ }
+
+ void operator()(ArgT t) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<void, ArgT>::operator(t) enter");
+
+ chord_base<void> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<void,argT> wait");
+ this->wait_fired(lock, chd);
+ }
+ my_joint->log.msg("synch<void, argT> fired");
+ arg_ = &t; //setup my arg
+ boost::function0<void> cb;
+ chd->capture_arguments(cb);
+ my_joint->log.msg("synch<void, argT> fired-bef-callback");
+ my_joint->check_heartbeat();
+ lock.unlock();
+ cb(); //invoke callback in my thread
+ }
+
+ void put(ArgT t) {
+ operator()(t);
+ }
+
+ void operator()(ArgT t, const boost::xtime& timeout) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<void, ArgT>::operator(t, timeout) enter");
+
+ chord_base<void> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<void,argT> timed wait");
+ this->timed_wait_fired(lock, chd, timeout);
+ }
+ my_joint->log.msg("synch<void, argT> fired");
+ arg_ = &t; //setup my arg
+ boost::function0<void> cb;
+ chd->capture_arguments(cb);
+ my_joint->check_heartbeat();
+ lock.unlock();
+ cb(); //invoke callback in my thread
+ }
+
+ void put(ArgT t, const boost::xtime& w) {
+ operator()(t, w);
+ }
+
+ ArgT &top(void) {
+ return *arg_;
+ }
+ void pop_top(void) {
+ }
+ private:
+ size_t max_size_;
+ ArgT *arg_;
+ };
+
+ template <typename ResT>
+ class synch_impl<ResT,void> : public synch_port<ResT>, private boost::noncopyable {
+ public:
+ typedef void_t argument_type;
+ typedef ResT result_type;
+
+ synch_impl(size_t sz, shared_ptr<joint_base> j) : synch_port<ResT>(j), max_size_(sz) {}
+
+ ResT operator()(void) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<ResT,void>::operator(void) enter");
+
+ chord_base<ResT> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<resT,void> wait");
+ this->wait_fired(lock, chd);
+ }
+ my_joint->log.msg("synch<ResT,void> fired");
+ boost::function0<ResT> cb;
+ chd->capture_arguments(cb);
+ my_joint->check_heartbeat();
+ lock.unlock();
+ return cb(); //invoke callback in my thread
+ }
+
+ ResT put(void) {
+ return operator()();
+ }
+
+ ResT operator()(const boost::xtime& timeout) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<ResT,void>::operator(timeout) enter");
+
+ chord_base<ResT> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<resT,void> timed wait");
+ this->timed_wait_fired(lock, chd, timeout);
+ }
+ my_joint->log.msg("synch<ResT,void> fired");
+ boost::function0<ResT> cb;
+ chd->capture_arguments(cb);
+ my_joint->check_heartbeat();
+ lock.unlock();
+ return cb(); //invoke callback in my thread
+ }
+
+ ResT put(const boost::xtime& w) {
+ return operator()(w);
+ }
+
+ void pop_top(void) {
+ }
+
+ void_t top(void) {
+ return void_t();
+ }
+ private:
+ size_t max_size_;
+ };
+
+
+ template <>
+ class synch_impl<void, void> : public synch_port<void>, private boost::noncopyable {
+ public:
+ typedef void_t argument_type;
+ typedef void result_type;
+
+ synch_impl(size_t sz, shared_ptr<joint_base> j) : synch_port<void>(j), max_size_(sz){}
+
+ void operator()(void) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<void,void>::operator(void) enter");
+
+ chord_base<void> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<void,void> wait");
+ this->wait_fired(lock, chd);
+ }
+ my_joint->log.msg("synch<void,void> fired");
+ boost::function0<void> cb;
+ chd->capture_arguments(cb);
+ my_joint->check_heartbeat();
+ lock.unlock();
+ cb(); //invoke callback in my thread
+ }
+
+ void put(void) {
+ operator()();
+ }
+
+ void operator()(const boost::xtime& timeout) {
+ boost::shared_ptr<joint_base> my_joint(this->joint_);
+ if(my_joint == 0)
+ throw not_in_chord_exception();
+ boost::mutex::scoped_lock lock(my_joint->mutex_);
+ if (max_size_> 0 && this->num_pending_ >= max_size_)
+ throw queue_overflow_exception();
+
+ my_joint->log.msg("synch<void,void>::operator(timeout) enter");
+
+ chord_base<void> *chd = 0;
+ if(this->test_chord_fire()) chd = this->get_chord_fired();
+ while (chd == 0) {
+ //no chord ready to fire
+ //block wait here till awaken by another msg
+ my_joint->log.msg("synch<void,void> timed wait");
+ this->timed_wait_fired(lock, chd, timeout);
+ }
+ my_joint->log.msg("synch<void,void> fired");
+ boost::function0<void> cb;
+ chd->capture_arguments(cb);
+ my_joint->check_heartbeat();
+ lock.unlock();
+ cb(); //invoke callback in my thread
+ }
+
+ void put(const boost::xtime& w) {
+ operator()(w);
+ }
+
+ void pop_top(void) {
+ }
+
+ void_t top(void) {
+ return void_t();
+ }
+
+ private:
+ size_t max_size_;
+ };
+
+ }
+
+ //------ Pimpl wrappers ------
+
+ template <typename ArgT, typename QueT=std::deque<ArgT> >
+ class async {
+ template <template <size_t> class, size_t> friend class joint_t;
+ public:
+ typedef detail::async_impl1<ArgT, QueT> ImplT;
+ typedef typename ImplT::argument_type argument_type;
+ typedef typename ImplT::result_type result_type;
+ async(size_t sz=0) : max_size_(sz) {}
+ void put(ArgT t) {
+ if (pimpl_ == 0) throw not_in_chord_exception();
+ pimpl_->put(t);
+ }
+ void operator()(ArgT t) { put(t); }
+ void detach() {
+ if (!pimpl_)
+ pimpl_->detach();
+ }
+ void reset_joint() {
+ if (pimpl_ != 0)
+ pimpl_->reset_joint();
+ }
+ private:
+ size_t max_size_;
+ boost::shared_ptr<ImplT> pimpl_;
+ };
+
+ template <>
+ class async<void> {
+ template <template <size_t> class, size_t> friend class joint_t;
+ public:
+ typedef detail::async_impl0 ImplT;
+ typedef ImplT::argument_type argument_type;
+ typedef ImplT::result_type result_type;
+ async(size_t sz=0) : max_size_(sz) {}
+ void put(void) {
+ if (!pimpl_) throw not_in_chord_exception();
+ pimpl_->put();
+ }
+ void operator()(void) { put(); }
+ void detach() {
+ if (pimpl_ != 0)
+ pimpl_->detach();
+ }
+ void reset_joint() {
+ if (pimpl_ != 0)
+ pimpl_->reset_joint();
+ }
+ private:
+ size_t max_size_;
+ boost::shared_ptr<ImplT> pimpl_;
+ };
+
+ template <typename ResT, typename ArgT>
+ class synch {
+ template <template <size_t> class, size_t> friend class joint_t;
+ public:
+ typedef detail::synch_impl<ResT, ArgT> ImplT;
+ typedef typename ImplT::argument_type argument_type;
+ typedef typename ImplT::result_type result_type;
+ synch(size_t sz=0) : max_size_(sz) {}
+ ResT put(ArgT t) {
+ if (pimpl_ == 0) throw not_in_chord_exception();
+ return pimpl_->put(t);
+ }
+ ResT operator()(ArgT t) { return put(t); }
+ void detach() {
+ if (pimpl_ != 0)
+ pimpl_->detach();
+ }
+ void reset_joint() {
+ if (pimpl_ != 0)
+ pimpl_->reset_joint();
+ }
+ private:
+ size_t max_size_;
+ boost::shared_ptr<ImplT> pimpl_;
+ };
+
+ template <typename ResT>
+ class synch<ResT,void> {
+ template <template <size_t> class, size_t> friend class joint_t;
+ public:
+ typedef detail::synch_impl<ResT, void> ImplT;
+ typedef typename ImplT::argument_type argument_type;
+ typedef typename ImplT::result_type result_type;
+ synch(size_t sz=0) : max_size_(sz) {}
+ ResT put(void) {
+ if (pimpl_ == 0) throw not_in_chord_exception();
+ return pimpl_->put();
+ }
+ ResT operator()(void) { return put(); }
+ void detach() {
+ if (pimpl_ != 0)
+ pimpl_->detach();
+ }
+ void reset_joint() {
+ if (pimpl_ != 0)
+ pimpl_->reset_joint();
+ }
+ private:
+ size_t max_size_;
+ boost::shared_ptr<ImplT> pimpl_;
+ };
+
+ template <typename ArgT>
+ class synch<void,ArgT> {
+ template <template <size_t> class, size_t> friend class joint_t;
+ public:
+ typedef detail::synch_impl<void, ArgT> ImplT;
+ typedef typename ImplT::argument_type argument_type;
+ typedef typename ImplT::result_type result_type;
+ synch(size_t sz=0) : max_size_(sz) {}
+ void put(ArgT t) {
+ if (pimpl_ == 0) throw not_in_chord_exception();
+ pimpl_->put(t);
+ }
+ void operator()(ArgT t) { put(t); }
+ void detach() {
+ if (pimpl_ != 0)
+ pimpl_->detach();
+ }
+ private:
+ size_t max_size_;
+ boost::shared_ptr<ImplT> pimpl_;
+ };
+
+ template <>
+ class synch<void,void> {
+ template <template <size_t> class, size_t> friend class joint_t;
+ public:
+ typedef detail::synch_impl<void, void> ImplT;
+ typedef ImplT::argument_type argument_type;
+ typedef ImplT::result_type result_type;
+ synch(size_t sz=0) : max_size_(sz) {}
+ void put() {
+ if (pimpl_ == 0) throw not_in_chord_exception();
+ pimpl_->put();
+ }
+ void operator()() { put(); }
+ void detach() {
+ if (pimpl_ != 0)
+ pimpl_->detach();
+ }
+ void reset_joint() {
+ if (pimpl_ != 0)
+ pimpl_->reset_joint();
+ }
+ private:
+ size_t max_size_;
+ boost::shared_ptr<ImplT> pimpl_;
+ };
+
+ }
+}
+
+#endif

Added: sandbox/join/boost/join/base/utils.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/base/utils.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,77 @@
+//
+// boost/join/utils.hpp
+//
+// Copyright (c) 2007 - 20089 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_UTILS_HPP
+#define BOOST_JOIN_UTILS_HPP
+
+#include <iostream>
+#include <string>
+#include <boost/thread.hpp>
+#include <sstream>
+
+namespace boost {
+ namespace join {
+
+ class logger : public std::ostringstream {
+ public:
+ const char *name_;
+ static boost::mutex mutex_;
+
+ logger(const char *n = 0) : name_(n) {
+ }
+
+ void msg(std::string str) {
+ if (name_ == 0) return;
+ boost::mutex::scoped_lock lock(mutex_);
+ std::cout << "[" << name_ << "] : " << str << std::endl;
+ }
+ std::ostringstream & stream(void) {
+ boost::mutex::scoped_lock lock(mutex_);
+#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
+ return *(new logger(name_));
+#else
+ return *this;
+#endif
+ }
+ void flush(void) {
+ boost::mutex::scoped_lock lock(mutex_);
+ if(name_ != 0)
+ std::cout << "[" << name_ << "] " << this->str();
+ this->str("");
+ }
+ void flushl(void) {
+ boost::mutex::scoped_lock lock(mutex_);
+ if(name_ != 0)
+ std::cout << "[" << name_ << "] " << this->str() << std::endl;
+ this->str("");
+ }
+ typedef void func(logger &l);
+ static void end(logger &l) {
+ l.flush();
+#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
+ delete &l;
+#endif
+ }
+ static void endl(logger &l) {
+ l.flushl();
+#if defined(BOOST_WINDOWS) || defined(__CYGWIN__)
+ delete &l;
+#endif
+ }
+ };
+ std::ostream &operator << (std::ostream &os, logger::func *f) {
+ f((logger&)os);
+ return os;
+ }
+ boost::mutex logger::mutex_;
+ }
+}
+
+#endif
+

Added: sandbox/join/boost/join/idioms/asio_executor.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/idioms/asio_executor.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,34 @@
+//
+// boost/join/executor.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_ASIO_EXECUTOR_HPP
+#define BOOST_JOIN_ASIO_EXECUTOR_HPP
+
+#include <boost/asio.hpp>
+#include <boost/join/join.hpp>
+
+namespace boost {
+ namespace join {
+
+ //integration with Boost.Asio
+ //submit async tasks to asio's completion_event queue to be executed by main thread
+ class asio_executor {
+ public:
+ boost::asio::io_service& io_service_;
+ asio_executor(boost::asio::io_service& io_service): io_service_(io_service) {}
+ template <typename task_type>
+ void operator()(task_type task) {
+ io_service_.post(task);
+ }
+ };
+
+ }
+}
+
+#endif

Added: sandbox/join/boost/join/idioms/executor.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/idioms/executor.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,108 @@
+//
+// boost/join/executor.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_SIMPLE_EXECUTOR_HPP
+#define BOOST_JOIN_SIMPLE_EXECUTOR_HPP
+
+#include <iostream>
+#include <map>
+#include <boost/thread.hpp>
+#include <boost/join/join.hpp>
+
+namespace boost {
+ namespace join {
+
+ template <template <size_t> class scheduler=sched_first_match, size_t sz=32>
+ class executor_t : public joint_t<scheduler, sz> {
+ public:
+ typedef scheduler<sz> sched_type;
+ typedef joint_t<scheduler, sz> joint_type;
+ typedef typename joint_type::callable task;
+
+ //api: one default task queue
+ async<task> execute;
+ synch<void,void> shutdown;
+
+ void spawn(task t) { execute(t); }
+
+ executor_t(int num, const char *name = 0) : joint_type(0, 0, name) {
+ chord(run, execute, &executor_t::exec_cb);
+ chord(run, stopped, &executor_t::stop_cb);
+ chord(shutdown, started, &executor_t::shutdown_cb);
+ chord(shutdown, stopped, &executor_t::stopped_cb);
+ num_threads_ = num;
+ for(int i=0; i<num; i++)
+ threads_.create_thread(boost::bind(&executor_t::main_loop, this));
+ started(); //init state
+ }
+ ~executor_t() {
+ shutdown();
+ }
+
+ private:
+ synch<bool,void> run;
+ //executor states
+ async<void> started;
+ async<void> stopped;
+ //thread pool
+ boost::thread_group threads_;
+ int num_threads_;
+
+ //entry func of each thread - a loop which exists when no tasks in execute queue and shutdown
+ void main_loop(void) {
+ this->log->msg("a thread starts...");
+ while(run()) {}
+ this->log->msg("a thread exits...");
+ }
+ bool exec_cb(void_t run, task tasklet) {
+ this->log->msg("start one task...");
+ try {
+ tasklet();
+ }
+ catch (join_exception &je) {
+ this->log->msg(je.what());
+ }
+ catch (...) {
+ this->log->msg("UNKNOWN exceptions happen inside a executor thread, ignore.");
+ }
+ this->log->msg("finish one task...");
+ return true; //worker thread continue
+ }
+ bool stop_cb(void_t run, void_t stopd) {
+ return false; //worker thread exit
+ }
+ void shutdown_cb(void_t shdn, void_t started) {
+ this->log->msg("shutdown...");
+ for(int i=0; i<num_threads_; i++) //stop all threads in pool
+ stopped();
+ //waiting for the threads to exit
+ this->log->msg("wait...");
+ threads_.join_all();
+ this->log->msg("all threads exit, done...");
+ stopped(); //add one more stopped(), in case shutdown() is called more than once
+ }
+ void stopped_cb(void_t shdn, void_t stopd) {
+ this->log->msg("stopped...");
+ stopped();
+ }
+ };
+
+ //define a default executor_t
+ class executor : public executor_t<> {
+ public:
+ executor(int num, const char *name = 0) :
+ executor_t<>(num, name)
+ {
+ }
+ };
+
+ }
+}
+
+#endif

Added: sandbox/join/boost/join/idioms/rr_executor.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/idioms/rr_executor.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,128 @@
+//
+// boost/join/rr_executor.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_RR_EXECUTOR_HPP
+#define BOOST_JOIN_RR_EXECUTOR_HPP
+
+#include <iostream>
+#include <map>
+#include <boost/thread.hpp>
+#include <boost/join/join.hpp>
+
+namespace boost {
+ namespace join {
+
+ template <template <size_t> class scheduler=sched_pri_round_robin, size_t sz=32>
+ class rr_executor_t : public joint_t<scheduler, sz> {
+ public:
+ typedef scheduler<sz> sched_type;
+ typedef joint_t<scheduler, sz> joint_type;
+ typedef typename joint_type::callable task;
+ typedef std::map<size_t, async<task> > que_map_type;
+
+ //static api: one default task queue
+ async<task> execute;
+ synch<void,void> shutdown;
+
+ void spawn(task t) { execute(t); }
+
+ //dynamic api: dynamically added task queues
+ async<task> task_queue(size_t i=0) {
+ //since rr_executor itself uses async/synch_p<> methods, we can have task
+ //queues [1-(sz-5)]
+ if (i>(sz-5)) i = i%(sz-5);
+ if(i==0) return execute;
+ if(this->my_scheduler() != schedule_round_robin) {
+ return execute;
+ }
+ typename que_map_type::iterator iter;
+ if ((iter=que_map_.find(i)) != que_map_.end())
+ return iter->second;
+ else {
+ this->log->msg(" creating task_queue ......");
+ async<task> nq;
+ chord(run, nq, &rr_executor_t::exec_cb);
+ que_map_[i] = nq;
+ return nq;
+ }
+ }
+
+ rr_executor_t(int num, const char *name = 0) :
+ joint_type(0, 0, name) {
+ chord(run, execute, &rr_executor_t::exec_cb);
+ chord(run, stopped, &rr_executor_t::stop_cb, 1);
+ chord(shutdown, started, &rr_executor_t::shutdown_cb, 1);
+ chord(shutdown, stopped, &rr_executor_t::stopped_cb, 1);
+ for(int i=0; i<num; i++)
+ threads_.create_thread(boost::bind(&rr_executor_t::main_loop, this));
+ started(); //init state
+ }
+ ~rr_executor_t() {
+ shutdown();
+ }
+
+ private:
+ synch<bool,void> run;
+ //rr_executor states
+ async<void> started;
+ async<void> stopped;
+ boost::thread_group threads_;
+ //dynamic api: dynamically added task queues
+ que_map_type que_map_;
+
+ void main_loop(void) {
+ this->log->msg("a thread starts...");
+ while(run()) {}
+ this->log->msg("a thread exits...");
+ }
+ bool exec_cb(void_t run, task tasklet) {
+ this->log->msg("start one task...");
+ try {
+ tasklet();
+ }
+ catch (join_exception &je) {
+ this->log->msg(je.what());
+ }
+ catch (...) {
+ this->log->msg("UNKNOWN exceptions happen inside a rr_executor thread, ignore.");
+ }
+ this->log->msg("finish one task...");
+ return true; //worker thread continue
+ }
+ bool stop_cb(void_t run, void_t stopd) {
+ stopped();
+ return false; //worker thread exit
+ }
+ void shutdown_cb(void_t shdn, void_t started) {
+ this->log->msg("shutdown...");
+ stopped();
+ //waiting for the threads to exit
+ this->log->msg("wait...");
+ threads_.join_all();
+ this->log->msg("all threads exit, done...");
+ }
+ void stopped_cb(void_t shdn, void_t stopd) {
+ this->log->msg("stopped...");
+ stopped();
+ }
+ };
+
+ //define a default rr_executor_t taking 32 async / synch methods or at most 27 task queues
+ class rr_executor : public rr_executor_t<> {
+ public:
+ rr_executor(int num, const char *name = 0) :
+ rr_executor_t<>(num, name)
+ {
+ }
+ };
+
+ }
+}
+
+#endif

Added: sandbox/join/boost/join/join.hpp
==============================================================================
--- (empty file)
+++ sandbox/join/boost/join/join.hpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,19 @@
+//
+// boost/join/join.hpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#ifndef BOOST_JOIN_HPP
+#define BOOST_JOIN_HPP
+
+#include <boost/join/base/port.hpp>
+#include <boost/join/base/joint.hpp>
+#include <boost/join/idioms/executor.hpp>
+#include <boost/join/idioms/rr_executor.hpp>
+
+#endif
+

Added: sandbox/join/libs/join/doc/boost_join_design.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/boost_join_design.html 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,301 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+ <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+ <title>Join Design</title>
+ <meta content="Yigong Liu" name="author">
+ <meta content="Join Design Doc" name="description">
+</head>
+<body>
+<h2 style="text-align: center;">Join - Asynchronous Message
+Coordination and
+Concurrency
+Library</h2>
+<div style="text-align: center;">
+<h2>Yigong Liu (2007-2009)</h2>
+<h4>yigongliu_at_[hidden]</h4>
+<p style="color: rgb(255, 0, 0);">Warning:&nbsp; This library is NOT an
+official Boost library yet </p>
+<hr style="width: 100%; height: 2px;">
+<ol style="text-align: left;" id="mozToc">
+<!--mozToc h3 1-->
+ <li>Introduction</li>
+ <li><a href="chords_joints.html">Chords and
+Joints</a></li>
+ <li>Samples</li>
+ <li>Synopsis of Join Classes</li>
+ <li>Join Internals</li>
+ <li>References</li>
+</ol>
+<hr style="width: 100%; height: 2px;"></div>
+<h3><a class="mozTocH3" name="mozTocId387457"></a>Introduction</h3>
+Join is an asynchronous message coordination and concurrency library
+based
+on concepts and techniques developed in JoCaml[1] and C&#969;[2]. It is
+applicable both to
+multithreaded applications
+and
+to the orchestration of asynchronous message flows.<br>
+<br>
+Join supports message orchestration with a few simple
+abstractions:<br>
+<ol>
+ <li>
+ <h4>&nbsp;typed asynchronous and synchronous message
+ports<br>
+ </h4>
+ </li>
+</ol>
+<div style="text-align: left;">
+<div style="margin-left: 40px;">These are function objects used as
+typed ports or interfaces to message flows. Message passing is done by
+invoking or
+calling these function objects. Ports by themselves
+are not
+complete message passing constructs yet, they are merely interfaces to
+these constructs which are created when ports are "joined"
+together thru chords. For example, a message queue has a sending port
+and receiving port.<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>&nbsp;&nbsp; <span style="font-weight: bold;">async&lt;MsgT&gt;</span></li>
+</ul>
+<div style="margin-left: 80px;">A asynchronous port provides
+one-way,
+non-blocking calls. Message passing thru async&lt;T&gt; port is
+guaranteed to return immediately; internally there
+could be a queue to buffer the arguments or message.<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>&nbsp;&nbsp; <span style="font-weight: bold;">synch&lt;ResT,
+MsgT&gt;</span></li>
+</ul>
+<div style="margin-left: 80px;">A synchronous port is similar to
+normal
+method call, in that the calling thread will block till result is
+returned. However a normal method call just
+involve the calling thread, a call to synch&lt;R,T&gt; port could
+involves
+multiple threads and synchronization.<br>
+</div>
+</div>
+<ol start="2">
+ <li>
+ <h4>chords (or join patterns)<br>
+ </h4>
+ </li>
+</ol>
+<div style="margin-left: 40px;">A chord binds a set of ports to the
+message processing function which will be invoked when all ports in the
+set are called and messages are available.<br>
+</div>
+&nbsp; &nbsp;<br>
+<div style="margin-left: 40px;">In C&#969;, a thread-safe buffer can be
+defined as following:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">public
+class Buffer {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;
+&nbsp;&nbsp; public async Put(string s);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;
+&nbsp;&nbsp; <span style="font-style: italic;">public string Get()
+&amp; Put(string s) { return s; }&nbsp;&nbsp; //a chord</span></span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">} </span><br>
+</div>
+Here a chord is defined with a synchronous Get port and asynchronous
+Put port;
+the calling of Get will block if no Put is called yet, otherwise the
+string sent by Put will be returned to Get.&nbsp; &nbsp;<br>
+<br>
+In Join, the buffer class can be
+defined in C&#969; style as
+following:<br>
+<div style="margin-left: 40px;">class buffer : public <span
+ style="font-weight: bold;">joint</span> {<br>
+public:<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; async&lt;string&gt;
+put;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+synch&lt;string,void&gt; get;</span><br>
+&nbsp;&nbsp;&nbsp; buffer() {<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
+<span style="font-style: italic;">chord(get, put, &amp;buffer::proc);</span></span><br>
+&nbsp;&nbsp;&nbsp; }<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; string proc(void_t
+g, string p) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+&nbsp;&nbsp;&nbsp;&nbsp; return p;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; }</span><br>
+<span style="font-weight: bold;"></span>};<br>
+<br>
+or with boost::lambda, it becomes:<br>
+class buffer {<br>
+public:<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; async&lt;string&gt;
+put;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; synch&lt;string,
+void&gt; get;</span><br style="font-weight: bold;">
+&nbsp;&nbsp;&nbsp; buffer() {<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; <span
+ style="font-style: italic;">joins().chord(get, put, lambda::_2);</span></span><br
+ style="font-weight: bold;">
+&nbsp;&nbsp;&nbsp; }<br>
+}<br>
+</div>
+</div>
+<div style="margin-left: 40px;">&nbsp;&nbsp; <br>
+</div>
+<div style="margin-left: 40px;">In JoCaml, such a buffer can be created
+with a "factory" function:<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; let
+create_buffer() = </span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+&nbsp;&nbsp; &nbsp;&nbsp; def put(n) &amp; get() = reply n to get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+&nbsp;&nbsp; &nbsp;&nbsp; put, get</span><br>
+<br>
+Similarly in Join, with the help of boost lambda and tuples, a factory
+function can be defined in JoCaml style as following:<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+template &lt;typename T&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+tuple&lt;async&lt;T&gt;, synch&lt;T,void&gt; &gt; create_buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+&nbsp;&nbsp; &nbsp;&nbsp; async&lt;T&gt; put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+&nbsp;&nbsp; &nbsp;&nbsp; synch&lt;T,void&gt; get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+&nbsp;&nbsp; &nbsp;&nbsp; <span style="font-style: italic;">joins().chord(get,
+put, lambda::_2);</span></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+&nbsp;&nbsp; &nbsp;&nbsp; return make_tuple(put, get);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; }</span><br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; &nbsp;&nbsp; &nbsp;&nbsp; <br>
+such a buffer can be safely used in
+multithread applications:<br>
+<div style="margin-left: 40px;">buffer
+b;<br>
+b.put("Hello");
+b.put("World");<br>
+cout &lt;&lt;
+b.get() &lt;&lt; b.get() &lt;&lt; endl;<br>
+</div>
+</div>
+<br>
+<div style="margin-left: 40px;">Difference of Join with C&#969; in
+Chord definitions:<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>C&#969;'s chord defintion is "static", declared in class definition,
+cannot change during runtime, can be optimized by compiler.</li>
+ <li>Join's chord definition is "dynamic", created by chord()
+functions, can be changed during runtime. It can support dynamic join
+programming similar
+to JoCaml and CCR[4].</li>
+</ul>
+<ol start="3">
+ <li>
+ <h4>joint<br>
+ </h4>
+ </li>
+</ol>
+<div style="margin-left: 40px;">A joint "joins" together a set of
+chords, each of which may bind several ports and may share some ports
+with each other, thus creating competing requests for
+messages, which are synchronized by joint's internal logic. A joint
+defines a complete synchronized concurrent message passing contruct. A
+joint is
+where message ports meet
+and processing logic are attached thru chords. All application code
+which use
+async/synch ports and chords to define concurrent behaviours should
+either inherit class joint or use a joint instance to join
+message flows together. Joint maintains the internal state
+about the
+message
+arrivals and synchronization, and firing chords when all of their
+messages are available.<br>
+<br>
+</div>
+In Join based applications, with the help of simple thread-pool based
+executors (which can be built on top of Join's primitives), we can
+develop concurrent
+applications without
+explicit usage of threads (thread creation and synchronization);
+concurrency are mostly defined and controlled by chords with only
+async&lt;&gt; ports.<br>
+<div style="margin-left: 40px;"></div>
+<h3><span class="mozTocH3"></span>License</h3>
+Join is licensed under the <a
+ href="http://www.boost.org/LICENSE_1_0.txt">Boost Software License</a>.<br>
+<h3><span class="mozTocH3"></span>Installation<br>
+</h3>
+Join is continuously being developed and tested in Linux (Fedora
+Core &amp; Ubuntu) and Windows (WindowsXP/Vista and Visual C++ 2008).
+The
+implementation is solely based on standard boost facilities
+(Boost.Bind, Boost.Function, Boost.Shared_Ptr, etc.) which will be part
+of next C++ standard. <br>
+Download: http://port.sourceforge.net<br>
+Installation: <br>
+Join is a header only library. There is no need to build the
+library itself to use it. Please following these steps:<br>
+<div style="margin-left: 40px;"><a
+ href="http://www.boost.org/more/getting_started.html">download or
+checkout boost distribution</a><br>
+download latest boost_join_x_x.tar.gz<br>
+tar xvzf boost_join_x_x.tar.gz<br>
+add boost's directory and Join's directory to compilers' include
+path<br>
+cd to &lt;boost_join_directory&gt;/libs/join/exmaple<br>
+bjam<br>
+</div>
+<h3>Acknowlegement<br>
+</h3>
+The original theory and systems of Join Calculus / JoCaml are developed
+by C&eacute;dric Fournet, Georges
+Gonthier, Jean-Jacques L&eacute;vy, Luc Maranget and Didier R&eacute;my
+and others at MOSCOVA INRIA.<br>
+At Microsoft Research, C&#969; is developed by Nick Benton, Luca Cardelli,
+C&eacute;dric Fournet and others. Claudio Russo developed the C# Joins
+library which is an efficient combinator library for C&#969;-style join
+patterns, implemented in C# 2.0 generics and accessible from multiple
+.Net languages.<br>
+An earlier version of the Join library described here is based on
+both C&#969; and Jocaml
+which supports multiple synchronous methods per chord (join-pattern)
+and propagation of exceptions to all synchronous callers. <br>
+The current version
+is mostly based on C&#969;, supporting only one synchronous method per chord
+for simplicity and better performance.<br>
+Thanks Dr. Claudio Russo at Microsoft Research for many insightful
+email
+discussions, clarifying many of my questions about C&#969; internals and
+semantics, and many helpful links to other Join based projects.<br>
+Thanks Dr. Luc Maranget at Moscova INRIA for many kind advices and
+links to papers, for pointing out the advantages (simplification and
+optimization) of the single-synch per chord design.<br>
+In the earlier version of this Join library, the functional signature
+API of async / synch methods is modeled after
+Douglas Gregor's Boost.Signals.<br>
+For earlier versions which support multiple
+synchronous methods and exception propagation,&nbsp; the scheme of
+catching
+exceptions in one thread, and rethrowing it in another thread is
+based on Christopher M. Kohlhoff's idea and sample code posted in boost
+email
+list discussions.<br>
+<br>
+<br>
+<div style="margin-left: 40px;"></div>
+<span style="font-weight: bold;">
+</span>
+</body>
+</html>

Added: sandbox/join/libs/join/doc/chords_joints.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/chords_joints.html 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,256 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+ <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+ <title>Chords and Joints</title>
+</head>
+<body>
+<h2>Chords and Joints<br>
+</h2>
+<h3>The Role of Chords, and Joints</h3>
+In sequential languages, a common construct of control flow is "switch
+/ case" statements:<br>
+<div style="margin-left: 40px;">switch(val) {<br>
+case val1: statements1; break;<br>
+case val2: statements2; break;<br>
+...<br>
+default: ...<br>
+}<br>
+</div>
+<br>
+In CSP based concurrent languages, such as Occam, Limbo, etc., there is
+a "ATL" construct to branch control flow based on the communication
+readiness of channels:<br>
+<div style="margin-left: 40px;">alt {<br>
+&nbsp;&nbsp;&nbsp; i = &lt;- inchan =&gt;<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; statements1;<br>
+&nbsp;&nbsp;&nbsp; outchan &lt;- = "sent it!" =&gt;<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; statements2;<br>
+}<br>
+</div>
+Different blocks of statements will run based on the communication
+readiness of channels (inchan, outchan). However in "alt", if the
+qualifier before "=&gt;" contains several channel communication
+operations (&lt;-, -&gt;), only the first one will be tested. The
+relationship between different "alting" segments are OR relationship.<br>
+<br>
+In JoCaml, an "count-down" idiom is defined as :<br>
+<div style="margin-left: 40px;">def count(n) &amp;tick() = count(n-1)<br>
+or count(0) &amp; wait() = reply to wait<br>
+</div>
+There are 2 join-patterns (chords) here: count(n) &amp; tick(),
+count(0) &amp; wait(), each of which contains 2 ports. Similar to CSP,
+join-patterns act as qualifier "guarding" the actions statements
+following it. If all two ports in a join-pattern are called, the action
+statements of this chord is ready to run. The relationship inside a
+join-pattern or chord is AND, while the relationship between different
+chords are OR.<br>
+<br>
+Similarly in Join, joints act as a "switch / case" construct for
+message passing, run different codes depending on the communication
+readiness of ports. So it is a construct for message coordination and
+orchestration, such as following code, different operations are done
+based on readiness of ports:<br>
+async&lt;T1&gt; port1;<br>
+async&lt;T2&gt; port2;<br>
+async&lt;T3&gt; port3;<br>
+joins(executor)<br>
+&nbsp;&nbsp;&nbsp; .chord(port1, port2, operation1)<br>
+&nbsp;&nbsp;&nbsp; .chord(port2, port3, operation2);<br>
+<h3>Chord Definition</h3>
+Chords are created when joint's chord() methods are invoked. A chord
+binds a set of ports to the processing logic which consume and
+process messages from those ports. The processing logic is a
+function object which can be in the form of normal function, lambda,
+class method, or object method. Its signature can be deduced as
+following:<br>
+<div style="margin-left: 40px;">for a chord of a set of ports with
+types: <br>
+&nbsp;&nbsp;&nbsp; PortT1, PortT2, PortT3, ...<br>
+the chord binds to a function object "call" with signature:<br>
+&nbsp;&nbsp;&nbsp; <span style="font-weight: bold;">PortT1::result_type
+call(PortT1::argument_type,
+PortT2::argument_type, PortT3::argument_type, ...);</span><br>
+</div>
+For async&lt;MsgT&gt; port, its result_type is "void", and its
+argument_type is the message type it carries - MsgT.<br>
+For synch&lt;ResT,MsgT&gt;, obviously its result_type is ResT and its
+argument_type is MsgT.<br>
+For ports sending "void" messages (async&lt;void&gt;, synch&lt;ResT,
+void&gt;), their argument type is "void_t" defined as <span
+ style="font-weight: bold;">struct void_t {}</span>; so C/C++ compilers
+can be satisfied.<br>
+<br>
+In the set of ports of a chord, there can be at most one
+synch&lt;R,T&gt; port; and if there is one, it must be the first
+argument of chord and so its result_type is the return type of chord's
+"call" function object.<br>
+<br>
+So a thread safe message queue with async send port and synchronous
+"blocking" recv port can be defined as following:<br>
+&nbsp;&nbsp;&nbsp; template &lt;MsgT&gt;<br>
+&nbsp;&nbsp;&nbsp; class msg_que : public joint {<br>
+&nbsp;&nbsp;&nbsp; public:<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; async&lt;MsgT&gt; send;<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; synch&lt;MsgT,void&gt; recv;<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; msg_que() {<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; &nbsp;&nbsp; chord(recv, send,
+&amp;msg_que::proc);<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; }<br>
+&nbsp;&nbsp;&nbsp; private:<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; MsgT proc(void_t r, MsgT s) { return s;
+}<br>
+&nbsp;&nbsp;&nbsp; }<br>
+Please note that synchronous port "recv" is used as the first
+argument of chord; and the signature of&nbsp; msg_que::proc matches the
+argument types of chord's ports.<br>
+<br>
+Using boost::lambda, the msg_que class can be redefined as following:<br>
+&nbsp;&nbsp;&nbsp; template &lt;MsgT&gt;<br>
+&nbsp;&nbsp;&nbsp; class msg_que {<br>
+&nbsp;&nbsp;&nbsp; public:<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; async&lt;MsgT&gt; send;<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; synch&lt;MsgT,void&gt; recv;<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; msg_que() {<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; &nbsp;&nbsp; joins().chord(recv, send,
+_2);<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; }<br>
+&nbsp;&nbsp;&nbsp; }<br>
+<h3>Joint Definition</h3>
+A joint "joins" a set of chords which may share ports, thus
+competing for messages. Internally joint synchronizes the consumption
+of messages and schedules the firing of chords.<br>
+A joint can be defined and used in 2 ways:<br>
+<ol>
+ <li>The joint initially defines a set of chords which later may need
+to be changed (by chord_remove, chord_override, reset) to change
+message processing logic. In this case we need a name or reference to
+the "joint" object to invoke those methods. For this, the "joint" class
+can be used as parent of application classes, or a "joint" object can
+be instantiated; for example:</li>
+</ol>
+<div style="margin-left: 40px;">async&lt;T1&gt; chan1;<br>
+async&lt;T2&gt; chan2;<br>
+async&lt;T3&gt; chan3;<br>
+joint joins1(executor);<br>
+joins1<br>
+&nbsp;&nbsp;&nbsp; .chord(chan1, chan2, proc1)<br>
+&nbsp;&nbsp;&nbsp; .chord(chan2, chan3, proc2);<br>
+...<br>
+joins1.override_chord(chan1, chan2, new_proc);<br>
+joins1.remove_chord(...);<br>
+</div>
+<ol start="2">
+ <li>In many cases, joint is used one-shot to create a "fixed" set of
+synchronization chords which never change during its lifetime. And
+async&lt;&gt; / synch&lt;&gt; ports are the primary programming
+interfaces. We can
+use "factory" funtions "joins() / joins_t()" to create unnamed joints
+for this purpose:</li>
+</ol>
+<div style="margin-left: 40px;">async&lt;T1&gt; chan1;<br>
+async&lt;T2&gt; chan2;<br>
+async&lt;T3&gt; chan3;<br>
+joins(executor)<br>
+&nbsp;&nbsp;&nbsp; .chord(chan1, chan2, proc1)<br>
+&nbsp;&nbsp;&nbsp; .chord(chan2, chan3, proc2);<br>
+<br>
+</div>
+The variance of joint can be configured from the following aspects:<br>
+<ul>
+ <li>template parameters: <br>
+ </li>
+ <ul>
+ <li>scheduling policies: <br>
+ </li>
+ </ul>
+</ul>
+<div style="margin-left: 80px;">simple schedulers:<br>
+&nbsp;&nbsp;&nbsp; sched_first_match, sched_longest_match,
+sched_round_robin;<br>
+priority based schedulers:<br>
+&nbsp;&nbsp;&nbsp; sched_pri_first_match, sched_pri_longest_match,
+sched_pri_round_robin;</div>
+<ul>
+ <ul>
+ <li>max number of ports: 32 or larger<br>
+ </li>
+ </ul>
+</ul>
+<div style="margin-left: 40px;">The templated joint type can be defined
+similar to:&nbsp; joint_t&lt;sched_round_robin, 32&gt;. The default
+"joint" type is "joint_t&lt;sched_first_match,32&gt;".<br>
+or unnamed joints can be created with template factory function:&nbsp;
+joins_t&lt;sched_round_robin, 32&gt;(executor,...). The default factory
+method "joins()" is "joins_t&lt;sched_first_match,32&gt;()".<br>
+</div>
+<ul>
+ <li>instantiation parameters: <br>
+ </li>
+ <ul>
+ <li>executor: if a joint contains chords with all async ports
+(such as the above 2 joint definitions), chord's function have to run
+in a separate thread; executors with type
+boost::function&lt;void(callable)&gt; are used to spawn new tasks.<br>
+ </li>
+ <li>heartbeat: an experiment feature to allow auto-destruction of
+joint and its chords. If a positive heartbeat is specified, after
+"heartbeat" number of firings of chords, this joint will be
+auto-destroyed and ports detached.<br>
+ </li>
+ <li>name: if defined, debuging messages will be printed for this
+joint.<br>
+ </li>
+ </ul>
+</ul>
+<div style="margin-left: 40px;"></div>
+<h3>Runtime Semantics</h3>
+Quoted and modified from <a
+ href="http://research.microsoft.com/Comega/doc/comega_tutorials_concurrency_extensions.htm">C&#969;
+Concurrency Extensions Tutorials [2]</a>:
+<p style="margin-left: 40px;">The body (message processing logic) of a
+chord can only execute once
+<i>all</i> the ports in
+its header have been called. When a async / synch port
+is called there may be zero, one, or more chords which are enabled
+(ready to fire):</p>
+<ul style="margin-left: 40px;">
+ <li> If no chord is enabled then the port invocation is queued up.
+If the port is asynchronous, then this simply involves adding the
+message to a queue. If the method is
+synchronous, then the calling thread is blocked. </li>
+ <li> If there is a single enabled chord, then the messages of the
+calls involved in the match are de-queued, any blocked thread involved
+in the match is awakened, and the body runs. </li>
+ <li> If there are several chords which are enabled then the joint
+scheduling policy decides which is chosen to run. </li>
+ <li>chord body execution:<br>
+ </li>
+ <ul>
+ <li> When a chord which involves only asynchronous ports runs,
+then it does so in a thread of executors' thread pool.</li>
+ </ul>
+ <ul>
+ <li>If a chord involves a (single) synch&lt;&gt; method, the chord
+body will execute in the thread which calls the synch&lt;&gt; method</li>
+ </ul>
+ <li><span style="font-weight: bold;">Exception handling</span>: if an
+exception is thrown inside a chord
+"body" function object,</li>
+ <ul>
+ <li>if this chord only involves asynchronous methods, its body will
+run in a thread from executor's thread pool, and the exception thrown
+inside chord body will be caught and then simply dropped (in fact if
+executor's log is
+turned on, we'll see it in log), since the semantics of async calls are
+to return immediately with no result.<br>
+ </li>
+ <li>if this chord has a synchronous method, the chord body will
+execute in the calling thread of this synchronous method and this
+caller will also get the exception.<br>
+ </li>
+ </ul>
+</ul>
+</body>
+</html>

Added: sandbox/join/libs/join/doc/internals.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/internals.html 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,320 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+ <title>Join Internals</title>
+</head>
+<body>
+<h2 style="text-align: left;">Join Internals</h2>
+<h3>Architecture</h3>
+Join's internal design consists of 2 parts:<br>
+<ol>
+ <li>Low Level Resource Acquisition Core <br>
+ </li>
+</ol>
+<div style="margin-left: 40px;">The core of Join (and similarly
+JoCaml/Cw/C#.Joins) is a simple resource acquisiton (and contention
+management)
+manager which supports atomic acquisition of multiple resources:<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>use a bitmap to represent
+(global) resource-availability status:</li>
+</ul>
+<div style="margin-left: 40px;">
+<ul>
+</ul>
+</div>
+<div style="margin-left: 80px;">
+<ul>
+ <li>each bit is used for
+status of one resource (available/unavailable) or message
+(arrival/missing)</li>
+ <li>bitmap -
+represent the status of multiple resources</li>
+ <li>each acquisition
+(behaviour/pattern) is also represented as a bitmap (chord)</li>
+</ul>
+<ul>
+</ul>
+</div>
+<ul style="margin-left: 40px;">
+ <li>one
+acquisition can try to acquire multiple resources (chord bitmap
+contains
+multiple "1" bits)</li>
+ <li>applications can use
+multiple concurrent / competing acquisition behaviours(patterns) whose
+bitmaps overlap or which
+compete for the same global resources</li>
+ <li>fast
+dispatching / scheduling:</li>
+</ul>
+<ul style="margin-left: 40px;">
+</ul>
+<div style="margin-left: 80px;">whenever a new resource becomes
+available, the global status
+bitmap is updated, and the new global status
+bitmap is compared with chords' bitmaps; if a chord's bitmap
+matches (covered by) the global
+status bitmap, this acquisition can be satisfied (or chord is fired):
+one item is removed from each resource marked by the bits of chord's
+bitmap, and the global status bitmap is updated to reflect new
+status of resource availability.<br>
+If more than one chord can be fired, we can apply
+various scheduling algorithms: round robin, priority based<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li><span style="font-weight: bold;">a single mutex</span> is used to
+protect all
+these bitmaps and comparing logic</li>
+ <li>multiple threads can try
+concurrent acquisitions and contention/conflicts are resolved
+atomically and thread-safely&nbsp;&nbsp;
+&nbsp;<br>
+ </li>
+</ul>
+<ol start="2">
+ <li>High Level Messaging Semantics</li>
+</ol>
+<div style="margin-left: 40px;">On top of Join's core
+resource-acquisition manager, Join
+(Jocaml/Cw/C#.Joins) adds the following high level messaging based
+semantics :<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>each bit of global resource bitmap is "set" by a port
+(messages are the resources to
+be
+consumed), so each chord/acquisition-pattern is defined
+by a set of ports</li>
+ <li>each chord/pattern is associated with a callback
+(chord-body) which defines how the resources/messages&nbsp; (acquired
+by
+this chord from ports/ports) are consumed</li>
+</ul>
+<div style="margin-left: 40px;">Two types of port give
+different semantics :<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>asynchronous: when an asynchronous port is invoked, if no
+acqusition bitmaps is satisfied (no chord can fire), arguments/messages
+are buffered,
+global
+resource bitmap is updated and calling thread returns without blocking</li>
+ <li>synchronous: when a synchronous port is invoked, if no
+acqusition bitmap
+is satisfied (no chord can fire), arguments/messages are kept at stack,
+global
+resource bitmap is updated and calling thread is blocked (<span
+ style="font-weight: bold;">a condition variable </span>is
+used)</li>
+</ul>
+<div style="margin-left: 40px;">If there exist a chord (acquisition
+pattern) which
+can fire, depending on the types of acquisition pattern (chord), there
+are 2 behaviours:<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>if the chord contains a synchronous port, the blocked thread
+of synch-call is waked up and the
+chord-body
+callback is executed by/inside that thread</li>
+ <li>if the chord contains all async ports/ports, the chord-body
+callback is executed in
+"another/different"
+thread (which can be
+a new thread or from thread-pool), this is how "spawning" is done in
+Join</li>
+</ul>
+<h3>Some facts result from the architecture<br>
+</h3>
+<ol start="1">
+ <li>Join's usage of low level synchronization primitives<br>
+ </li>
+</ol>
+<div style="margin-left: 40px;">The rule to get the number of
+(mutex, condition variable)
+used by a
+Join class is simple: <br>
+<ul>
+ <li>each joint holds a mutex which is used to protect all
+code in async&lt;&gt;/synch&lt;&gt;/chord/joint, </li>
+ <li>each synch&lt;&gt; port holds a
+condition variable and </li>
+ <li>async&lt;&gt; ports hold no synchronization primitive<br>
+ </li>
+</ul>
+Although Join's primitives (async&lt;&gt; / synch&lt;&gt; / chord) are
+high level, the author's experience has shown that concurrent
+applications written with Join usually use almost the same number of
+low level synchronization primitives as manually crafted ones.<br>
+<br>
+</div>
+<div style="margin-left: 40px;">For example, a manually crafted thread
+safe message queue will use a mutex and a condition variable to provide
+an asynchronous send/push interface (send and go) and a synchronous
+receive/pop interface (blocking wait if no message available).<br>
+</div>
+<div style="margin-left: 40px;">In Join, a simple thread safe message
+queue can be defined as following:<br>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename msg_type&gt;</span><br>
+<span style="font-weight: bold;">class message_queue : public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+async&lt;msg_type&gt; send;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+synch&lt;msg_type,void&gt; recv;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+message_queue() {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp;
+&nbsp;&nbsp;&nbsp; chord(recv,
+send,
+&amp;message_queue::forward);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; private:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+msg_type
+forward(void_t r, msg_type s) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp;
+&nbsp;&nbsp;&nbsp; return s;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};</span><br>
+Based on the above rule, since this message_queue class inherits from
+joint class and has one synch&lt;&gt; port, it will use one mutex
+and
+one condition variable, using exactly the same number of low level
+synchronization primitives as manually crafted. &nbsp;&nbsp;
+&nbsp;&nbsp; </div>
+<ol start="2">
+ <li>Join's expressiveness<br>
+ </li>
+ <ol>
+ </ol>
+</ol>
+<div style="margin-left: 40px;">Join provides a generic framework to
+compose asynchronous and synchronous behaviours in thread safe manner.
+Join's special internal design makes it
+very expressive in creating concurrent applications:<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>Join is expressive in that it can be used to
+create most common concurrency / synchronization constructs with
+ease,&nbsp; such as monitor, read-write lock, or joints. See tutorials
+for detailed coding</li>
+ <ul>
+ </ul>
+ <li>Join is expressive in that it is common and easy to
+write
+concurrent (thread-safe) applications using solely Join's primitives
+(async/synch/chord) without using low level threads, mutex and
+condition variables. Again see tutorials for more samples. Please refer
+to this <a href="../../../boost/join/idioms/executor.hpp">Join based
+simple
+executor</a> implmented sole thru Join's primitives.<br>
+ </li>
+</ul>
+<ol start="2">
+ <ol>
+ </ol>
+</ol>
+<ol start="3">
+ <li>Since Join's core
+directly supports atomic acquisition of multiple resources, it helps
+nicely multithreaded applications which involve multiple resources:</li>
+</ol>
+<ul style="margin-left: 40px;">
+ <li>Multithreaded applications whose threads acquiring conflicting
+sets of multiple resources (or locks)&nbsp; are prone to
+dead-locks.&nbsp; The normal way to avoid dead-lock is by enforcing a
+consistent (global) order of acquiring resources (locks), as clearly
+explained in Herb Sutter's article <a
+ href="http://www.ddj.com/hpc-high-performance-computing/204801163">"Use
+Lock Hierarchies to Avoid Deadlock"</a>.</li>
+</ul>
+<div style="margin-left: 80px;">Join provides a natural solution to
+this issue. We can use async ports/ports to represent the
+availability status of resources. The existence of messages at these
+async ports represent that resources (locks) are available to be
+consumed. Each acquisition pattern is represented as a chord which has
+one synch port and multiple async ports. For example, to
+resolve the conflicting acquisitions of resources from multiple
+threads, we can define the following resource manager:<br>
+<span style="font-weight: bold;">class resource_manager : joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; async&lt;void&gt;
+resource1;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; async&lt;void&gt;
+resource2;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; async&lt;void&gt;
+resource3;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; async&lt;void&gt;
+resource4;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; ......</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+synch&lt;void,void&gt;
+acquire1;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+synch&lt;void,void&gt;
+acquire2;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+synch&lt;void,void&gt;
+acquire3;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; ......</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; resource_manager() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+chord(acquire1, resource1, resource3, resource4,
+&amp;resource_manager::handle_acquire1);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+chord(acquire2, resource3, resource2, resource1,
+&amp;resource_manager::handle_acquire2);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+chord(acquire3, resource2, resource4,
+&amp;resource_manager::handle_acquire3);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; ......<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; resource1();<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; resource2();<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; resource3();<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp; ......<br style="font-weight: bold;">
+</span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+The above 3 chords define 3 different acquisition patterns competing
+for the same set of resources. Initially a message will be sent on each
+of these async ports to mark resource1,2,3 are available. Different
+threads will call acquire1() / acquire2() / acquire3(), block waiting
+until all of its required resources are available. When a chord is
+fired in one thread, that thread will remove a messages from each of
+async ports marked in the chord pattern, meaning it consumes these
+resources. When this thread is finished with using these resources, it
+will call these async ports to mark the resources available again.
+Please note that the order of these async ports (resources) defined
+in chords are not relevant anymore; since all required async ports
+(resources) are acquired atomically.<br>
+</div>
+<ul style="margin-left: 40px;">
+ <li>In the Futures/Promises concurrency model, one common idiom
+(wait_for_all) is using a future to wait for results from multiple
+asynchronous computations. Join can also be used to implement Futures
+and its wait_for_any and wait_for_all idioms. Please refer to the
+following section on futures/promises
+for more detailed discussions.</li>
+</ul>
+&nbsp;<br>
+<br>
+</body>
+</html>

Added: sandbox/join/libs/join/doc/references.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/references.html 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,26 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+ <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+ <title>References</title>
+</head>
+<body>
+<h3>References</h3>
+<ol>
+ <li>JoCaml<br>
+ </li>
+ <li><a href="http://research.microsoft.com/Comega/">Comega: "Modern
+Concurrency Abstractions for C#"</a><br>
+ </li>
+ <li><a href="http://research.microsoft.com/%7Ecrusso/joins/index.htm">The
+Joins Concurrency Library</a></li>
+ <li><a
+ href="http://channel9.msdn.com/wiki/default.aspx/Channel9.ConcurrencyRuntime">Coordination
+and Concurrency Runtime (CCR)</a>&nbsp; <br>
+ </li>
+ <br>
+</ol>
+<br>
+</body>
+</html>

Added: sandbox/join/libs/join/doc/samples.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/samples.html 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,213 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+ <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+ <title>Chords and Joints</title>
+</head>
+<body>
+<h2>Samples</h2>
+Thread safe buffer and message queue
+<br>
+Handle multiple message flows
+<br>
+Using portss as the interface between components<br>
+Joint's life cycle<br>
+<h3><a name="buffer"></a>Thread safe buffer and message queue</h3>
+a thread safe buffer can be defined in various manners.<br>
+<br>
+the most explicit version:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename V&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer: public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;V&gt; put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;V,void&gt; get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; chord(get, put,
+&amp;buffer::chord_body);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; V chord_body(void_t g, V p) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; return p;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer.cpp<br>
+<br>
+simplified version using boost::lambda or boost::phoenix and "joins()"
+factory function:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename V&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;V&gt; put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;V,void&gt; get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; joins().chord(get,
+put, lambda::_2);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer_lambda.cpp<br>
+buffer_phoenix.cpp<br>
+<br>
+Using boost::tuples, boost::phoenix, and "joins()" factory function, we
+can create message queue (essentially a thread safe buffer) using
+factory function in JoCaml
+style:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename V&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">boost::tuple&lt;async&lt;V&gt;,synch&lt;V,void&gt;
+&gt; create_msg_que() {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;V&gt; send;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;V,void&gt; recv;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; joins().chord(recv, send, arg2);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; return boost::make_tuple(send,
+recv);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}&nbsp;&nbsp; <br>
+...<br>
+&nbsp; async&lt;int&gt; send;<br>
+&nbsp; synch&lt;int, void&gt; recv;<br>
+&nbsp; boost::tie(send, recv) = create_msg_que&lt;int&gt;();<br>
+</span></div>
+producer_consumer.cpp<br>
+<h3><a name="flows"></a>Handle multiple message flows</h3>
+A simple demo of joint handling 3 message flows and performing
+different processing on messages when they are available in different
+portss:<br>
+<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">flows_bundle
+make_data_portss(joint::spawn_type e) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;int&gt; flow1;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;int&gt; flow2;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;int&gt; flow3;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; joins(e)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; .chord(flow1,
+flow2, std::cout &lt;&lt;&nbsp; (arg1 + arg2) &lt;&lt; "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; .chord(flow2,
+flow3, std::cout &lt;&lt;&nbsp; (arg1 - arg2) &lt;&lt; "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; .chord(flow1,
+flow3, std::cout &lt;&lt;&nbsp; (arg1 * arg2) &lt;&lt; "\n");</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; return boost::make_tuple(flow1,
+flow2, flow3);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+</div>
+flows.cpp<br>
+<br>
+a sample of joining a vector of message flows into one output flow:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename T&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class join_many : public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; vector&lt;async&lt;T&gt; &gt;
+inputs;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;vector&lt;T&gt;,
+void&gt; output;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; join_many(int num) :
+inputs(num) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; chord(output,
+inputs, &amp;join_many::chord_body);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; vector&lt;T&gt;
+chord_body(void_t out, vector&lt;T&gt; in) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; return in;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+join_many.cpp<br>
+<h3><a name="chain"></a>Using ports as the interface between
+components</h3>
+A sample of a chain of nodes, each node exposes its input and output
+interfaces as ports.<br>
+Each node reads from its input, does some transformation of data, wait
+a second and send it to output interface, all these are performed in
+executor's thread pool.<br>
+The way to connect one node (A) to the next (B) is simple:&nbsp;
+A.output = B.input.<br>
+<div style="margin-left: 40px; font-weight: bold;">template
+&lt;typename T&gt;<br>
+class node {<br>
+&nbsp; int my_no;<br>
+&nbsp; boost::function&lt;T(T)&gt; func_;<br>
+public:<br>
+&nbsp; async&lt;T&gt; input;<br>
+&nbsp; async&lt;T&gt; output;<br>
+&nbsp; node(joint::spawn_type e, int n, boost::function&lt;T(T)&gt; f)
+: <br>
+&nbsp;&nbsp;&nbsp; my_no(n), func_(f), stop(false) {<br>
+&nbsp;&nbsp;&nbsp; joins(e).chord(input, bind(&amp;node::proc, this,
+_1));<br>
+&nbsp; }<br>
+private:<br>
+&nbsp; void proc(T in) {<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; T o = func_(in);<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; thread_sleep(1);<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; output(o);<br>
+&nbsp; }<br>
+};<br>
+</div>
+chain.cpp<br>
+<br>
+Another sample of using async ports as service interface. The server
+exposes the following interface taking requests in the form of a string
+name and a response port:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">class
+IService {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;tuple&lt;string,
+async&lt;string&gt; &gt; &gt; Service;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};</span><br>
+</div>
+When server receives such a request, it does some work, and send then
+result in response port.<br>
+In other side, clients first create some response ports (Result2.first
+and Result2.second) and pack them inside service requests when sending
+requests to servers; then block waiting on these response ports for
+the results from servers.<br>
+async_call_ret.cpp<br>
+<h3><span style="font-weight: bold;"><a name="lifecycle"></a>Joint's
+life cycle</span></h3>
+After the initial definition, a joint's set of chords can modified thru
+methods: chord_override(), chord_remove() or reset(). And if a joint's
+heartbeat is set during its creation, its set of chords will be
+auto-destroyed after "heartbeat" number of firings of chords.<br>
+Here is a sample demonstrating these joint's lifecycle related methods.<br>
+joint_lifetime.cpp<br>
+<br>
+</body>
+</html>

Added: sandbox/join/libs/join/doc/synopsis_port.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/synopsis_port.html 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,571 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+ <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+ <title>Synopsis of Join Classes</title>
+</head>
+<body>
+<h2>Synopsis of Join Classes<br>
+</h2>
+<ol>
+ <li>classes</li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">namespace
+boost {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; namespace join {</span><br>
+</div>
+<div style="margin-left: 40px;">&nbsp;&nbsp;<span
+ style="font-weight: bold;">&nbsp; &nbsp;&nbsp;&nbsp; template
+&lt;typename MsgT, typename QueT&gt; class async;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp;
+template &lt;typename ResT, typename MsgT&gt; class synch;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp;
+template &lt;typename scheduler, size_t max_size&gt;
+class joint;<br>
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; joint joins(joint::spawn_type s =
+0, int hb = 0, const char *name = 0);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp;
+class executor;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+</div>
+<ol start="2">
+ <li>Asynchronous ports<br>
+ <span style="font-weight: bold;"></span></li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename MsgT, typename QueT&gt;&nbsp; </span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">class async {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; typedef
+MsgT argument_type;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
+typedef void result_type;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
+......</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;"></span>
+<span style="font-weight: bold;"></span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
+async(size_t sz=0) : ... { ... }</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; void
+operator()(MsgT msg)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br style="font-weight: bold;">
+<br>
+Asynchronous port objects provide one-way non-blocking message passing
+interfaces.<br>
+<br>
+<span style="font-weight: bold;">typename QueT: <br>
+</span>The queue type async ports used to buffer messages internally
+when no chords are ready to fire. The default QueT is std::deque. QueT
+must satisfy the following interface:<br>
+<div style="margin-left: 40px;">class QueT {<br>
+&nbsp;&nbsp;&nbsp; void push_back(MsgT);<br>
+&nbsp;&nbsp;&nbsp; MsgT front();<br>
+&nbsp;&nbsp;&nbsp; void pop_front();<br>
+&nbsp;&nbsp;&nbsp; void clear();<br>
+}<br>
+</div>
+<span style="font-weight: bold;"></span><br>
+<span style="font-weight: bold;">async(size_t sz=0):<br>
+Effect: </span>initialize a asynchronous port with underlying
+message queue of <span style="font-weight: bold;">QueT</span> type.
+The queue size <span style="font-weight: bold;">sz</span> is specified
+for flow control; the default value 0 means
+unlimited queue size. Internally there are 2 implementations for this
+class. If the signature is async&lt;void&gt;, there is no
+message to be passed, and template specialization will choose
+a proper implementation without a queue (and its overhead) while
+maintaining the
+same semantics.<span style="font-weight: bold;"><br>
+Throw:<br>
+<br>
+</span><span style="font-weight: bold;">void operator()(MsgT msg): </span><br>
+<span style="font-weight: bold;">Effect:</span> the port api to
+send asynchronous messages; calling threads will return immediately.
+Since async calls will return immediately, and <span
+ style="font-weight: bold;">msg</span> argument may be
+buffered in queues, the <span style="font-weight: bold;">msg</span>
+argument cannot be references or pointers to
+variables on calling stacks which will probably be unwound when async
+calls
+return.<br>
+<span style="font-weight: bold;">Throws</span>: not_in_chord_exception,
+queue_overflow_exception, no_executor_exception<br>
+</div>
+<ol start="3">
+ <li>Synchronous ports<span style="font-weight: bold;"></span></li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename ResT, typename MsgT&gt; <br>
+class synch {<br>
+public:<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; typedef MsgT argument_type;<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; typedef ResT result_type;<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ......<br>
+&nbsp;
+<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; synch(size_t sz=0) : ... { ... }<br>
+</span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
+ResT operator()(MsgT msg);<br>
+</span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
+ResT operator()(MsgT msg, boost::xtime &amp;timeout);<br>
+</span><span style="font-weight: bold;">};<br>
+<br>
+</span>Synchronous ports provides blocking message ports.<br>
+<br>
+<span style="font-weight: bold;">synch(size_t sz=0):<br>
+Effect: </span>initialize a synchronous port with underlying thread
+waiting queue.
+The queue size <span style="font-weight: bold;">sz</span> is specified
+for flow control; the default value 0 means
+unlimited queue size. Internally there are four implementations and
+template specialization help choose the most efficient one.<span
+ style="font-weight: bold;"><br>
+Throw:<br>
+<br>
+</span><span style="font-weight: bold;">ResT operator()(MsgT msg):</span><br>
+<span style="font-weight: bold;">Effect: </span>the port
+interface to send a synchronous message;
+calling thread will block here till a reply is available. Since the
+call will block till a result is returned, the <span
+ style="font-weight: bold;">msg</span> argument can be
+anything just as normal function calls, including pointers or
+references to variables on stack.<br>
+<span style="font-weight: bold;">Throws: </span>not_in_chord_exception,
+queue_overflow_exception, or&nbsp;
+application_specific_exceptions_raised_inside_chord_body<br>
+<br>
+<span style="font-weight: bold;">ResT operator()(MsgT msg, boost::xtime
+&amp;timeout):</span><br>
+<span style="font-weight: bold;">Effect: </span>this is a variant of
+the above interface with a timeout. The call will return either the
+result is available or return with a "synch_time_out_exception" when
+the timeout is reached.<br>
+<span style="font-weight: bold;">Throws:</span><span
+ style="font-weight: bold;"></span> not_in_chord_exception,
+queue_overflow_exception, synch_time_out_exception,&nbsp;
+application_specific_exceptions_raised_inside_chord_body<span
+ style="font-weight: bold;"> </span>
+</div>
+<ol start="4">
+ <li>Joint<br>
+ </li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;</span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">template &lt;size_t&gt; scheduler =
+sched_first_match,<br>
+</span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; size_t
+max_size = 32&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class joint {<br>
+&nbsp;&nbsp;&nbsp; typedef function1&lt;void, typename
+joint_base::callable&gt; spawn_type;<br style="font-weight: bold;">
+</span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">&nbsp;&nbsp;&nbsp; joint(spawn_type spawn =
+0, int heartbeat = 0, const char *name = 0):<br>
+</span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+template
+&lt;typename PortT1, typename PortT2,
+......, typename CallT&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; void chord(PortT1
+&amp;p1, PortT2 &amp;p2, ......, CallT c, int priority=0)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+template
+&lt;typename PortT1, typename PortT2, </span><span
+ style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">typename CallT&gt;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; void
+chord_override(PortT1 &amp;p1, PortT2 &amp;p2, </span><span
+ style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">CallT c</span><span
+ style="font-weight: bold;">, int priority=0</span><span
+ style="font-weight: bold;">)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+template
+&lt;typename PortT1, typename PortT2,
+......&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; void
+chord_remove(PortT1
+&amp;p1, PortT2 &amp;p2, ......)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+......</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">};</span><br>
+Joint can be used as the parent of classes which use
+async&lt;&gt; / synch&lt;&gt; methods and chords to define concurrent
+activities; or it can be used to directly join message ports
+together. Joint maintains the status of message arrival and
+synchronization. <br>
+<br>
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">typename
+spawn_type:</span> <br>
+&nbsp;&nbsp;&nbsp; the type of execution service used to drive Join
+based asynchronous applications. It could be the async "execute" method
+of the following thread pool based executor, or it could be a adaptor
+to existing applications execution service, as long as it provides a
+functional / functor interface to spawn a task:<br>
+&nbsp;&nbsp;&nbsp; <span style="font-weight: bold;">void
+operator()(callable t);<br>
+<br>
+</span><span style="font-weight: bold;">max_size:</span> <br>
+&nbsp;&nbsp;&nbsp; defines the capacity
+of Joint, ie. how many async&lt;&gt; / synch&lt;&gt; ports can be
+defined in this Joint. When max_size &lt;= 32, an integer bitmask is
+used
+to maintain status; while for "larger" Joint, std::bitset&lt;&gt; is
+used for status.<br>
+<br>
+<span style="font-weight: bold;">scheduling_policy: </span><br>
+&nbsp;&nbsp;&nbsp; Joint
+supports three basic kinds of scheduling policies that decide which
+chord
+will fire when multiple chords become ready at the same time:<br>
+<ul>
+ <li><span style="font-weight: bold;">sched_first_match</span>: fire
+the first chord which is ready to
+fire</li>
+ <li><span style="font-weight: bold;">sched_longest_match</span>: when
+multiple chords are ready to fire,
+fire the one defined with most async&lt;&gt; / synch&lt;&gt; ports.</li>
+ <li><span style="font-weight: bold;">sched_round_robin</span>:
+internally the last chord fired is
+remembered and chords are fired in round-robin style.</li>
+</ul>
+&nbsp;&nbsp;&nbsp; Additionally Joint supports priority based variants
+of the above three scheduling algorithms.<br>
+<br>
+<span style="font-weight: bold;">joint(spawn_type spawn = 0, int
+heartbeat = 0, const char *name = 0):<br>
+</span>&nbsp;&nbsp;&nbsp; an Joint is constructed with the following
+two settings:<br>
+<ul>
+ <li><span style="font-weight: bold;">spawn_type spawn</span>: the
+asynchronous port to spawn a new task.
+It could be an adaptor to spawn tasks into existing applications main
+threads (and it must be non-blocking and returns immediately), or the
+execute port of the following executor (thread pool),
+ie.
+the task waiting queues. In Join based applications, concurrency are
+generated by
+chords with only async&lt;&gt; ports whose body will run as a task
+in
+executor thread pool.</li>
+ <li>int <span style="font-weight: bold;">heartbeat: </span>used to
+control the life time of message coordination definitions (chords)
+inside the joint. If a heartbeat ( &gt; 0 ) is provided, the joint's
+internal definitions (chords) will be automatically reset / destroyed
+after its chords have fired "heartbeat" times totally. So after
+heartbeat expires, all of joint's chords will be destroyed and ports
+detached.<br>
+ </li>
+ <li>const char *name: the name of Joint; it is used as a flag to turn
+on/off debug messages, when it is set to a non-NULL value, debugging
+message will be printed out.<br>
+ </li>
+</ul>
+<span style="font-weight: bold;">template
+&lt;typename PortT1, typename PortT2,
+......, typename CallT&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">void chord(PortT1 &amp;p1, PortT2
+&amp;p2, ......, CallT c, int priority=0)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">Effect:
+</span>overloaded chord()
+functions to create chords with different number of
+async&lt;&gt; / synch&lt;&gt; ports. By default Joint has just eight
+overloaded chord() functions to create chords for one to eight
+async&lt;&gt; / synch&lt;&gt; ports,&nbsp; because any of these
+async&lt;&gt; ports can be a std::vector&lt;async&lt;&gt;&gt;, so in
+fact
+chords can be created for unlimited number of
+async&lt;&gt; ports, while each chord can have at most one
+synchronous port:<br>
+<ul>
+ <li>PortT &amp;p, &amp;p1, ... &amp;pn: async&lt;&gt; / synch&lt;&gt;
+ports which are in the header of chord body; only one of them can be
+synchronous and if we do have one, it must the first port.<br>
+ </li>
+ <li>CallT c: a function, a lambda, class-method, or object-method
+which will consume and process the messages from chord's specified
+ports. Normally the signature of CallT is deduced from chord's port /
+port types:</li>
+</ul>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">PortT1::result_type
+call(PortT1::argument_type, PortT2::argument_type, ...);<br>
+<br>
+</span>Please note that if a port's message type is "void" (such as
+async&lt;void&gt;), we substitue its argument_type as "void_t" which is
+an "empty" struct type defined internally.<span
+ style="font-weight: bold;"><br>
+</span></div>
+<ul>
+ <li>int priority: the scheduling priority of this chord; the
+default value 0 has the highest priority, the greater the number, the
+lower the priority. During scheduling, the chords with higher priority
+will be scanned and scheduled first.</li>
+</ul>
+<span style="font-weight: bold;">Throws: </span>hidden_chord_exception,
+too_many_ports_exception<br>
+<br>
+<span style="font-weight: bold;">template &lt;typename PortT1, typename
+PortT2, </span><span style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">typename CallT&gt;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">void chord_override(PortT1 &amp;p1,
+PortT2 &amp;p2, </span><span style="font-weight: bold;">......, </span><span
+ style="font-weight: bold;">CallT c</span><span
+ style="font-weight: bold;">, int priority=0</span><span
+ style="font-weight: bold;">)</span><span style="font-weight: bold;"></span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">Effect:&nbsp; </span>allow overriding
+an existing chord (the chord body is replaced with the new one).
+It is mostly used in child class to override a chord defined in parent
+class. The overridden chord is identified by the set of async&lt;&gt; /
+synch&lt;&gt; ports in its header. It uses the same arguments as
+normal chord() functions. <br>
+<span style="font-weight: bold;">Throws:&nbsp; </span>chord_override_exception
+(chord not found)<br>
+<br>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename PortT1, typename PortT2,
+......&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">void chord_remove(PortT1
+&amp;p1, PortT2 &amp;p2, ......)</span><br style="font-weight: bold;">
+</div>
+<div style="margin-left: 40px;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">Effect:&nbsp; </span>remove the chord
+identified by the set of async / synch ports. <br>
+<span style="font-weight: bold;">Throws:&nbsp; </span>chord_remove_exception
+(chord not found)<br>
+<br>
+</div>
+<ol start="5">
+ <li>Factory functions to create "unnamed" joints</li>
+</ol>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; joint
+joins(joint::spawn_type s = 0, int hb = 0, const char *name =
+0);&nbsp;&nbsp; <br>
+&nbsp;&nbsp;&nbsp; &nbsp; </span>or a more general templated function:<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; template &lt;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
+template &lt;size_t&gt; class scheduler, </span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; size_t
+max_size</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &gt;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+joint_t&lt;scheduler, max_size&gt; joins_t(joint::spawn_type s = 0, int
+hb = 0, const char *name = 0);<br>
+&nbsp;&nbsp;&nbsp; </span>Often joint is used directly to join message
+ports together, and we don't need direct reference to the "joint"
+object. In these cases, the factory functions can be used to create
+unnamed joints, such as:<br>
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; joins(exec)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+.chord(async1, async2, handler1)</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; &nbsp;&nbsp;
+.chord(async2, async3, handler2);</span><span style="font-weight: bold;"></span><br>
+<span style="font-weight: bold;"></span>
+<div style="margin-left: 40px;"><span style="font-weight: bold;"></span></div>
+<ol start="6">
+ <li>executor</li>
+</ol>
+<div style="margin-left: 40px; font-weight: bold;">template
+&lt;template &lt;size_t&gt; class scheduler=sched_first_match, size_t
+sz=32&gt;<br>
+class executor :
+public Joint {<br>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+executor(int
+num_threads, const char *name = NULL);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; async&lt;task&gt;
+execute;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp;
+synch&lt;void,void&gt; shutdown;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;"></span><span style="font-weight: bold;">};</span><br>
+Chords with only
+async&lt;&gt; ports will run its body as an asynchronous task,
+either in a newly spawned thread, or in the thread
+pool of executor, or in exisiting applications' execution services
+(such as Boost.Asio's
+main threads). So executor is the "engine" of Join based
+applications. Different execution strategies can be used, such as a
+thread per request or thread pool, as long as it provides a
+functional / functor interface to spawn a task: <span
+ style="font-weight: bold;">void
+operator()(task t). </span><br>
+Here the default executor class is thread
+pool based and defined using
+async&lt;&gt; / synch&lt;&gt; ports and chords,&nbsp; and is a good
+sample
+of
+Join.<br>
+<br style="font-weight: bold;">
+<span style="font-weight: bold;">executor(int num_threads, const char
+*name = NULL)</span>: <br>
+<ul>
+ <li>int num_threads: number of worker threads in executor's thread
+pool</li>
+ <li>const char *name: name of executor, a flag to turn on/off
+debugging messages<br>
+ </li>
+</ul>
+<span style="font-weight: bold;">async&lt;task&gt; execute</span>: an
+asynchronous port for applications to submit tasks; never block; it
+is the default <span style="font-weight: bold;">task queue</span> of
+executor.<br>
+<span style="font-weight: bold;">synch&lt;void()&gt; shutdown</span>:
+a synchronous port for application code to wait for executor to
+shutdown (and its threads to exit). All chords defined with <span
+ style="font-weight: bold;">shutdown</span> have a low priority (1)
+than normal (0), so shutdown will return when all submited tasks have
+been finished and all worker threads exit.<br>
+</div>
+<ol start="7">
+ <li>support of group/bundle of async&lt;&gt; ports:</li>
+</ol>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">boost::std::vector&lt;async&lt;</span><span
+ style="font-weight: bold;">MsgT</span><span style="font-weight: bold;">&gt;
+&gt;<br>
+</span><span style="font-weight: bold;"></span>In C&#969;&nbsp; the chords
+are defined with a&nbsp; few async/synch ports with distinct names.
+What if
+we want to
+define chords with a group / std::vector of asynchronous message ports
+of the same message type? Inspired by
+similar abstractions in CCR[4] and C#.Joins[3], Join allows the
+definition of an std::vector of async ports and their participation
+in chord as a whole. Since there are at most one synch port per
+chord, so there is no support for std::vector of synch ports. The
+argument type for vector of async ports will be vector&lt;MsgT&gt;.
+Please
+refer to the Group of Asynchronous port
+tutotail (join_many.cpp) for their usage. <span
+ style="font-weight: bold;"></span></div>
+<ol start="8">
+ <li>exceptions</li>
+</ol>
+<div style="margin-left: 40px;">
+<p>In C&#969; , many of the following errors related to the definitions of
+async/synch ports and chords will be found by the compiler. Since
+Join is implemented as a library, these errors can only be
+reported as the following exceptions during runtime:</p>
+<p><span style="font-weight: bold;">class
+join_exception : public
+std::exception</span></p>
+<p style="margin-left: 40px;"><span style="font-weight: bold;"></span>the
+base class of all Join related exceptions</p>
+<p><span style="font-weight: bold;">class not_in_chord_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When a async / synch port is
+declared
+and no chord include it, this port has no body defined similar
+to&nbsp; pure virtual port in C++ abstract classes (pure virtual ports
+have only signature / declaration, no body). When this class
+is
+instantiated and client code calls this port, not_in_chord_exception
+will be thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">class double_association_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">Every async / synch port should only
+be associated with one Joint. When attempting to use the port in the
+chords of more than one Joints, this exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">class
+queue_overflow_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">For basic flow control, async / synch
+port can be constructed with a limited queue size. When the buffered
+(unprocessed) messages exceed the queue size, this exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span><span
+ style="font-weight: bold;">class no_executor_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When an Joint contains chords with only
+async ports but without an executor associated, this exception is
+thrown.<br>
+</div>
+<p><span style="font-weight: bold;">class hidden_chord_exception :
+public
+join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When Joint's scheduling policy is
+fire_as_soon_as_possible (default), if the definition of a new chord
+contains all the ports of another chord, or vice versa, this exception
+will be thrown.<br>
+</div>
+<p><span style="font-weight: bold;">class too_many_ports_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When defining a new chord, the total
+number of async / synch ports (accumulated thru all the defined
+chords) exceeds the max_size of Joint, this exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;"></span></p>
+<p><span style="font-weight: bold;">class chord_override_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When chord_override() is called and no
+existing chord found with the same set of async / synch ports, this
+exception is thrown.<br>
+</div>
+<p><span style="font-weight: bold;">class chord_remove_exception :
+public join_exception<br>
+</span></p>
+<div style="margin-left: 40px;">When chord_remove() is called and no
+existing chord found with the same set of async / synch ports, this
+exception is thrown.<br>
+<br>
+</div>
+</div>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">class
+synch_not_1st_exception : public join_exception </span><br
+ style="font-weight: bold;">
+<p style="margin-left: 40px;">When chords are defined with synchronous
+port, however the synchronous port is not used as the first port
+of chord, this exception is thrown.<br>
+</p>
+<span style="font-weight: bold;"> class single_synch_exception : public
+join_exception </span><br>
+<p style="margin-left: 40px;">When chords are defined with more than
+one synchronous ports, this exception is thrown.<br>
+</p>
+<span style="font-weight: bold;"> class synch_time_out_exception :
+public
+join_exception </span><br>
+<p style="margin-left: 40px;">The call of a synchronous port is timed
+out.<br>
+</p>
+</div>
+<div style="margin-left: 40px;"></div>
+</body>
+</html>

Added: sandbox/join/libs/join/doc/tutorials.html
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/doc/tutorials.html 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,205 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html>
+<head>
+ <meta content="text/html; charset=ISO-8859-1"
+ http-equiv="content-type">
+ <title>Chords and Joints</title>
+</head>
+<body>
+<h2>Samples</h2>
+<a href="#buffer"><br>
+Thread safe buffer and message queue</a>
+<br>
+Handle multiple message flows
+<br>
+Using channels as the interface between components<br>
+Joint's life cycle<br>
+<h3><a name="buffer"></a>Thread safe buffer and message queue</h3>
+a thread safe buffer can be defined in various manners.<br>
+<br>
+the most explicit version:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename V&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer: public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;V&gt; put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;V,void&gt; get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; chord(get, put,
+&amp;buffer::chord_body);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; V chord_body(void_t g, V p) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; return p;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer.cpp<br>
+<br>
+simplified version using boost::lambda or boost::phoenix and "joins()"
+factory function:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename V&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class buffer {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;V&gt; put;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;V,void&gt; get;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; buffer() {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; joins().chord(get,
+put, lambda::_2);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+buffer_lambda.cpp<br>
+buffer_phoenix.cpp<br>
+<br>
+Using boost::tuples, boost::phoenix, and "joins()" factory function, we
+can create message queue (essentially a thread safe buffer) in JoCaml
+style:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename V&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">boost::tuple&lt;async&lt;V&gt;,synch&lt;V,void&gt;
+&gt; create_msg_que() {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;V&gt; send;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;V,void&gt; recv;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; joins().chord(recv, send, arg2);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; return boost::make_tuple(send,
+recv);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}&nbsp;&nbsp; <br>
+...<br>
+&nbsp; async&lt;int&gt; send;<br>
+&nbsp; synch&lt;int, void&gt; recv;<br>
+&nbsp; boost::tie(send, recv) = create_msg_que&lt;int&gt;();<br>
+</span></div>
+producer_consumer.cpp<br>
+<h3><a name="flows"></a>Handle multiple message flows</h3>
+A simple demo of joint handling 3 message flows and performing
+different processing on messages when they are available in different
+channels:<br>
+<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">flows_bundle
+make_data_channels(joint::spawn_type e) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;int&gt; flow1;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;int&gt; flow2;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; async&lt;int&gt; flow3;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; joins(e)</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; .chord(flow1,
+flow2, std::cout &lt;&lt;&nbsp; (arg1 + arg2) &lt;&lt; "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; .chord(flow2,
+flow3, std::cout &lt;&lt;&nbsp; (arg1 - arg2) &lt;&lt; "\n")</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; .chord(flow1,
+flow3, std::cout &lt;&lt;&nbsp; (arg1 * arg2) &lt;&lt; "\n");</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; return boost::make_tuple(flow1,
+flow2, flow3);</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">}</span><br>
+</div>
+flows.cpp<br>
+<br>
+a sample of joining a vector of message flows into one output flow:<br>
+<div style="margin-left: 40px;"><span style="font-weight: bold;">template
+&lt;typename T&gt;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">class join_many : public joint {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">public:</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; vector&lt;async&lt;T&gt; &gt;
+inputs;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; synch&lt;vector&lt;T&gt;,
+void&gt; output;</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; join_many(int num) :
+inputs(num) {</span><br style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; chord(output,
+inputs, &amp;join_many::chord_body);</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; vector&lt;T&gt;
+chord_body(void_t out, vector&lt;T&gt; in) {</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp;&nbsp;&nbsp; return in;</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">&nbsp; }</span><br
+ style="font-weight: bold;">
+<span style="font-weight: bold;">};<br>
+</span></div>
+join_many.cpp<br>
+<h3><a name="chain"></a>Using channels as the interface between
+components</h3>
+A sample of a chain of nodes, each node exposes its input and output
+interfaces as channels.<br>
+Each node reads from its input, does some transformation of data, wait
+a second and send it to output interface, all these are performed in
+executor's thread pool.<br>
+The way to connect one node (A) to the next (B) is simple:&nbsp;
+A.output = B.input.<br>
+<div style="margin-left: 40px; font-weight: bold;">template
+&lt;typename T&gt;<br>
+class node {<br>
+&nbsp; int my_no;<br>
+&nbsp; boost::function&lt;T(T)&gt; func_;<br>
+public:<br>
+&nbsp; async&lt;T&gt; input;<br>
+&nbsp; async&lt;T&gt; output;<br>
+&nbsp; bool stop;<br>
+&nbsp; node(joint::spawn_type e, int n, boost::function&lt;T(T)&gt; f)
+: <br>
+&nbsp;&nbsp;&nbsp; my_no(n), func_(f), stop(false) {<br>
+&nbsp;&nbsp;&nbsp; joins(e).chord(input, bind(&amp;node::proc, this,
+_1));<br>
+&nbsp; }<br>
+private:<br>
+&nbsp; void proc(T in) {<br>
+&nbsp;&nbsp;&nbsp; try {<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; if(!stop) {<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; T o = func_(in);<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; cout &lt;&lt; "node ["
+&lt;&lt; my_no &lt;&lt; "] recv [" &lt;&lt; in &lt;&lt;"] and send ["
+&lt;&lt; o &lt;&lt; "]" &lt;&lt; endl;<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; thread_sleep(1);<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; output(o);<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; }<br>
+&nbsp;&nbsp;&nbsp; } catch (join_exception &amp;e) {<br>
+&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; cout &lt;&lt; "node [" &lt;&lt; my_no
+&lt;&lt; "] got exception: " &lt;&lt; e.what() &lt;&lt; "\n";<br>
+&nbsp;&nbsp;&nbsp; }<br>
+&nbsp; }<br>
+};<br>
+</div>
+chain.cpp<br>
+<br>
+<h3><span style="font-weight: bold;"><a name="lifecycle"></a>Joint's
+life cycle</span></h3>
+After the initial definition, a joint's set of chords can modified thru
+methods: chord_override(), chord_remove() or reset(). And if a joint's
+heartbeat is set during its creation, its set of chords will be
+auto-destroyed after "heartbeat" number of firings of chords.<br>
+Here is a sample demonstrating these joint's lifecycle related methods.<br>
+joint_lifetime.cpp<br>
+<br>
+</body>
+</html>

Added: sandbox/join/libs/join/examples/Jamfile.v2
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/Jamfile.v2 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,30 @@
+
+project
+ : requirements
+ <library>/boost/thread//boost_thread
+ <define>BOOST_ALL_NO_LIB=1
+ <threading>multi
+ ;
+
+obj buffer.obj : buffer.cpp ;
+exe buffer : buffer.obj ;
+obj buffer_phoenix.obj : buffer_phoenix.cpp ;
+exe buffer_phoenix : buffer_phoenix.obj ;
+obj buffer_lambda.obj : buffer_lambda.cpp ;
+exe buffer_lambda : buffer_lambda.obj ;
+obj bounded_buffer.obj : bounded_buffer.cpp ;
+exe bounded_buffer : bounded_buffer.obj ;
+obj join_many.obj : join_many.cpp ;
+exe join_many : join_many.obj ;
+obj flows.obj : flows.cpp ;
+exe flows : flows.obj ;
+obj prime_sieve.obj : prime_sieve.cpp ;
+exe prime_sieve : prime_sieve.obj ;
+obj joint_lifetime.obj : joint_lifetime.cpp ;
+exe joint_lifetime : joint_lifetime.obj ;
+obj chain.obj : chain.cpp ;
+exe chain : chain.obj ;
+obj producer_consumer.obj : producer_consumer.cpp ;
+exe producer_consumer : producer_consumer.obj ;
+obj async_call_ret.obj : async_call_ret.cpp ;
+exe async_call_ret : async_call_ret.obj ;

Added: sandbox/join/libs/join/examples/async_call_ret.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/async_call_ret.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,88 @@
+//
+// async_call_ret.cpp
+//
+// adapted from Cw sample
+//
+#include <boost/join/join.hpp>
+#include "boost/tuple/tuple.hpp"
+#include <iostream>
+
+using namespace std;
+using namespace boost;
+using namespace boost::join;
+
+logger log1("log");
+
+//server interface: expose an async ports taking requests
+//in the form of a string name and a response ports
+class IService {
+public:
+ async<tuple<string, async<string> > > Service;
+};
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+//server class, implement the server interface:
+//for each incoming request <name, respChan>, sleep a few seconds
+//send "name is done" to respChan
+class MyService : public joint, public IService {
+ int n;
+public:
+ MyService(int m, joint::spawn_type e) : joint(e), n(m) {
+ chord(Service, &MyService::service_fun);
+ }
+ void service_fun(tuple<string, async<string> > req) {
+ string srvName = tuples::get<0>(req);
+ async<string> replyChan = tuples::get<1>(req);
+ for(int i=0; i<n; i++) {
+ log1.stream() << srvName << " does " << i << logger::endl;
+ thread_sleep((n%2)?1:2);
+ }
+ replyChan(srvName+" is done");
+ }
+};
+
+//clients use Join2 to wait for responses from 2 servers
+class Result2 : public joint {
+public:
+ async<string> first;
+ async<string> second;
+ synch<tuple<string,string>,void> Wait;
+ Result2() {
+ chord(Wait, first, second, &Result2::wait_body);
+ }
+ tuple<string,string> wait_body(void_t w, string f, string s) {
+ return make_tuple(f,s);
+ }
+};
+
+int main(int argc, char **argv) {
+ executor exec(2);
+ //create 2 server threads
+ MyService s1(5, exec.execute);
+ MyService s2(10, exec.execute);
+ //main thread will play the role of clients
+ //create a Result2 for waiting for results from 2 servers
+ Result2 j;
+ //send to 2 servers requests with response ports packed inside
+ s1.Service(make_tuple("Service 1", j.first));
+ s2.Service(make_tuple("Service 2", j.second));
+ //client idle
+ for(int i=0; i<7; i++) {
+ log1.stream() << "client idle " << i << logger::endl;
+ thread_sleep(1);
+ }
+ string x,y;
+ //client blocking here waiting for responses from servers
+ log1.stream() << "client block wait " << logger::endl;
+ tie(x,y) = j.Wait();
+ log1.stream() << "first result = " << x << ", second result = " << y << logger::endl;
+ exec.shutdown();
+ return 0;
+}
+

Added: sandbox/join/libs/join/examples/bounded_buffer.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/bounded_buffer.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,83 @@
+//
+// bounded_buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+
+using namespace boost::join;
+
+logger log1("log");
+
+template <typename T>
+class BufferN : public joint {
+private:
+ async<void> token;
+ async<T> value;
+
+public:
+ synch<void,T> put;
+ synch<T,void> get;
+
+ BufferN(int sz) {
+ chord(put, token, &BufferN::put_cb);
+ chord(get, value, &BufferN::get_cb);
+ for(int i=0; i<sz; i++)
+ token();
+ }
+ void put_cb(T put, void_t) {
+ value(put);
+ }
+ T get_cb(void_t, T val) {
+ token();
+ return val;
+ }
+};
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+void producer(synch<void,int> put) {
+ for(int i=0; i<2000000; i++) {
+ put(i);
+ log1.stream() << "producer sends [" << i << "]" << logger::endl;
+ //thread_sleep(2);
+ }
+}
+void consumer1(synch<int,void> get) {
+ for(int i=0; i<1000000; i++) {
+ log1.stream() << "consumer1 recvs [" << get() << "]" << logger::endl;
+ //thread_sleep(3);
+ }
+}
+void consumer2(synch<int,void> get) {
+ for(int i=0; i<1000000; i++) {
+ log1.stream() << "consumer2 recvs [" << get() << "]" << logger::endl;
+ //thread_sleep(1);
+ }
+}
+
+int main(int argc, char **argv) {
+ executor exec(3);
+
+ //create bounded buffer
+ BufferN<int> buf(5);
+
+ //create test tasks
+ exec.spawn(boost::bind(consumer1, buf.get));
+ exec.spawn(boost::bind(consumer2, buf.get));
+ exec.spawn(boost::bind(producer, buf.put));
+
+ exec.shutdown();
+ return 0;
+}
+

Added: sandbox/join/libs/join/examples/buffer.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/buffer.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,77 @@
+//
+// buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+
+using namespace boost;
+using namespace boost::join;
+
+logger log1("log");
+
+template <typename V>
+class buffer: public joint {
+public:
+ async<V> put;
+ synch<V,void> get;
+ buffer() {
+ chord(get, put, &buffer::chord_body);
+ }
+ V chord_body(void_t g, V p) {
+ return p;
+ }
+};
+
+void hello_world(buffer<std::string> &b) {
+ b.put("hello");
+ b.put("world");
+ log1.msg(b.get()+" ");
+ log1.msg(b.get());
+}
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+void producer(async<std::string> put) {
+ std::ostringstream ostr;
+ for(int i=0; i<5; i++) {
+ ostr << i;
+ put(ostr.str());
+ log1.stream() << "producer sends [" << i << "]" << logger::endl;
+ ostr.str("");
+ thread_sleep(1);
+ }
+}
+void consumer(synch<std::string,void> get) {
+ for(int i=0; i<5; i++) {
+ log1.stream() << "consumer recvs [" << get() << "]" << logger::endl;
+ thread_sleep(2);
+ }
+}
+
+int main(int argc, char **argv) {
+ buffer<std::string> b;
+ hello_world(b);
+
+ executor exec(2); //spawn 2 threads for executor thread pool
+
+ //spawn test tasks
+ exec.spawn(boost::bind(consumer, b.get));
+ exec.spawn(boost::bind(producer, b.put));
+
+ exec.shutdown();
+ return 0;
+}

Added: sandbox/join/libs/join/examples/buffer_lambda.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/buffer_lambda.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,76 @@
+//
+// buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+#include <boost/lambda/lambda.hpp>
+
+using namespace boost;
+using namespace boost::join;
+using namespace boost::lambda;
+
+logger log1("log");
+
+template <typename V>
+class buffer {
+public:
+ async<V> put;
+ synch<V,void> get;
+ buffer() {
+ joins().chord(get, put, lambda::_2);
+ }
+};
+
+void hello_world(buffer<std::string> &b) {
+ b.put("hello");
+ b.put("world");
+ log1.msg(b.get()+" ");
+ log1.msg(b.get());
+}
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+void producer(async<std::string> put) {
+ std::ostringstream ostr;
+ for(int i=0; i<5; i++) {
+ ostr << i;
+ put(ostr.str());
+ log1.stream() << "producer sends [" << i << "]" << logger::endl;
+ ostr.str("");
+ thread_sleep(1);
+ }
+}
+void consumer(synch<std::string,void> get) {
+ for(int i=0; i<5; i++) {
+ log1.stream() << "consumer recvs [" << get() << "]" << logger::endl;
+ thread_sleep(2);
+ }
+}
+
+int main(int argc, char **argv) {
+ buffer<std::string> b;
+ hello_world(b);
+
+ executor exec(2); //spawn 2 threads for executor thread pool
+
+ //spawn test tasks
+ exec.spawn(boost::bind(consumer, b.get));
+ exec.spawn(boost::bind(producer, b.put));
+
+ exec.shutdown();
+ return 0;
+}

Added: sandbox/join/libs/join/examples/buffer_phoenix.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/buffer_phoenix.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,76 @@
+//
+// buffer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+#include <boost/spirit/include/phoenix1.hpp>
+
+using namespace boost;
+using namespace boost::join;
+using namespace phoenix;
+
+logger log1("log");
+
+template <typename V>
+class buffer {
+public:
+ async<V> put;
+ synch<V,void> get;
+ buffer() {
+ joins().chord(get, put, arg2);
+ }
+};
+
+void hello_world(buffer<std::string> &b) {
+ b.put("hello");
+ b.put("world");
+ log1.msg(b.get()+" ");
+ log1.msg(b.get());
+}
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+void producer(async<std::string> put) {
+ std::ostringstream ostr;
+ for(int i=0; i<5; i++) {
+ ostr << i;
+ put(ostr.str());
+ log1.stream() << "producer sends [" << i << "]" << logger::endl;
+ ostr.str("");
+ thread_sleep(1);
+ }
+}
+void consumer(synch<std::string,void> get) {
+ for(int i=0; i<5; i++) {
+ log1.stream() << "consumer recvs [" << get() << "]" << logger::endl;
+ thread_sleep(2);
+ }
+}
+
+int main(int argc, char **argv) {
+ buffer<std::string> b;
+ hello_world(b);
+
+ executor exec(2); //spawn 2 threads for executor thread pool
+
+ //spawn test tasks
+ exec.spawn(boost::bind(consumer, b.get));
+ exec.spawn(boost::bind(producer, b.put));
+
+ exec.shutdown();
+ return 0;
+}

Added: sandbox/join/libs/join/examples/chain.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/chain.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,96 @@
+//
+// chain.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+#include <boost/spirit/include/phoenix1.hpp>
+
+using namespace boost;
+using namespace boost::join;
+using namespace std;
+using namespace phoenix;
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+//the node in the chain:
+//read data from input, transform its value, wait a sec
+//and send the new value to output
+//all these are running in executor thread pool
+template <typename T>
+class node {
+ int my_no;
+ boost::function<T(T)> func_;
+public:
+ async<T> input;
+ async<T> output;
+ bool stop;
+
+ node(joint::spawn_type e, int n, boost::function<T(T)> f) :
+ my_no(n), func_(f), stop(false) {
+ joins(e).chord(input, bind(&node::proc, this, _1));
+ }
+
+private:
+ void proc(T in) {
+ try {
+ if(!stop) {
+ T o = func_(in);
+ cout << "node [" << my_no << "] recv [" << in <<"] and send [" << o << "]" << endl;
+ thread_sleep(1);
+ output(o);
+ }
+ } catch (join_exception &e) {
+ cout << "node [" << my_no << "] got exception: " << e.what() << "\n";
+ }
+ }
+};
+
+enum {
+ num_nodes = 10,
+ num_data = 5
+};
+
+int main(int, char**) {
+ executor exec(3);
+ vector<node<int>*> nodes;
+ nodes.reserve(num_nodes);
+
+ //create nodes
+ for(int i=0; i<num_nodes; i++) {
+ nodes.push_back(new node<int>(exec.execute, i, arg1+val(1)));
+ }
+
+ //setup chaining, please note we should connect output to input
+ for(int j=0;j<num_nodes;j++) {
+ if(j<(num_nodes-1)) {
+ nodes[j]->output = nodes[j+1]->input;
+ }
+ }
+ nodes[num_nodes-1]->output = nodes[0]->input;
+
+ //pump some data into chain
+ for(int i=0;i<num_data;i++) {
+ nodes[0]->input(i);
+ thread_sleep(1);
+ }
+
+ //wait for while to see the chain-reaction
+ thread_sleep(20);
+
+ //stop the chain
+ for(int i=0; i<num_nodes; i++)
+ nodes[i]->stop = true;
+ exec.shutdown();
+ return 0;
+}

Added: sandbox/join/libs/join/examples/flows.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/flows.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,78 @@
+//
+// flows.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+#include <boost/spirit/include/phoenix1.hpp>
+#include "boost/tuple/tuple.hpp"
+
+using namespace boost;
+using namespace boost::join;
+using namespace phoenix;
+
+logger log1("log");
+
+typedef boost::tuple<async<int>, async<int>, async<int> > flows_bundle;
+
+flows_bundle make_data_channels(joint::spawn_type e) {
+ async<int> flow1;
+ async<int> flow2;
+ async<int> flow3;
+ joins_t<sched_first_match,32>(e)
+ .chord(flow1, flow2, std::cout << val('(') << arg1 << " + " << arg2 << ") = " << (arg1 + arg2) << "\n")
+ .chord(flow2, flow3, std::cout << val('(') << arg1 << " - " << arg2 << ") = " << (arg1 - arg2) << "\n")
+ .chord(flow1, flow3, std::cout << val('(') << arg1 << " * " << arg2 << ") = " << (arg1 * arg2) << "\n");
+ return boost::make_tuple(flow1, flow2, flow3);
+}
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+void producer1(async<int> chan1) {
+ for(int i=0; i<12; i++) {
+ chan1(i);
+ log1.stream() << "producer1 sends [" << i << "]" << logger::endl;
+ thread_sleep(1);
+ }
+}
+void producer2(async<int> chan2) {
+ for(int i=0; i<10; i++) {
+ chan2(i);
+ log1.stream() << "producer2 sends [" << i << "]" << logger::endl;
+ thread_sleep(2);
+ }
+}
+void producer3(async<int> chan3) {
+ for(int i=0; i<12; i++) {
+ chan3(i);
+ log1.stream() << "producer3 sends [" << i << "]" << logger::endl;
+ thread_sleep(1);
+ }
+}
+
+int main(int argc, char **argv) {
+ executor exec(5);
+
+ //create data channels
+ flows_bundle flows = make_data_channels(exec.execute);
+
+ //spawn test tasks
+ exec.spawn(boost::bind(producer1, tuples::get<0>(flows)));
+ exec.spawn(boost::bind(producer2, tuples::get<1>(flows)));
+ exec.spawn(boost::bind(producer3, tuples::get<2>(flows)));
+
+ thread_sleep(10);
+ exec.shutdown();
+ return 0;
+}
+

Added: sandbox/join/libs/join/examples/join_many.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/join_many.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,83 @@
+//
+// join_many.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <iostream>
+#include <vector>
+
+using namespace std;
+using namespace boost;
+using namespace boost::join;
+
+logger log1("log");
+
+// join multiple input streams into one output stream
+template <typename T>
+class join_many : public joint {
+public:
+ vector<async<T> > inputs;
+ synch<vector<T>, void> output;
+ join_many(int num) : inputs(num) {
+ chord(output, inputs, &join_many::chord_body);
+ }
+ vector<T> chord_body(void_t out, vector<T> in) {
+ return in;
+ }
+};
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+enum {
+ num_chan = 4,
+ num_test = 5
+};
+
+void collect(synch<vector<int>,void> output) {
+ vector<int> results;
+ for(int i=0; i<num_test; i++) {
+ results = output();
+ log1.stream() << "output" << i << " = " << logger::end;
+ for(size_t j=0;j<num_chan;j++)
+ log1.stream() << results[j] << " " << logger::end;
+ log1.stream() << logger::endl;
+ }
+}
+
+void distributor(int p, async<int> input) {
+ int start = p*num_test;
+ int end = start+num_test;
+ for(int i=start; i<end; i++) {
+ log1.stream() << "inputer[" << p << "] sends [" << i << "] and wait..." << logger::endl;
+ input(i);
+ thread_sleep((p+1)%3);
+ }
+}
+
+int main(int argc, char **argv) {
+ executor exec(num_chan); //spawn 4 threads for executor thread pool
+
+ join_many<int> merger(num_chan);
+
+ //spawn distributor tasks
+ for(int i=0; i<num_chan; i++)
+ exec.spawn(boost::bind(distributor, i, merger.inputs[i]));
+
+ //main thread wait and collect results
+ log1.msg("main thread starts waiting...");
+ collect(merger.output);
+ log1.msg("main thread finish waiting...");
+
+ exec.shutdown();
+ return 0;
+}

Added: sandbox/join/libs/join/examples/joint_lifetime.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/joint_lifetime.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,153 @@
+//
+// jocaml1.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <boost/bind.hpp>
+
+using namespace boost::join;
+using namespace boost;
+using namespace std;
+
+logger log1("log");
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+//define 2 plain functions as message handlers
+void apple_pie(string a, void_t) {
+ log1.stream() << a << " apple pie" << logger::endl;
+}
+
+void raspberry_pie(string r, void_t) {
+ log1.stream() << r << " raspberry pie" << logger::endl;
+}
+
+//define a class using its object's methods as message handlers
+class fruit_pie {
+ int count;
+public:
+ fruit_pie(int c) : count(c) {}
+ void apple(string a, void_t) {
+ log1.stream() << a << " apple pie " << count << logger::endl;
+ }
+ void raspberry(string r, void_t) {
+ log1.stream() << r << " raspberry pie " << count << logger::endl;
+ }
+ void blueberry(string r, void_t) {
+ log1.stream() << r << " blueberry pie " << count << logger::endl;
+ }
+ void strange(string a, void_t) {
+ log1.stream() << "STRANGE " << a << " apple pie " << count << logger::endl;
+ }
+};
+
+
+int main(int argc, char **argv) {
+ //define a few asynchronous message ports
+ async<string> apple;
+ async<string> raspberry;
+ async<string> blueberry;
+ async<void> pie;
+
+ //create an executor with 2 threads in pool
+ executor exec(2);
+
+ //test1: a new joint obj is created, used, reused and destroyed at end of scope
+ try
+ {
+ //define an joint/joint object with a few chords/join-patterns to specify
+ //what to do with messages at ports : apple(), raspberry() and pie()
+ joint joins(exec.execute);
+ joins
+ .chord(apple, pie, apple_pie)
+ .chord(raspberry, pie, raspberry_pie);
+
+ //send a few messages to test
+ pie(); pie();
+ raspberry("green");
+ apple("red");
+
+ //clear all chords (join patterns or coordination definitions) in "joins" to reuse it
+ joins.reset();
+
+ //redefine 2 chords with different processing code
+ fruit_pie fp1(1);
+ joins.chord(apple, pie, bind(&fruit_pie::apple, fp1, _1, _2))
+ .chord(raspberry, pie, bind(&fruit_pie::raspberry, fp1, _1, _2));
+
+ //send more test messages
+ pie(); pie();
+ raspberry("purple");
+ apple("yellow");
+
+ //override chord <apple,pie> to print diff msg
+ joins.chord_override(apple, pie, bind(&fruit_pie::strange, fp1, _1, _2));
+ apple("white"); pie();
+
+ //add a new chord into existing chords
+ joins.chord(blueberry, pie, bind(&fruit_pie::blueberry, fp1, _1, _2));
+ blueberry("dark"); pie();
+
+ try {
+ //test unbound async methods/ports
+ //an exception should be thrown here since
+ //all joiners/joints are gone and async port "pie"
+ //is not associated with any joint
+ pie.detach();
+ pie();
+ }
+ catch (join_exception &e) {
+ log1.stream() << "Caught unbound-pie exception: " << e.what() << logger::endl;
+ }
+
+ //must reset this joint to reuse the ports with another joint
+ joins.reset();
+ }
+ catch (join_exception &e) {
+ log1.stream() << "Caught exception: " << e.what() << logger::endl;
+ }
+
+ //test2: another "unamed" joint object is used
+ try
+ {
+ //define an joint object with 3 heartbeats (its chords can fire 3 times, then it will self-destroy)
+ fruit_pie fp2(2);
+ joins(exec.execute, 3)
+ .chord(apple, pie, bind(&fruit_pie::apple, fp2, _1, _2))
+ .chord(raspberry, pie, bind(&fruit_pie::raspberry, fp2, _1, _2))
+ .chord(blueberry, pie, bind(&fruit_pie::blueberry, fp2, _1, _2));
+
+ //send some test messages
+ pie(); pie();
+ raspberry("sour");
+ apple("dull");
+ blueberry("sweet"); pie();
+
+ //since joint heartbeat count expires here, it should be destroyed and all ports detached,
+ //calling ports now will throw exception
+ raspberry("bitter");
+ }
+ catch (join_exception &e) {
+ log1.stream() << "Caught bitter-raspberry exception: " << e.what() << logger::endl;
+ }
+
+ try {
+ exec.shutdown();
+ }
+ catch(join_exception &e) {
+ log1.stream() << "Caught exception: " << e.what() << logger::endl;
+ }
+
+ return 0;
+}
+

Added: sandbox/join/libs/join/examples/prime_sieve.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/prime_sieve.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,80 @@
+//
+// prime_sieve.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <boost/join/join.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/xtime.hpp>
+#include <string>
+#include <iostream>
+
+using namespace std;
+using namespace boost::join;
+
+logger log1("log");
+
+typedef rr_executor exec_type;
+
+class prime_task : public joint {
+ int my_prime_;
+ int my_no_;
+ prime_task * next_;
+ exec_type *e_;
+ //using async<> methods to represent states
+ async<void> init;
+ async<void> ready;
+public:
+ async<int> sieve;
+ void sieve_init_body(int value, void_t init) {
+ my_prime_ = value;
+ log1.stream() << "------ prime_task [" << my_no_ << "] found prime = " << my_prime_ << logger::endl;
+ next_ = new prime_task(e_, my_no_+1); //create the next task
+ ready();
+ }
+ void sieve_ready_body(int value, void_t redy) {
+ int val = value;
+ ready(); //allow processing the next incoming number as soon as possible
+ if (val % my_prime_) { //not my multiples
+ next_->sieve(val);
+ } else { //my multiples
+ log1.stream() << "prime_task [" << my_no_ << "] drop " << val << logger::endl;
+ }
+ }
+ prime_task(exec_type *e, int no) :
+ joint(e->task_queue(no)), my_prime_(-1), my_no_(no), next_(0), e_(e) {
+ chord(sieve, init, &prime_task::sieve_init_body);
+ chord(sieve, ready, &prime_task::sieve_ready_body);
+ init(); //initialize joint's state
+ }
+ ~prime_task() {
+ if (next_ != 0) delete next_;
+ }
+};
+
+int main(int argc, char **argv) {
+ exec_type exec(4);
+
+ int max_num;
+ if (argc > 1)
+ max_num = atoi(argv[1]);
+ else
+ max_num = 1000;
+
+ log1.stream() << "find primes in range [2-" << max_num << "]" << logger::endl;
+
+ //create prime_task0
+ prime_task first_task(&exec,0);
+
+ //generate integer series to be sieved and feed them to the chain of tasks
+ for(int i=2; i<=max_num; i++)
+ first_task.sieve(i);
+
+ log1.msg("main thread shutdown...");
+ exec.shutdown();
+ return 0;
+}

Added: sandbox/join/libs/join/examples/producer_consumer.cpp
==============================================================================
--- (empty file)
+++ sandbox/join/libs/join/examples/producer_consumer.cpp 2009-04-19 20:00:35 EDT (Sun, 19 Apr 2009)
@@ -0,0 +1,67 @@
+//
+// producer_consumer.cpp
+//
+// Copyright (c) 2007 - 2009 Yigong Liu (yigongliu at gmail dot com)
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <boost/function.hpp>
+#include <boost/join/join.hpp>
+#include <boost/spirit/include/phoenix1.hpp>
+#include "boost/tuple/tuple.hpp"
+
+using namespace boost;
+using namespace boost::join;
+using namespace phoenix;
+
+logger log1("log");
+
+void thread_sleep(int sec) {
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += sec;
+ boost::thread::sleep(xt);
+}
+
+template <typename V>
+boost::tuple<async<V>,synch<V,void> > create_msg_que() {
+ async<V> send;
+ synch<V,void> recv;
+ joins().chord(recv, send, arg2);
+ return boost::make_tuple(send, recv);
+}
+
+void producer(async<int> send) {
+ for(int i=0;i<10;i++) {
+ send(i);
+ log1.stream() << "producer sends: " << i << logger::endl;
+ thread_sleep(1);
+ }
+}
+
+void consumer(synch<int,void> recv) {
+ for(int i=0; i<10; i++) {
+ log1.stream() << "consumer recvs: " << recv() << logger::endl;
+ }
+}
+
+int main(int argc, char **argv) {
+ executor exec(2); //spawn 2 threads for executor thread pool
+
+ //create msg que
+ async<int> send;
+ synch<int, void> recv;
+ boost::tie(send, recv) = create_msg_que<int>();
+
+ //spawn prod/consum tasks
+ exec.spawn(boost::bind(consumer, recv));
+ exec.spawn(boost::bind(producer, send));
+
+ exec.shutdown();
+ return 0;
+}


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk