Boost logo

Boost-Commit :

From: dgregor_at_[hidden]
Date: 2008-02-07 16:08:10


Author: dgregor
Date: 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
New Revision: 43165
URL: http://svn.boost.org/trac/boost/changeset/43165

Log:
Support for non-blocking MPI operations in Python, from Andreas Kloeckner
Added:
   trunk/libs/mpi/src/python/py_nonblocking.cpp (contents, props changed)
   trunk/libs/mpi/src/python/request_with_value.hpp (contents, props changed)
   trunk/libs/mpi/test/python/nonblocking_test.py (contents, props changed)
Text files modified:
   trunk/libs/mpi/build/Jamfile.v2 | 1
   trunk/libs/mpi/build/__init__.py | 4
   trunk/libs/mpi/src/python/documentation.cpp | 150 ++++++++++++++++++++++++++++++++--------
   trunk/libs/mpi/src/python/module.cpp | 4
   trunk/libs/mpi/src/python/py_communicator.cpp | 13 +-
   trunk/libs/mpi/src/python/py_exception.cpp | 9 +-
   trunk/libs/mpi/src/python/py_request.cpp | 90 +++++++++++++++++------
   trunk/libs/mpi/src/python/py_timer.cpp | 2
   trunk/libs/mpi/src/python/skeleton_and_content.cpp | 21 ++--
   trunk/libs/mpi/src/python/status.cpp | 2
   trunk/tools/build/v2/tools/mpi.jam | 6 +
   11 files changed, 219 insertions(+), 83 deletions(-)

Modified: trunk/libs/mpi/build/Jamfile.v2
==============================================================================
--- trunk/libs/mpi/build/Jamfile.v2 (original)
+++ trunk/libs/mpi/build/Jamfile.v2 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -82,6 +82,7 @@
         python/datatypes.cpp
         python/documentation.cpp
         python/py_environment.cpp
+ python/py_nonblocking.cpp
         python/py_exception.cpp
         python/module.cpp
         python/py_request.cpp

Modified: trunk/libs/mpi/build/__init__.py
==============================================================================
--- trunk/libs/mpi/build/__init__.py (original)
+++ trunk/libs/mpi/build/__init__.py 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -1,10 +1,10 @@
 import sys
 if sys.platform == 'linux2':
- import dl
+ import DLFCN as dl
     flags = sys.getdlopenflags()
     sys.setdlopenflags(dl.RTLD_NOW|dl.RTLD_GLOBAL)
     import mpi
     sys.setdlopenflags(flags)
 else:
-import mpi
+ import mpi
 

Modified: trunk/libs/mpi/src/python/documentation.cpp
==============================================================================
--- trunk/libs/mpi/src/python/documentation.cpp (original)
+++ trunk/libs/mpi/src/python/documentation.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -132,10 +132,10 @@
   "\n"
   " Once you have registered your C++ data structures, you can extract\n"
   " the skeleton for an instance of that data structure with skeleton().\n"
- " The resulting skeleton_proxy can be transmitted via the normal send\n"
+ " The resulting SkeletonProxy can be transmitted via the normal send\n"
   " routine, e.g.,\n\n"
   " mpi.world.send(1, 0, skeleton(my_data_structure))\n\n"
- " skeleton_proxy objects can be received on the other end via recv(),\n"
+ " SkeletonProxy objects can be received on the other end via recv(),\n"
   " which stores a newly-created instance of your data structure with the\n"
   " same `shape' as the sender in its `object' attribute:\n\n"
   " shape = mpi.world.recv(0, 0)\n"
@@ -212,6 +212,86 @@
   "Determine if the MPI environment has already been finalized.\n";
 
 /***********************************************************
+ * nonblocking documentation *
+ ***********************************************************/
+const char* request_list_init_docstring=
+ "Without arguments, constructs an empty RequestList.\n"
+ "With one argument `iterable', copies request objects from this\n"
+ "iterable to the new RequestList.\n";
+
+const char* nonblocking_wait_any_docstring =
+ "Waits until any of the given requests has been completed. It provides\n"
+ "functionality equivalent to MPI_Waitany.\n"
+ "\n"
+ "requests must be a RequestList instance.\n"
+ "\n"
+ "Returns a triple (value, status, index) consisting of received value\n"
+ "(or None), the Status object for the completed request, and its index\n"
+ "in the RequestList.\n";
+
+const char* nonblocking_test_any_docstring =
+ "Tests if any of the given requests have been completed, but does not wait\n"
+ "for completion. It provides functionality equivalent to MPI_Testany.\n"
+ "\n"
+ "requests must be a RequestList instance.\n"
+ "\n"
+ "Returns a triple (value, status, index) like wait_any or None if no request\n"
+ "is complete.\n";
+
+const char* nonblocking_wait_all_docstring =
+ "Waits until all of the given requests have been completed. It provides\n"
+ "functionality equivalent to MPI_Waitall.\n"
+ "\n"
+ "requests must be a RequestList instance.\n"
+ "\n"
+ "If the second parameter `callable' is provided, it is called with each\n"
+ "completed request's received value (or None) and its Status object as\n"
+ "its arguments. The calls occur in the order given by the `requests' list.\n";
+
+const char* nonblocking_test_all_docstring =
+ "Tests if all of the given requests have been completed. It provides\n"
+ "functionality equivalent to MPI_Testall.\n"
+ "\n"
+ "Returns True if all requests have been completed.\n"
+ "\n"
+ "requests must be a RequestList instance.\n"
+ "\n"
+ "If the second parameter `callable' is provided, it is called with each\n"
+ "completed request's received value (or None) and its Status object as\n"
+ "its arguments. The calls occur in the order given by the `requests' list.\n";
+
+const char* nonblocking_wait_some_docstring =
+ "Waits until at least one of the given requests has completed. It\n"
+ "then completes all of the requests it can, partitioning the input\n"
+ "sequence into pending requests followed by completed requests.\n"
+ "\n"
+ "This routine provides functionality equivalent to MPI_Waitsome.\n"
+ "\n"
+ "Returns the index of the first completed request.\n"
+ "\n"
+ "requests must be a RequestList instance.\n"
+ "\n"
+ "If the second parameter `callable' is provided, it is called with each\n"
+ "completed request's received value (or None) and its Status object as\n"
+ "its arguments. The calls occur in the order given by the `requests' list.\n";
+
+const char* nonblocking_test_some_docstring =
+ "Tests to see if any of the given requests has completed. It completes\n"
+ "all of the requests it can, partitioning the input sequence into pending\n"
+ "requests followed by completed requests. This routine is similar to\n"
+ "wait_some, but does not wait until any requests have completed.\n"
+ "\n"
+ "This routine provides functionality equivalent to MPI_Testsome.\n"
+ "\n"
+ "Returns the index of the first completed request.\n"
+ "\n"
+ "requests must be a RequestList instance.\n"
+ "\n"
+ "If the second parameter `callable' is provided, it is called with each\n"
+ "completed request's received value (or None) and its Status object as\n"
+ "its arguments. The calls occur in the order given by the `requests' list.\n";
+
+/***********************************************************
  * exception documentation *
  ***********************************************************/
 const char* exception_docstring =
@@ -310,14 +390,14 @@
  * communicator documentation *
  ***********************************************************/
 const char* communicator_docstring =
- "The communicator class abstracts a set of communicating\n"
+ "The Communicator class abstracts a set of communicating\n"
  "processes in MPI. All of the processes that belong to a certain\n"
  "communicator can determine the size of the communicator, their rank\n"
  "within the communicator, and communicate with any other processes\n"
  "in the communicator.\n";
 
 const char* communicator_default_constructor_docstring =
- "Build a new Boost.MPI communicator for MPI_COMM_WORLD.\n";
+ "Build a new Boost.MPI Communicator instance for MPI_COMM_WORLD.\n";
 
 const char* communicator_rank_docstring =
   "Returns the rank of the process in the communicator, which will be a\n"
@@ -335,10 +415,10 @@
   " - For C++ objects registered via register_serialized(), the value\n"
   " will be serialized and transmitted.\n"
   "\n"
- " - For skeleton_proxy objects, the skeleton of the object will be\n"
+ " - For SkeletonProxy objects, the skeleton of the object will be\n"
   " serialized and transmitted.\n"
   "\n"
- " - For content objects, the content will be transmitted directly.\n"
+ " - For Content objects, the content will be transmitted directly.\n"
   " This content can be received by a matching recv/irecv call that\n"
   " provides a suitable `buffer' argument.\n"
   "\n"
@@ -351,12 +431,12 @@
   "the message can be received from any process. Likewise, if the tag\n"
   "parameter is not specified, a message with any tag can be received.\n"
   "If return_status is True, returns a tuple containing the received\n"
- "object followed by a status object describing the communication.\n"
+ "object followed by a Status object describing the communication.\n"
   "Otherwise, recv() returns just the received object.\n"
   "\n"
   "When receiving the content of a data type that has been sent separately\n"
   "from its skeleton, user code must provide a value for the `buffer'\n"
- "argument. This value should be the content object returned from\n"
+ "argument. This value should be the Content object returned from\n"
   "get_content().\n";
 
 const char* communicator_isend_docstring =
@@ -364,7 +444,7 @@
   "tag to the process with rank dest. It can be received by the\n"
   "destination process with a matching recv call. The value will be\n"
   "transmitted in the same way as with send().\n"
- "This routine returns a request object, which can be used to query\n"
+ "This routine returns a Request object, which can be used to query\n"
   "when the transmission has completed, wait for its completion, or\n"
   "cancel the transmission.\n";
 
@@ -373,15 +453,15 @@
   "source with the given tag. If the source parameter is not specified,\n"
   "the message can be received from any process. Likewise, if the tag\n"
   "parameter is not specified, a message with any tag can be received.\n"
- "This routine returns a request object, which can be used to query\n"
+ "This routine returns a Request object, which can be used to query\n"
   "when the transmission has completed, wait for its completion, or\n"
   "cancel the transmission. The received value be accessible\n"
- "through the `value' attribute of the request object once transmission\n"
+ "through the `value' attribute of the Request object once transmission\n"
   "has completed.\n"
   "\n"
   "As with the recv() routine, when receiving the content of a data type\n"
   "that has been sent separately from its skeleton, user code must provide\n"
- "a value for the `buffer' argument. This value should be the content\n"
+ "a value for the `buffer' argument. This value should be the Content\n"
   "object returned from get_content().\n";
 
  const char* communicator_probe_docstring =
@@ -389,7 +469,7 @@
   "is available to be received. It then returns information about\n"
   "that message. If source is omitted, a message from any process\n"
   "will match. If tag is omitted, a message with any tag will match.\n"
- "The actual source and tag can be retrieved from the returned status\n"
+ "The actual source and tag can be retrieved from the returned Status\n"
   "object. To check if a message is available without blocking, use\n"
   "iprobe.\n";
 
@@ -399,7 +479,7 @@
   "message; otherwise, it returns None. If source is omitted, a message\n"
   "from any process will match. If tag is omitted, a message with any\n"
   "tag will match. The actual source and tag can be retrieved from the\n"
- "returned status object. To wait for a message to become available, use\n"
+ "returned Status object. To wait for a message to become available, use\n"
   "probe.\n";
 
 const char* communicator_barrier_docstring =
@@ -418,8 +498,8 @@
   "the ordering of processes with the same color in the resulting\n"
   "communicator. If omitted, the key will default to the rank of\n"
   "the process in the current communicator.\n\n"
- "Returns a new communicator containing all of the processes in\n"
- "this communicator that have the same color.\n";
+ "Returns a new Communicator instance containing all of the \n"
+ "processes in this communicator that have the same color.\n";
 
 const char* communicator_abort_docstring =
   "Makes a \"best attempt\" to abort all of the tasks in the group of\n"
@@ -435,36 +515,46 @@
  * request documentation *
  ***********************************************************/
 const char* request_docstring =
- "The request class contains information about a non-blocking send\n"
+ "The Request class contains information about a non-blocking send\n"
   "or receive and will be returned from isend or irecv, respectively.\n"
- "When a request object represents a completed irecv, the `value' \n"
+ "When a Request object represents a completed irecv, the `value' \n"
   "attribute will contain the received value.\n";
 
+const char* request_with_value_docstring =
+ "This class is an implementation detail. Any call that accepts a\n"
+ "Request also accepts a RequestWithValue, and vice versa.\n";
+
 const char* request_wait_docstring =
   "Wait until the communication associated with this request has\n"
   "completed. For a request that is associated with an isend(), returns\n"
- "a status object describing the communication. For an irecv()\n"
+ "a Status object describing the communication. For an irecv()\n"
   "operation, returns the received value by default. However, when\n"
- "return_status=True, a (value, status) pair is returned by a.\n"
+ "return_status=True, a (value, status) pair is returned by a\n"
   "completed irecv request.\n";
 
 const char* request_test_docstring =
   "Determine whether the communication associated with this request\n"
- "has completed successfully. If so, returns the status object\n"
+ "has completed successfully. If so, returns the Status object\n"
   "describing the communication (for an isend request) or a tuple\n"
- "containing the received value and a status object (for an irecv\n"
- "request). Note that once test() returns a status object, the\n"
+ "containing the received value and a Status object (for an irecv\n"
+ "request). Note that once test() returns a Status object, the\n"
   "request has completed and wait() should not be called.\n";
 
 const char* request_cancel_docstring =
   "Cancel a pending communication, assuming it has not already been\n"
   "completed.\n";
 
+const char* request_value_docstring =
+ "If this request originated in an irecv(), this property makes the\n"
+ "sent value accessible once the request completes.\n"
+ "\n"
+ "If no value is available, ValueError is raised.\n";
+
 /***********************************************************
  * skeleton/content documentation *
  ***********************************************************/
 const char* object_without_skeleton_docstring =
- "The object_without_skeleton class is an exception class used only\n"
+ "The ObjectWithoutSkeleton class is an exception class used only\n"
   "when the skeleton() or get_content() function is called with an\n"
   "object that is not supported by the skeleton/content mechanism.\n"
   "All C++ types for which skeletons and content can be transmitted\n"
@@ -475,13 +565,13 @@
   "The object on which skeleton() or get_content() was invoked.\n";
 
 const char* skeleton_proxy_docstring =
- "The skeleton_proxy class is used to represent the skeleton of an\n"
- "object. The skeleton_proxy can be used as the value parameter of\n"
+ "The SkeletonProxy class is used to represent the skeleton of an\n"
+ "object. The SkeletonProxy can be used as the value parameter of\n"
   "send() or isend() operations, but instead of transmitting the\n"
   "entire object, only its skeleton (\"shape\") will be sent, without\n"
   "the actual data. Its content can then be transmitted, separately.\n"
   "\n"
- "User code cannot generate skeleton_proxy instances directly. To\n"
+ "User code cannot generate SkeletonProxy instances directly. To\n"
   "refer to the skeleton of an object, use skeleton(object). Skeletons\n"
   "can also be received with the recv() and irecv() methods.\n"
   "\n"
@@ -503,7 +593,7 @@
   "skeleton/content mechanism.\n";
 
 const char* skeleton_docstring =
- "The skeleton function retrieves the skeleton_proxy for its object\n"
+ "The skeleton function retrieves the SkeletonProxy for its object\n"
   "parameter, allowing the transmission of the skeleton (or \"shape\")\n"
   "of the object separately from its data. The skeleton/content mechanism\n"
   "is useful when a large data structure remains structurally the same\n"
@@ -534,7 +624,7 @@
  * status documentation *
  ***********************************************************/
 const char* status_docstring =
- "The status class stores information about a given message, including\n"
+ "The Status class stores information about a given message, including\n"
   "its source, tag, and whether the message transmission was cancelled\n"
   "or resulted in an error.\n";
 
@@ -554,7 +644,7 @@
  * timer documentation *
  ***********************************************************/
 const char* timer_docstring =
- "The timer class is a simple wrapper around the MPI timing facilities.\n";
+ "The Timer class is a simple wrapper around the MPI timing facilities.\n";
 
 const char* timer_default_constructor_docstring =
   "Initializes the timer. After this call, elapsed == 0.\n";

Modified: trunk/libs/mpi/src/python/module.cpp
==============================================================================
--- trunk/libs/mpi/src/python/module.cpp (original)
+++ trunk/libs/mpi/src/python/module.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -27,6 +27,7 @@
 extern void export_request();
 extern void export_status();
 extern void export_timer();
+extern void export_nonblocking();
 
 extern const char* module_docstring;
 
@@ -35,7 +36,7 @@
   // Setup module documentation
   scope().attr("__doc__") = module_docstring;
   scope().attr("__author__") = "Douglas Gregor <doug.gregor_at_[hidden]>";
- scope().attr("__date__") = "$LastChangedDate: 2006-07-16 15:25:47 -0400 (Sun, 16 Jul 2006) $";
+ scope().attr("__date__") = "$LastChangedDate$";
   scope().attr("__version__") = "$Revision$";
   scope().attr("__copyright__") = "Copyright (C) 2006 Douglas Gregor";
   scope().attr("__license__") = "http://www.boost.org/LICENSE_1_0.txt";
@@ -48,6 +49,7 @@
   export_request();
   export_status();
   export_timer();
+ export_nonblocking();
 }
 
 } } } // end namespace boost::mpi::python

Modified: trunk/libs/mpi/src/python/py_communicator.cpp
==============================================================================
--- trunk/libs/mpi/src/python/py_communicator.cpp (original)
+++ trunk/libs/mpi/src/python/py_communicator.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -14,6 +14,7 @@
 #include <boost/python.hpp>
 #include <boost/mpi.hpp>
 #include <boost/mpi/python/serialize.hpp>
+#include "request_with_value.hpp"
 
 using namespace boost::python;
 using namespace boost::mpi;
@@ -49,14 +50,12 @@
     return result;
 }
 
-object
+request_with_value
 communicator_irecv(const communicator& comm, int source, int tag)
 {
- using boost::python::make_tuple;
-
- object result;
- object req(comm.irecv(source, tag, result));
- req.attr("value") = result;
+ boost::shared_ptr<object> result(new object());
+ request_with_value req(comm.irecv(source, tag, *result));
+ req.m_internal_value = result;
   return req;
 }
 
@@ -76,7 +75,7 @@
   using boost::python::arg;
   using boost::python::object;
   
- class_<communicator> comm("communicator", communicator_docstring);
+ class_<communicator> comm("Communicator", communicator_docstring);
   comm
     .def(init<>())
     .add_property("rank", &communicator::rank, communicator_rank_docstring)

Modified: trunk/libs/mpi/src/python/py_exception.cpp
==============================================================================
--- trunk/libs/mpi/src/python/py_exception.cpp (original)
+++ trunk/libs/mpi/src/python/py_exception.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -30,9 +30,8 @@
 
 str exception_str(const exception& e)
 {
- return str("MPI routine `" + std::string(e.routine()) +
- "' returned error code " +
- lexical_cast<std::string>(e.result_code()));
+ return str(std::string(e.what()) +
+ " (code " + lexical_cast<std::string>(e.result_code())+")");
 }
 
 void export_exception()
@@ -42,10 +41,10 @@
   
   object type =
     class_<exception>
- ("exception", exception_docstring, no_init)
+ ("Exception", exception_docstring, no_init)
       .add_property("what", &exception::what, exception_what_docstring)
       .add_property("routine", &exception::what, exception_routine_docstring)
- .add_property("result_code", &exception::what,
+ .add_property("result_code", &exception::result_code,
                     exception_result_code_docstring)
       .def("__str__", &exception_str)
     ;

Added: trunk/libs/mpi/src/python/py_nonblocking.cpp
==============================================================================
--- (empty file)
+++ trunk/libs/mpi/src/python/py_nonblocking.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -0,0 +1,255 @@
+// (C) Copyright 2007
+// Douglas Gregor <doug.gregor -at- gmail.com>
+// Andreas Kloeckner <inform -at- tiker.net>
+
+// Use, modification and distribution is subject to the Boost Software
+// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+// Authors: Douglas Gregor, Andreas Kloeckner
+
+/** @file py_nonblocking.cpp
+ *
+ * This file reflects the Boost.MPI nonblocking operations into Python
+ * functions.
+ */
+
+#include <vector>
+#include <iterator>
+#include <algorithm>
+#include <boost/operators.hpp>
+#include <boost/python.hpp>
+#include <boost/python/stl_iterator.hpp>
+#include <boost/python/suite/indexing/vector_indexing_suite.hpp>
+#include <boost/mpi.hpp>
+#include "request_with_value.hpp"
+
+using namespace std;
+using namespace boost::python;
+using namespace boost::mpi;
+
+
+
+
+namespace
+{
+  // Output iterator that hands every value written through it to a Python
+  // callable.  Each assignment calls
+  //   callable(<value-or-None of the current request>, v)
+  // and then steps the request iterator forward by one element.
+  template <class ValueType, class RequestIterator>
+  class py_call_output_iterator
+    : public boost::output_iterator_helper<
+        py_call_output_iterator<ValueType, RequestIterator> >
+  {
+  public:
+    explicit py_call_output_iterator(object callable,
+                                     const RequestIterator &req_it)
+      : m_callable(callable), m_request_iterator(req_it)
+    { }
+
+    // Invoked by the output_iterator_helper machinery for `*it = v`.
+    py_call_output_iterator &operator=(ValueType const &v)
+    {
+      RequestIterator current = m_request_iterator;
+      ++m_request_iterator;
+      m_callable(current->get_value_or_none(), v);
+      return *this;
+    }
+
+  private:
+    object m_callable;                    // Python callable to notify
+    RequestIterator m_request_iterator;   // request paired with the next value
+  };
+
+
+
+
+ typedef std::vector<python::request_with_value> request_list; // container type registered with Python as "RequestList"
+ typedef py_call_output_iterator<status, request_list::iterator>
+ status_value_iterator; // output iterator passing (request value, status) pairs to a Python callable
+
+
+
+
+  // Creates a new request_list containing a copy of every element produced
+  // by iterating over `iterable`, each extracted as a request_with_value.
+  // The freshly allocated list is handed back through an auto_ptr.
+  std::auto_ptr<request_list> make_request_list_from_py_list(object iterable)
+  {
+    typedef stl_input_iterator<python::request_with_value> py_request_iterator;
+
+    std::auto_ptr<request_list> requests(new request_list);
+    for (py_request_iterator it(iterable), end; it != end; ++it)
+      requests->push_back(*it);
+    return requests;
+  }
+
+
+
+
+  // vector_indexing_suite specialisation for request_list whose only
+  // customisation is a `contains` that always fails.
+  class request_list_indexing_suite
+    : public vector_indexing_suite<request_list, false,
+                                   request_list_indexing_suite>
+  {
+  public:
+    // FIXME: requests are not comparable, thus __contains__ makes no sense.
+    // Unfortunately, indexing_suites insist on having __contains__ available.
+    // Just make it error out for now.
+
+    // Always raises Python NotImplementedError; never returns.
+    static bool
+    contains(request_list& /*container*/, request const& /*key*/)
+    {
+      static const char message[] = "mpi requests are not comparable";
+      PyErr_SetString(PyExc_NotImplementedError, message);
+      throw error_already_set();
+    }
+  };
+
+
+
+
+  // Guard used by every wait_*/test_* wrapper: raises Python ValueError when
+  // `requests` holds no elements and does nothing otherwise.
+  void check_request_list_not_empty(const request_list &requests)
+  {
+    if (!requests.empty())
+      return;
+
+    PyErr_SetString(PyExc_ValueError, "cannot wait on an empty request vector");
+    throw error_already_set();
+  }
+
+
+
+
+
+  // Python wrapper around wait_any.  Raises ValueError for an empty list;
+  // otherwise returns the tuple (value-or-None, status, index) describing
+  // the request that wait_any reported.
+  object wrap_wait_any(request_list &requests)
+  {
+    check_request_list_not_empty(requests);
+
+    typedef pair<status, request_list::iterator> completion;
+    const completion done = wait_any(requests.begin(), requests.end());
+
+    const request_list::iterator finished = done.second;
+    return make_tuple(finished->get_value_or_none(),
+                      done.first,
+                      distance(requests.begin(), finished));
+  }
+
+
+
+
+  // Python wrapper around test_any.  Raises ValueError for an empty list.
+  // Returns None when test_any reports nothing, otherwise the tuple
+  // (value-or-None, status, index) for the reported request.
+  object wrap_test_any(request_list &requests)
+  {
+    check_request_list_not_empty(requests);
+
+    typedef pair<status, request_list::iterator> completion;
+    ::boost::optional<completion> done =
+      test_any(requests.begin(), requests.end());
+
+    // Empty optional: hand None back to Python.
+    if (!done)
+      return object();
+
+    return make_tuple(done->second->get_value_or_none(),
+                      done->first,
+                      distance(requests.begin(), done->second));
+  }
+
+
+
+
+
+  // Python wrapper around wait_all.  Raises ValueError for an empty list.
+  // When `py_callable` is not None, each status emitted by wait_all is
+  // forwarded to it together with the value of the request at the matching
+  // position, counted from the start of `requests`.
+  void wrap_wait_all(request_list &requests, object py_callable)
+  {
+    check_request_list_not_empty(requests);
+
+    if (py_callable != object())
+    {
+      status_value_iterator notify(py_callable, requests.begin());
+      wait_all(requests.begin(), requests.end(), notify);
+      return;
+    }
+
+    wait_all(requests.begin(), requests.end());
+  }
+
+
+
+
+  // Python wrapper around test_all.  Raises ValueError for an empty list and
+  // returns the result of test_all converted to bool.  When `py_callable` is
+  // not None, the statuses test_all emits are forwarded to it together with
+  // the value of the request at the matching position in `requests`.
+  bool wrap_test_all(request_list &requests, object py_callable)
+  {
+    check_request_list_not_empty(requests);
+
+    if (py_callable != object())
+    {
+      return test_all(requests.begin(), requests.end(),
+                      status_value_iterator(py_callable, requests.begin()));
+    }
+
+    return test_all(requests.begin(), requests.end());
+  }
+
+
+
+
+  // Python wrapper around wait_some.  Raises ValueError for an empty list.
+  //
+  // wait_some partitions `requests` into pending requests followed by
+  // completed requests; the return value is the index of the first
+  // completed request.
+  //
+  // When `py_callable` is not None it is called once per emitted status as
+  // callable(value-or-None, status).  The statuses are collected first and
+  // paired with the completed tail of the list after wait_some returns.
+  // Walking from requests.begin() instead would pair them with the values
+  // of pending requests, which sit at the front after partitioning.
+  //
+  // NOTE(review): assumes the completed range [first_completed, end) is left
+  // in the same order in which wait_some emitted the statuses -- confirm
+  // against boost/mpi/nonblocking.hpp.
+  int wrap_wait_some(request_list &requests, object py_callable)
+  {
+    check_request_list_not_empty(requests);
+
+    if (py_callable != object())
+    {
+      std::vector<status> statuses;
+      const request_list::iterator first_completed =
+        wait_some(requests.begin(), requests.end(),
+                  std::back_inserter(statuses)).second;
+
+      // Never step past the end of the list, even if more statuses were
+      // emitted than there are requests in the completed range.
+      request_list::iterator completed = first_completed;
+      for (std::vector<status>::const_iterator stat = statuses.begin();
+           stat != statuses.end() && completed != requests.end();
+           ++stat, ++completed)
+        py_callable(completed->get_value_or_none(), *stat);
+
+      return distance(requests.begin(), first_completed);
+    }
+
+    const request_list::iterator first_completed =
+      wait_some(requests.begin(), requests.end());
+    return distance(requests.begin(), first_completed);
+  }
+
+
+
+
+  // Python wrapper around test_some.  Raises ValueError for an empty list.
+  //
+  // test_some partitions `requests` into pending requests followed by
+  // completed requests; the return value is the index of the first
+  // completed request.
+  //
+  // When `py_callable` is not None it is called once per emitted status as
+  // callable(value-or-None, status).  The statuses are collected first and
+  // paired with the completed tail of the list after test_some returns.
+  // Walking from requests.begin() instead would pair them with the values
+  // of pending requests, which sit at the front after partitioning.
+  //
+  // NOTE(review): assumes the completed range [first_completed, end) is left
+  // in the same order in which test_some emitted the statuses -- confirm
+  // against boost/mpi/nonblocking.hpp.
+  int wrap_test_some(request_list &requests, object py_callable)
+  {
+    check_request_list_not_empty(requests);
+
+    if (py_callable != object())
+    {
+      std::vector<status> statuses;
+      const request_list::iterator first_completed =
+        test_some(requests.begin(), requests.end(),
+                  std::back_inserter(statuses)).second;
+
+      // Never step past the end of the list, even if more statuses were
+      // emitted than there are requests in the completed range.
+      request_list::iterator completed = first_completed;
+      for (std::vector<status>::const_iterator stat = statuses.begin();
+           stat != statuses.end() && completed != requests.end();
+           ++stat, ++completed)
+        py_callable(completed->get_value_or_none(), *stat);
+
+      return distance(requests.begin(), first_completed);
+    }
+
+    const request_list::iterator first_completed =
+      test_some(requests.begin(), requests.end());
+    return distance(requests.begin(), first_completed);
+  }
+}
+
+
+
+
+namespace boost { namespace mpi { namespace python {
+
+extern const char* request_list_init_docstring;
+extern const char* request_list_append_docstring;
+
+extern const char* nonblocking_wait_any_docstring;
+extern const char* nonblocking_test_any_docstring;
+extern const char* nonblocking_wait_all_docstring;
+extern const char* nonblocking_test_all_docstring;
+extern const char* nonblocking_wait_some_docstring;
+extern const char* nonblocking_test_some_docstring;
+
+// Registers the RequestList class and the module-level wait_any, test_any,
+// wait_all, test_all, wait_some and test_some functions with Boost.Python.
+void export_nonblocking()
+{
+  using boost::python::arg;
+
+  // RequestList wraps request_list.  An extra __init__ is built from
+  // make_request_list_from_py_list (no keyword name is registered for its
+  // argument), and request_list_indexing_suite is installed on the class.
+  class_<request_list>("RequestList", "A list of Request objects.")
+    .def("__init__", make_constructor(make_request_list_from_py_list),
+         request_list_init_docstring)
+    .def(request_list_indexing_suite());
+
+  // Single-request completion: take only the request list.
+  def("wait_any", wrap_wait_any, (arg("requests")),
+      nonblocking_wait_any_docstring);
+  def("test_any", wrap_test_any, (arg("requests")),
+      nonblocking_test_any_docstring);
+
+  // The remaining functions accept an optional `callable`, defaulting to
+  // None.
+  def("wait_all", wrap_wait_all,
+      (arg("requests"), arg("callable") = object()),
+      nonblocking_wait_all_docstring);
+  def("test_all", wrap_test_all,
+      (arg("requests"), arg("callable") = object()),
+      nonblocking_test_all_docstring);
+
+  def("wait_some", wrap_wait_some,
+      (arg("requests"), arg("callable") = object()),
+      nonblocking_wait_some_docstring);
+  def("test_some", wrap_test_some,
+      (arg("requests"), arg("callable") = object()),
+      nonblocking_test_some_docstring);
+}
+
+} } }

Modified: trunk/libs/mpi/src/python/py_request.cpp
==============================================================================
--- trunk/libs/mpi/src/python/py_request.cpp (original)
+++ trunk/libs/mpi/src/python/py_request.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -13,52 +13,90 @@
  */
 #include <boost/python.hpp>
 #include <boost/mpi.hpp>
+#include "request_with_value.hpp"
 
 using namespace boost::python;
 using namespace boost::mpi;
 
-namespace boost { namespace mpi { namespace python {
-
-extern const char* request_docstring;
-extern const char* request_wait_docstring;
-extern const char* request_test_docstring;
-extern const char* request_cancel_docstring;
+// Returns the Python value attached to this request.  The internally owned
+// value is preferred; the external value is used when no internal one is
+// set.  Raises Python ValueError when neither is set.
+const object python::request_with_value::get_value() const
+{
+  if (m_internal_value.get())
+    return *m_internal_value;
+
+  if (m_external_value)
+    return *m_external_value;
+
+  PyErr_SetString(PyExc_ValueError, "request value not available");
+  throw boost::python::error_already_set();
+}
 
-object request_wait(object req_obj)
+const object python::request_with_value::get_value_or_none() const
 {
- request& req = extract<request&>(req_obj)();
- status stat = req.wait();
- if (PyObject_HasAttrString(req_obj.ptr(), "value"))
- return boost::python::make_tuple(stat, req_obj.attr("value"));
+ if (m_internal_value.get())
+ return *m_internal_value;
+ else if (m_external_value)
+ return *m_external_value;
   else
- return object(stat);
+ return object();
 }
 
-object request_test(object req_obj)
+const object python::request_with_value::wrap_wait()
 {
- request& req = extract<request&>(req_obj)();
+ status stat = request::wait();
+ if (m_internal_value.get() || m_external_value)
+ return boost::python::make_tuple(get_value(), stat);
+ else
+ return object(stat);
+}
 
- if (optional<status> stat = req.test())
- {
- if (PyObject_HasAttrString(req_obj.ptr(), "value"))
- return boost::python::make_tuple(stat, req_obj.attr("value"));
- else
- return object(stat);
- }
+const object python::request_with_value::wrap_test()
+{
+ ::boost::optional<status> stat = request::test();
+ if (stat)
+ {
+ if (m_internal_value.get() || m_external_value)
+ return boost::python::make_tuple(get_value(), *stat);
+ else
+ return object(*stat);
+ }
   else
     return object();
 }
 
+
+namespace boost { namespace mpi { namespace python {
+
+extern const char* request_docstring;
+extern const char* request_with_value_docstring;
+extern const char* request_wait_docstring;
+extern const char* request_test_docstring;
+extern const char* request_cancel_docstring;
+extern const char* request_value_docstring;
+
 void export_request()
 {
   using boost::python::arg;
   using boost::python::object;
   
- class_<request>("request", request_docstring, no_init)
- .def("wait", &request_wait, request_wait_docstring)
- .def("test", &request_test, request_test_docstring)
- .def("cancel", &request::cancel, request_cancel_docstring)
- ;
+ {
+ typedef request cl;
+ class_<cl>("Request", request_docstring, no_init)
+ .def("wait", &cl::wait, request_wait_docstring)
+ .def("test", &cl::test, request_test_docstring)
+ .def("cancel", &cl::cancel, request_cancel_docstring)
+ ;
+ }
+ {
+ typedef request_with_value cl;
+ class_<cl, bases<request> >(
+ "RequestWithValue", request_with_value_docstring, no_init)
+ .def("wait", &cl::wrap_wait, request_wait_docstring)
+ .def("test", &cl::wrap_test, request_test_docstring)
+ ;
+ }
+
+ implicitly_convertible<request, request_with_value>();
 }
 
 } } } // end namespace boost::mpi::python

Modified: trunk/libs/mpi/src/python/py_timer.cpp
==============================================================================
--- trunk/libs/mpi/src/python/py_timer.cpp (original)
+++ trunk/libs/mpi/src/python/py_timer.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -32,7 +32,7 @@
   using boost::python::arg;
   using boost::python::object;
   
- class_<timer>("timer", timer_docstring)
+ class_<timer>("Timer", timer_docstring)
     .def(init<>())
     .def("restart", &timer::restart, timer_restart_docstring)
     .add_property("elapsed", &timer::elapsed, timer_elapsed_docstring)

Added: trunk/libs/mpi/src/python/request_with_value.hpp
==============================================================================
--- (empty file)
+++ trunk/libs/mpi/src/python/request_with_value.hpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -0,0 +1,71 @@
+// (C) Copyright 2006
+// Douglas Gregor <doug.gregor -at- gmail.com>
+// Andreas Kloeckner <inform -at- tiker.net>
+
+// Use, modification and distribution is subject to the Boost Software
+// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+// Authors: Douglas Gregor, Andreas Kloeckner
+
+#ifndef BOOST_MPI_PYTHON_REQUEST_WITH_VALUE_HPP
+#define BOOST_MPI_PYTHON_REQUEST_WITH_VALUE_HPP
+
+#include <boost/python.hpp>
+#include <boost/mpi.hpp>
+
+namespace boost { namespace mpi { namespace python {
+
+ /** This wrapper adds a @c boost::python::object value to the @c
+ * boost::mpi::request structure, for the benefit of @c irecv() requests.
+ *
+ * In order to be able to return the value of these requests to the user, we
+ * need a handle that we can update to contain the transmitted value once the
+ * request completes. Since we're passing the address on to irecv to fill at
+ * any time in the future, this address must not change over time.
+ *
+ * There are two possible cases:
+ * - plain irecv()
+ * - skeleton-content irecv()
+ *
+ * In the first case, we need to own the storage from this object;
+ * m_internal_value is used for this. In the second case, the updated
+ * python::object is part of a boost::mpi::python::content object: the
+ * m_external_value field handles this case. Furthermore, in the latter case,
+ * we now have a lifetime dependency on that content object; this can be
+ * handled with the BPL's with_custodian_and_ward facility.
+ *
+ * Since requests and request_with_value are supposed to be copy-constructible,
+ * we can't put the handle immediately inside this instance. Moreover, since
+ * we need to be able to put request_with_value inside request_vectors, any
+ * values we own must be held in a shared_ptr instance.
+ */
+
+ class request_with_value : public request
+ {
+ private:
+ boost::shared_ptr<boost::python::object> m_internal_value;
+ boost::python::object *m_external_value;
+
+ public:
+ request_with_value()
+ : m_external_value(0)
+ { }
+ request_with_value(const request &req)
+ : request(req), m_external_value(0)
+ { }
+
+ const boost::python::object get_value() const;
+ const boost::python::object get_value_or_none() const;
+
+ const boost::python::object wrap_wait();
+ const boost::python::object wrap_test();
+
+ friend request_with_value communicator_irecv(const communicator &, int, int);
+ friend request_with_value communicator_irecv_content(
+ const communicator&, int, int, content&);
+ };
+
+} } }
+
+#endif

Modified: trunk/libs/mpi/src/python/skeleton_and_content.cpp
==============================================================================
--- trunk/libs/mpi/src/python/skeleton_and_content.cpp (original)
+++ trunk/libs/mpi/src/python/skeleton_and_content.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -16,6 +16,7 @@
 #include <typeinfo>
 #include <list>
 #include "utility.hpp"
+#include "request_with_value.hpp"
 
 using namespace boost::python;
 using namespace boost::mpi;
@@ -114,14 +115,12 @@
 /// Receive the content of a Python object. The request object's value
 /// attribute will reference the object whose content is being
 /// received, not the content wrapper.
-object
+request_with_value
 communicator_irecv_content(const communicator& comm, int source, int tag,
- const content& c)
+ content& c)
 {
- using boost::python::make_tuple;
-
- object req(comm.irecv(source, tag, c.base()));
- req.attr("value") = c.object;
+ request_with_value req(comm.irecv(source, tag, c.base()));
+ req.m_external_value = &c.object;
   return req;
 }
 
@@ -140,7 +139,7 @@
   // Expose the object_without_skeleton exception
   object type =
     class_<object_without_skeleton>
- ("object_without_skeleton", object_without_skeleton_docstring, no_init)
+ ("ObjectWithoutSkeleton", object_without_skeleton_docstring, no_init)
       .def_readonly("object", &object_without_skeleton::value,
                     object_without_skeleton_object_docstring)
       .def("__str__", &object_without_skeleton_str)
@@ -150,11 +149,11 @@
   // Expose the Python variants of "skeleton_proxy" and "content", and
   // their generator functions.
   detail::skeleton_proxy_base_type =
- class_<skeleton_proxy_base>("skeleton_proxy", skeleton_proxy_docstring,
+ class_<skeleton_proxy_base>("SkeletonProxy", skeleton_proxy_docstring,
                                 no_init)
       .def_readonly("object", &skeleton_proxy_base::object,
                     skeleton_proxy_object_docstring);
- class_<content>("content", content_docstring, no_init);
+ class_<content>("Content", content_docstring, no_init);
   def("skeleton", &skeleton, arg("object"), skeleton_docstring);
   def("get_content", &get_content, arg("object"), get_content_docstring);
 
@@ -166,7 +165,9 @@
          (arg("source") = any_source, arg("tag") = any_tag, arg("buffer"),
           arg("return_status") = false))
     .def("irecv", communicator_irecv_content,
- (arg("source") = any_source, arg("tag") = any_tag, arg("buffer")));
+ (arg("source") = any_source, arg("tag") = any_tag, arg("buffer")),
+ with_custodian_and_ward_postcall<0, 4>()
+ );
 }
 
 } } } // end namespace boost::mpi::python

Modified: trunk/libs/mpi/src/python/status.cpp
==============================================================================
--- trunk/libs/mpi/src/python/status.cpp (original)
+++ trunk/libs/mpi/src/python/status.cpp 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -30,7 +30,7 @@
   using boost::python::arg;
   using boost::python::object;
   
- class_<status>("status", status_docstring, no_init)
+ class_<status>("Status", status_docstring, no_init)
     .add_property("source", &status::source, status_source_docstring)
     .add_property("tag", &status::tag, status_tag_docstring)
     .add_property("error", &status::error, status_error_docstring)

Added: trunk/libs/mpi/test/python/nonblocking_test.py
==============================================================================
--- (empty file)
+++ trunk/libs/mpi/test/python/nonblocking_test.py 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -0,0 +1,131 @@
+# (C) Copyright 2007
+# Andreas Kloeckner <inform -at- tiker.net>
+#
+# Use, modification and distribution is subject to the Boost Software
+# License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
+# http://www.boost.org/LICENSE_1_0.txt)
+#
+# Authors: Andreas Kloeckner
+
+
+
+
+import boost.mpi as mpi
+import random
+import sys
+
+MAX_GENERATIONS = 20
+TAG_DEBUG = 0
+TAG_DATA = 1
+TAG_TERMINATE = 2
+TAG_PROGRESS_REPORT = 3
+
+
+
+
+class TagGroupListener:
+ """Class to help listen for only a given set of tags.
+
+ This is contrived: Typically you could just listen for
+ mpi.any_tag and filter."""
+ def __init__(self, comm, tags):
+ self.tags = tags
+ self.comm = comm
+ self.active_requests = {}
+
+ def wait(self):
+ for tag in self.tags:
+ if tag not in self.active_requests:
+ self.active_requests[tag] = self.comm.irecv(tag=tag)
+ requests = mpi.RequestList(self.active_requests.values())
+ data, status, index = mpi.wait_any(requests)
+ del self.active_requests[status.tag]
+ return status, data
+
+ def cancel(self):
+ for r in self.active_requests.itervalues():
+ r.cancel()
+ #r.wait()
+ self.active_requests = {}
+
+
+
+def rank0():
+ sent_histories = (mpi.size-1)*15
+ print "sending %d packets on their way" % sent_histories
+ send_reqs = mpi.RequestList()
+ for i in range(sent_histories):
+ dest = random.randrange(1, mpi.size)
+ send_reqs.append(mpi.world.isend(dest, TAG_DATA, []))
+
+ mpi.wait_all(send_reqs)
+
+ completed_histories = []
+ progress_reports = {}
+ dead_kids = []
+
+ tgl = TagGroupListener(mpi.world,
+ [TAG_DATA, TAG_DEBUG, TAG_PROGRESS_REPORT, TAG_TERMINATE])
+
+ def is_complete():
+ for i in progress_reports.values():
+ if i != sent_histories:
+ return False
+ return len(dead_kids) == mpi.size-1
+
+ while True:
+ status, data = tgl.wait()
+
+ if status.tag == TAG_DATA:
+ #print "received completed history %s from %d" % (data, status.source)
+ completed_histories.append(data)
+ if len(completed_histories) == sent_histories:
+ print "all histories received, exiting"
+ for rank in range(1, mpi.size):
+ mpi.world.send(rank, TAG_TERMINATE, None)
+ elif status.tag == TAG_PROGRESS_REPORT:
+ progress_reports[len(data)] = progress_reports.get(len(data), 0) + 1
+ elif status.tag == TAG_DEBUG:
+ print "[DBG %d] %s" % (status.source, data)
+ elif status.tag == TAG_TERMINATE:
+ dead_kids.append(status.source)
+ else:
+ print "unexpected tag %d from %d" % (status.tag, status.source)
+
+ if is_complete():
+ break
+
+ print "OK"
+
+def comm_rank():
+ while True:
+ data, status = mpi.world.recv(return_status=True)
+ if status.tag == TAG_DATA:
+ mpi.world.send(0, TAG_PROGRESS_REPORT, data)
+ data.append(mpi.rank)
+ if len(data) >= MAX_GENERATIONS:
+ dest = 0
+ else:
+ dest = random.randrange(1, mpi.size)
+ mpi.world.send(dest, TAG_DATA, data)
+ elif status.tag == TAG_TERMINATE:
+ from time import sleep
+ mpi.world.send(0, TAG_TERMINATE, 0)
+ break
+ else:
+ print "[DIRECTDBG %d] unexpected tag %d from %d" % (mpi.rank, status.tag, status.source)
+
+
+def main():
+ # this program sends around messages consisting of lists of visited nodes
+ # randomly. After MAX_GENERATIONS, they are returned to rank 0.
+
+ if mpi.rank == 0:
+ rank0()
+ else:
+ comm_rank()
+
+
+
+if __name__ == "__main__":
+ main()

Modified: trunk/tools/build/v2/tools/mpi.jam
==============================================================================
--- trunk/tools/build/v2/tools/mpi.jam (original)
+++ trunk/tools/build/v2/tools/mpi.jam 2008-02-07 16:08:09 EST (Thu, 07 Feb 2008)
@@ -271,6 +271,12 @@
       command = [ common.get-invocation-command mpi : mpiCC ] ;
     }
 
+ if ! $(mpicxx) && ! $(command)
+ {
+ # Try "mpicxx", which is used by OpenMPI and MPICH2
+ command = [ common.get-invocation-command mpi : mpicxx ] ;
+ }
+
     local result ;
     local compile_flags ;
     local link_flags ;


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk