#include <boost/mpi.hpp>
#include <boost/serialization/access.hpp>
#include <boost/serialization/string.hpp>
#include <iostream>
#include <string>
#include <vector>
#include <sys/utsname.h>

namespace mpi = boost::mpi;


/// A very simple MPI message
class hello_msg {
  int seqnr_;
  int rank_;
  std::string node_;
  std::string os_;

public:
  /// Ctor, using the mpi::communicator instance to get the rank no.
  hello_msg(const mpi::communicator& comm, const int seqnr)
    : seqnr_(seqnr)
  {
    rank_ = comm.rank();
    // get nodename and OS from uname(2)
    struct utsname uname_;
    uname(&uname_);
    node_ = uname_.nodename;
    os_ = uname_.sysname;
  }

  /// Default ctor, to use only just before deserialization
  hello_msg()
    : seqnr_(-1), rank_(-1), node_("unknown"), os_("unknown")
  { }

  // print out the greeting
  friend std::ostream& operator<< (std::ostream& o, hello_msg const& msg) {
    o << "Hello #" << msg.seqnr_
      << " from MPI rank " << msg.rank_
      << " running on host " << msg.node_
      << " (" << msg.os_ << ")";
    return o;
  }

protected:
  // provide serialization of this class, for MPI send/recv
  friend class boost::serialization::access;

  template <class Archive>
  void serialize(Archive& ar, const unsigned int version) {
    ar & seqnr_ & rank_ & node_ & os_;
  }
};


int main(int argc, char** argv) {
  mpi::environment env(argc, argv);
  mpi::communicator world;
  int myrank = world.rank();

  const int downStreamTaskTag = 0;
  const int upStreamTaskTag = 1;
  const int master = 0;
  const int tasks_per_node = 5;

  if (master == myrank) { // master
    std::cout << "This is rank 0 (master) starting ..." << std::endl;

    int totalTaskNum = world.size() * tasks_per_node;

    // remember the handles of the (sent) "downstream" messages
    std::vector<mpi::request> sent;
    // handles and messages of the (recv'd) "upstream" messages
    std::vector<mpi::request> recv_requests;
    std::vector<hello_msg*> recv_msgs;

    // send `tasks_per_node` messages to each MPI rank
    for (int n = 0; n < tasks_per_node; ++n) {
      for (int worker = 1; worker < world.size(); ++worker) {
        hello_msg ping(world, n);

        // send downstream msg; for serialized types, isend() packs the
        // message immediately, so `ping` may safely go out of scope
        sent.push_back(world.isend(worker, downStreamTaskTag, ping));
        std::cout << "master sent '" << ping << "' to worker "
                  << worker << std::endl;

        // post recv for the corresponding "upstream" msg
        recv_msgs.push_back(new hello_msg);
        recv_requests.push_back(world.irecv(worker, upStreamTaskTag,
                                            *(recv_msgs.back())));
      } // end: for(worker=1; ...)

      // the block delimited by '----' below cannot be put outside of
      // the `for(n = 0; ...)` loop, or MPI_ERR_TRUNCATE will happen
      // ------------------------------------------------------------------
      // wait on sent/downstream messages, to free resources
      mpi::wait_all(sent.begin(), sent.end());
      std::cout << "master: sent message batch #" << n << std::endl;
      sent.clear();

      // wait for all "upstream" messages to come back
      mpi::wait_all(recv_requests.begin(), recv_requests.end());
      for (std::vector<hello_msg*>::const_iterator it = recv_msgs.begin();
           it != recv_msgs.end(); ++it) {
        std::cout << "master received: " << **it << std::endl;
        delete *it; // free memory
      }
      recv_requests.clear();
      recv_msgs.clear();
      std::cout << "master: received message batch #" << n << std::endl;
      // ------------------------------------------------------------------
    } // end: for(n=0; ...)
  }
  else { // worker node
    std::cout << "This is rank " << myrank << " (worker) starting ..."
              << std::endl;

    std::vector<mpi::request> sent;
    int n = 0;
    while (true) {
      hello_msg ping;
      world.recv(master, downStreamTaskTag, ping);
      std::cout << "Worker " << myrank << " got message '" << ping << "'"
                << std::endl;

      // NOTE: the original listing is truncated here; the reply logic
      // below is a reconstruction matching the irecv() calls posted by
      // the master: answer each downstream ping with an upstream greeting
      hello_msg pong(world, n);
      sent.push_back(world.isend(master, upStreamTaskTag, pong));

      ++n;
      if (n == tasks_per_node)
        break; // all expected messages processed, stop
    }

    // wait on sent/upstream messages, to free resources
    mpi::wait_all(sent.begin(), sent.end());
  }

  return 0;
}
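// Building and running, a sketch: this assumes an MPI compiler wrapper
// (e.g. Open MPI's mpic++) plus the Boost.MPI and Boost.Serialization
// libraries; the source file name below is illustrative, and library
// names may differ by distribution:
//
//   mpic++ -o mpi_hello mpi_hello.cpp -lboost_mpi -lboost_serialization
//   mpirun -np 4 ./mpi_hello
//
// With 4 ranks, rank 0 acts as master and ranks 1-3 each exchange
// `tasks_per_node` ping/pong greetings with it, one batch at a time.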