Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r64210 - in sandbox/SOC/2010/process: boost/process boost/process/detail libs/process/example
From: boris_at_[hidden]
Date: 2010-07-20 20:37:11


Author: bschaeling
Date: 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
New Revision: 64210
URL: http://svn.boost.org/trac/boost/changeset/64210

Log:
Added support for asynchronous waiting on POSIX platforms
Text files modified:
   sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp | 130 ++++++++++++++++++++++++---------------
   sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp | 5 -
   sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp | 91 +++++++++++++++++++--------
   sandbox/SOC/2010/process/boost/process/operations.hpp | 26 ++++---
   sandbox/SOC/2010/process/boost/process/process.hpp | 4
   sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp | 8 -
   sandbox/SOC/2010/process/libs/process/example/wait_child.cpp | 6 -
   7 files changed, 164 insertions(+), 106 deletions(-)

Modified: sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp (original)
+++ sandbox/SOC/2010/process/boost/process/detail/basic_status_service.hpp 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -34,6 +34,8 @@
 #include <algorithm>
 
 #if defined(BOOST_POSIX_API)
+# include <sys/types.h>
+# include <sys/wait.h>
 #elif defined(BOOST_WINDOWS_API)
 # include <windows.h>
 #else
@@ -49,17 +51,18 @@
     : public boost::asio::detail::service_base<StatusImplementation>
 {
 public:
-
-
- typedef boost::shared_ptr<StatusImplementation> implementation_type;
-
     explicit basic_status_service(boost::asio::io_service &io_service)
         : boost::asio::detail::service_base<StatusImplementation>(io_service),
- run_(true),
         work_thread_(&basic_status_service<StatusImplementation>::work_thread, this)
+#if defined(BOOST_POSIX_API)
+ , interrupt_pid_(-1),
+ pids_(0)
+#elif defined(BOOST_WINDOWS_API)
+ , run_(true)
+#endif
     {
 #if defined(BOOST_WINDOWS_API)
- handles_.push_back(::CreateEvent(NULL, FALSE, FALSE, NULL));
+ handles_.push_back(CreateEvent(NULL, FALSE, FALSE, NULL));
         if (handles_[0] == NULL)
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("CreateEvent() failed");
 #endif
@@ -70,14 +73,11 @@
         stop_work_thread();
         work_thread_.join();
 #if defined(BOOST_WINDOWS_API)
- if (!::CloseHandle(handles_[0]))
- BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("CloseHandle() failed");
-#elif defined(BOOST_POSIX_API)
- if (::close(handles_[0]) == -1)
- BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("close() failed");
+ CloseHandle(handles_[0]);
 #endif
     }
 
+ typedef boost::shared_ptr<StatusImplementation> implementation_type;
 
     void construct(implementation_type &impl)
     {
@@ -88,43 +88,45 @@
 
     void destroy(implementation_type &impl)
     {
- // impl->destroy();
+ boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
+ std::vector<implementation_type>::iterator it = std::find(impls_.begin(), impls_.end(), impl);
+ if (it != impls_.end())
+ impls_.erase(it);
         impl.reset();
- // remove impl from impls_
     }
 
     int wait(implementation_type &impl, pid_type pid)
     {
- /*
         boost::system::error_code ec;
- int exit_code = impl->wait(pid, ec);
+ int status = impl.wait(pid, ec);
         boost::asio::detail::throw_error(ec);
- */
- int exit_code = 0;
- return exit_code;
+ return status;
     }
 
     template <typename Handler>
     void async_wait(implementation_type &impl, pid_type pid, Handler handler)
     {
-#if defined(BOOST_WINDOWS_API)
- HANDLE handle = ::OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, FALSE, pid);
+#if defined(BOOST_POSIX_API)
+ boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
+ if (!work_)
+ work_.reset(new boost::asio::io_service::work(this->get_io_service()));
+ ++pids_;
+ impl->async_wait(pid, this->get_io_service().wrap(handler));
+#elif defined(BOOST_WINDOWS_API)
+ HANDLE handle = OpenProcess(SYNCHRONIZE | PROCESS_QUERY_INFORMATION, FALSE, pid);
         if (handle == NULL)
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("OpenProcess() failed");
-#elif defined(BOOST_POSIX_API)
- behavior::stream::native_type handle = pid;
-#endif
         boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
- interrupt_work_thread();
- work_thread_cond_.wait(work_thread_mutex_);
         if (!work_)
             work_.reset(new boost::asio::io_service::work(this->get_io_service()));
+ interrupt_work_thread();
+ work_thread_cond_.wait(work_thread_mutex_);
         handles_.push_back(handle);
         impl->async_wait(handle, this->get_io_service().wrap(handler));
         work_thread_cond_.notify_all();
+#endif
     }
 
-
 private:
     void shutdown_service()
     {
@@ -132,10 +134,31 @@
 
     void work_thread()
     {
- while (running())
+ for (;;)
         {
-#if defined(BOOST_WINDOWS_API)
- DWORD res = ::WaitForMultipleObjects(handles_.size(), &handles_[0], FALSE, INFINITE);
+#if defined(BOOST_POSIX_API)
+ int status;
+ pid_t pid = wait(&status);
+ if (pid == -1)
+ {
+ if (errno != EINTR)
+ BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("wait(2) failed");
+ }
+ else if (interrupted(pid))
+ {
+ // On POSIX the only reason to interrupt is to break out.
+ break;
+ }
+ else
+ {
+ boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
+ for (std::vector<implementation_type>::iterator it = impls_.begin(); it != impls_.end(); ++it)
+ (*it)->complete(pid, status);
+ if (--pids_ == 0)
+ work_.reset();
+ }
+#elif defined(BOOST_WINDOWS_API)
+ DWORD res = WaitForMultipleObjects(handles_.size(), &handles_[0], FALSE, INFINITE);
             if (res == WAIT_FAILED)
                 BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("WaitForMultipleObjects() failed");
             else if (res - WAIT_OBJECT_0 == 0)
@@ -150,7 +173,7 @@
             {
                 HANDLE handle = handles_[res - WAIT_OBJECT_0];
                 DWORD exit_code;
- if (!::GetExitCodeProcess(handle, &exit_code))
+ if (!GetExitCodeProcess(handle, &exit_code))
                     BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("GetExitCodeProcess() failed");
                 boost::unique_lock<boost::mutex> lock(work_thread_mutex_);
                 for (std::vector<implementation_type>::iterator it = impls_.begin(); it != impls_.end(); ++it)
@@ -161,53 +184,58 @@
                 if (handles_.size() == 1)
                     work_.reset();
             }
-#elif defined(BOOST_POSIX_API)
- //linux here
 #endif
         }
     }
 
- bool running()
- {
- // Access to run_ is sychronized with stop_work_thread().
- boost::mutex::scoped_lock lock(work_thread_mutex_);
- return run_;
- }
-
     void interrupt_work_thread()
     {
+#if defined(BOOST_POSIX_API)
+ // By creating a child process which immediately exits we interrupt wait().
+ interrupt_pid_ = create_child("/usr/sh").get_id();
+#elif defined(BOOST_WINDOWS_API)
         // By signaling the event in the first slot WaitForMultipleObjects() will return.
         // The work thread won't do anything except checking if it should continue to run.
-#if defined(BOOST_WINDOWS_API)
- if (!::SetEvent(handles_[0]))
+ if (!SetEvent(handles_[0]))
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("SetEvent() failed");
-#elif defined(BOOST_POSIX_API)
- //linux here
 #endif
     }
 
+#if defined(BOOST_POSIX_API)
+ bool interrupted(pid_t pid)
+ {
+ boost::mutex::scoped_lock lock(work_thread_mutex_);
+ return interrupt_pid_ == pid;
+ }
+#endif
+
     void stop_work_thread()
     {
- // Access to run_ is sychronized with running().
         boost::mutex::scoped_lock lock(work_thread_mutex_);
+#if defined(BOOST_WINDOWS_API)
+ // Access to run_ must be synchronized with running().
         run_ = false;
+#endif
+ // Access to interrupt_pid_ must be synchronized with interrupted().
         interrupt_work_thread();
     }
 
     boost::scoped_ptr<boost::asio::io_service::work> work_;
     std::vector<implementation_type> impls_;
     boost::mutex work_thread_mutex_;
- boost::condition_variable_any work_thread_cond_;
- bool run_;
     boost::thread work_thread_;
- std::vector<behavior::stream::native_type> handles_;
-
-
-
+#if defined(BOOST_POSIX_API)
+ pid_t interrupt_pid_;
+ int pids_;
+#elif defined(BOOST_WINDOWS_API)
+ bool run_;
+ boost::condition_variable_any work_thread_cond_;
+ std::vector<HANDLE> handles_;
+#endif
 };
 
 }
 }
 }
 
-#endif
+#endif

Modified: sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp (original)
+++ sandbox/SOC/2010/process/boost/process/detail/posix_helpers.hpp 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -99,9 +99,4 @@
 }
 }
 
-void posix_remap(int native_handle, int new_handle)
-{
- if (::dup2(new_handle, native_handle) == -1)
- BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed");
-}
 #endif

Modified: sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp (original)
+++ sandbox/SOC/2010/process/boost/process/detail/status_impl.hpp 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -22,11 +22,11 @@
 #include <boost/process/config.hpp>
 #include <boost/system/error_code.hpp>
 #include <boost/ptr_container/ptr_unordered_map.hpp>
-#include <boost/bind.hpp>
-#include <boost/process/stream_behavior.hpp>
 #include <algorithm>
 
 #if defined(BOOST_POSIX_API)
+# include <sys/types.h>
+# include <sys/wait.h>
 #elif defined(BOOST_WINDOWS_API)
 # include <windows.h>
 #else
@@ -37,13 +37,15 @@
 namespace process {
 namespace detail {
 
+#elif defined(BOOST_POSIX_API)
+typedef pid_t phandle;
+#elif defined(BOOST_WINDOWS_API)
+typedef HANDLE phandle;
+#endif
+
 struct operation
 {
- #if defined(BOOST_WINDOWS_API)
- virtual void operator()(DWORD exit_code)
- #elif defined BOOST_POSIX_API
- virtual void operator()(unsigned int exit_code)
- #endif
+ virtual void operator()(int exit_code)
     {
     }
 };
@@ -57,11 +59,7 @@
     {
     }
 
- #if defined(BOOST_WINDOWS_API)
- void operator()(DWORD exit_code)
- #elif defined BOOST_POSIX_API
- void operator()(unsigned int exit_code)
- #endif
+ void operator()(int exit_code)
     {
         handler_(boost::system::error_code(), exit_code);
     }
@@ -73,33 +71,72 @@
 class status_impl
 {
 public:
+ int wait(pid_type pid, boost::system::error_code ec)
+ {
+#if defined(BOOST_POSIX_API)
+ pid_t p;
+ int status;
+ do
+ {
+ p = waitpid(pid, &status, 0);
+ } while (p == -1 && errno == EINTR);
+ if (p == -1)
+ {
+ ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "waitpid(2) failed");
+ return -1;
+ }
+ return status;
+#elif defined(BOOST_WINDOWS_API)
+ HANDLE h = OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, FALSE, pid);
+ if (h == NULL)
+ {
+ ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "OpenProcess() failed");
+ return -1;
+ }
+
+ if (WaitForSingleObject(h, INFINITE) == WAIT_FAILED)
+ {
+ CloseHandle(h);
+ ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "WaitForSingleObject() failed");
+ return -1;
+ }
+
+ DWORD exit_code;
+ if (!GetExitCodeProcess(h, &exit_code))
+ {
+ CloseHandle(h);
+ ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "GetExitCodeProcess() failed");
+ return -1;
+ }
+ if (!CloseHandle(h))
+ {
+ ec = boost::system::system_error(boost::system::error_code(errno, boost::system::get_system_category()), BOOST_PROCESS_SOURCE_LOCATION "CloseHandle() failed");
+ return -1;
+ }
+ return exit_code;
+#endif
+ }
+
     template <typename Handler>
- void async_wait(behavior::stream::native_type handle, Handler handler)
+ void async_wait(phandle ph, Handler handler)
     {
- ops_.insert(handle, new wrapped_handler<Handler>(handler));
+ ops_.insert(ph, new wrapped_handler<Handler>(handler));
     }
 
- #if defined(BOOST_WINDOWS_API)
- void complete(behavior::stream::native_type handle, DWORD exit_code)
- #elif defined(BOOST_POSIX_API)
- void complete(behavior::stream::native_type handle, unsigned int exit_code)
- #endif
+ void complete(phandle ph, int exit_code)
     {
- boost::iterator_range<operations_type::iterator> r = ops_.equal_range(handle);
+ boost::iterator_range<operations_type::iterator> r = ops_.equal_range(ph);
         for (operations_type::iterator it = r.begin(); it != r.end(); ++it)
             (*it->second)(exit_code);
         ops_.erase(r.begin(), r.end());
- #if defined(BOOST_WINDOWS_API)
- if (!::CloseHandle(handle))
+#if defined(BOOST_WINDOWS_API)
+ if (!CloseHandle(ph))
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("CloseHandle() failed");
- #elif defined(BOOST_POSIX_API)
- if (::close(handle) == -1)
- BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("close() failed");
- #endif
+#endif
     }
 
 private:
- typedef boost::ptr_unordered_multimap<behavior::stream::native_type, operation> operations_type;
+ typedef boost::ptr_unordered_multimap<phandle, operation> operations_type;
     operations_type ops_;
 };
 

Modified: sandbox/SOC/2010/process/boost/process/operations.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/operations.hpp (original)
+++ sandbox/SOC/2010/process/boost/process/operations.hpp 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -200,29 +200,33 @@
 
             int stdin_fd = ctx.stdin_behavior->get_child_end();
             if (stdin_fd != -1 && stdin_fd < maxdescs)
- closeflags[stdin_fd] = false;
+ {
+ if (::dup2(stdin_fd, STDIN_FILENO) == -1)
+ BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed");
+ closeflags[STDIN_FILENO] = false;
+ }
 
             int stdout_fd = ctx.stdout_behavior->get_child_end();
             if (stdout_fd != -1 && stdout_fd < maxdescs)
- closeflags[stdout_fd] = false;
+ {
+ if (::dup2(stdout_fd, STDOUT_FILENO) == -1)
+ BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed");
+ closeflags[STDOUT_FILENO] = false;
+ }
 
             int stderr_fd = ctx.stderr_behavior->get_child_end();
             if (stderr_fd != -1 && stderr_fd < maxdescs)
- closeflags[stderr_fd] = false;
+ {
+ if (::dup2(stderr_fd, STDERR_FILENO) == -1)
+ BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("dup2() failed");
+ closeflags[STDERR_FILENO] = false;
+ }
 
             for (int i = 0; i < maxdescs; ++i)
             {
                 if (closeflags[i])
                     ::close(i);
             }
-
- if(closeflags[stdin_fd] == false)
- posix_remap(STDIN_FILENO, stdin_fd);
- if(closeflags[stdout_fd] == false)
- posix_remap(STDOUT_FILENO, stdout_fd);
- if(closeflags[stderr_fd] == false)
- posix_remap(STDERR_FILENO, stderr_fd);
-
         }
         catch (const boost::system::system_error &e)
         {

Modified: sandbox/SOC/2010/process/boost/process/process.hpp
==============================================================================
--- sandbox/SOC/2010/process/boost/process/process.hpp (original)
+++ sandbox/SOC/2010/process/boost/process/process.hpp 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -137,8 +137,7 @@
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("waitpid(2) failed");
         return s;
 #elif defined(BOOST_WINDOWS_API)
-
- HANDLE h = ::OpenProcess(PROCESS_QUERY_INFORMATION |SYNCHRONIZE , FALSE, id_);
+ HANDLE h = ::OpenProcess(PROCESS_QUERY_INFORMATION | SYNCHRONIZE, FALSE, id_);
         if (h == NULL)
             BOOST_PROCESS_THROW_LAST_SYSTEM_ERROR("OpenProcess() failed");
 
@@ -149,7 +148,6 @@
         }
 
         DWORD exit_code;
-
         if (!::GetExitCodeProcess(h, &exit_code))
         {
             ::CloseHandle(h);

Modified: sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp
==============================================================================
--- sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp (original)
+++ sandbox/SOC/2010/process/libs/process/example/read_from_child.cpp 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -18,13 +18,11 @@
 
 int main()
 {
-
     std::string exe = find_executable_in_path("hostname");
- context ctx;
- ctx.stdout_behavior = behavior::pipe::def(behavior::pipe::stream_type::output_stream);
-
+ context ctx;
+ ctx.stdout_behavior = behavior::pipe::def(behavior::pipe::stream_type::output_stream);
     child c = create_child(exe,ctx);
     pistream &is = c.get_stdout();
     std::cout << is.rdbuf();
- c.wait();
+ c.wait();
 }

Modified: sandbox/SOC/2010/process/libs/process/example/wait_child.cpp
==============================================================================
--- sandbox/SOC/2010/process/libs/process/example/wait_child.cpp (original)
+++ sandbox/SOC/2010/process/libs/process/example/wait_child.cpp 2010-07-20 20:37:10 EDT (Tue, 20 Jul 2010)
@@ -20,8 +20,6 @@
 {
     std::string exe = find_executable_in_path("hostname");
     child c = create_child(exe);
-
- int exit_c = c.wait();
- std::cout << exit_c << std::endl;
-
+ int exit_code = c.wait();
+ std::cout << exit_code << std::endl;
 }


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk