Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r86161 - in trunk: boost/thread libs/thread/example libs/thread/test
From: vicente.botet_at_[hidden]
Date: 2013-10-04 16:05:26


Author: viboes
Date: 2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013)
New Revision: 86161
URL: http://svn.boost.org/trac/boost/changeset/86161

Log:
Thread: Added a polymorphic executor and an executor_adaptor.

Added:
   trunk/boost/thread/executor.hpp (contents, props changed)
   trunk/libs/thread/example/executor.cpp (contents, props changed)
Text files modified:
   trunk/boost/thread/executor.hpp | 175 ++++++++++++++++++++++++++++++++++++++++
   trunk/boost/thread/thread_pool.hpp | 50 ++++++++---
   trunk/libs/thread/example/executor.cpp | 149 ++++++++++++++++++++++++++++++++++
   trunk/libs/thread/test/Jamfile.v2 | 4
   4 files changed, 361 insertions(+), 17 deletions(-)

Added: trunk/boost/thread/executor.hpp
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ trunk/boost/thread/executor.hpp 2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013) (r86161)
@@ -0,0 +1,175 @@
+// Copyright (C) 2013 Vicente J. Botet Escriba
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// 2013/09 Vicente J. Botet Escriba
+// Adapt to boost from CCIA C++11 implementation
+
+#ifndef BOOST_THREAD_EXECUTOR_HPP
+#define BOOST_THREAD_EXECUTOR_HPP
+
+#include <boost/thread/detail/config.hpp>
+
+
+#include <boost/thread/detail/delete.hpp>
+#include <boost/thread/detail/move.hpp>
+#include <boost/thread/detail/function_wrapper.hpp>
+
+
+#include <boost/config/abi_prefix.hpp>
+
+namespace boost
+{
+
+ class executor // Abstract polymorphic executor interface: close(), closed(), submit(work) and try_executing_one() are pure virtual.
+ {
+ public:
+ /// Type-erased callable used to store the work items to run.
+ typedef detail::function_wrapper work;
+
+ /// executor is not copyable.
+ BOOST_THREAD_NO_COPYABLE(executor)
+ executor() {}
+
+ /**
+ * \b Effects: Destroys the executor.
+ *
+ * \b Synchronization: The completion of all the closures happens before the completion of the executor destructor.
+ */
+ virtual ~executor() {};
+
+ /**
+ * \b Effects: closes the \c executor for submissions.
+ * The worker threads will work until there are no more closures to run.
+ */
+ virtual void close() = 0;
+
+ /**
+ * \b Returns: whether the executor is closed for submissions.
+ */
+ virtual bool closed() = 0;
+
+ /**
+ * \b Effects: The specified closure will be scheduled for execution at some point in the future.
+ * If the invoked closure throws an exception, the executor will call std::terminate, as is the case with threads.
+ *
+ * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
+ *
+ * \b Throws: \c sync_queue_is_closed if the executor is closed.
+ * Whatever exception can be thrown while storing the closure.
+ */
+ virtual void submit(BOOST_THREAD_RV_REF(work) closure) = 0;
+
+ /**
+ * \b Requires: \c Closure is a model of Callable(void()) and a model of CopyConstructible/MoveConstructible.
+ *
+ * \b Effects: The specified closure is wrapped in a \c work object and scheduled for execution at some point in the future.
+ * If the invoked closure throws an exception, the executor will call std::terminate, as is the case with threads.
+ *
+ * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
+ *
+ * \b Throws: \c sync_queue_is_closed if the executor is closed.
+ * Whatever exception can be thrown while storing the closure.
+ */
+//#if ! defined(BOOST_NO_CXX11_RVALUE_REFERENCES)
+ template <typename Closure>
+ void submit(Closure const& closure)
+ {
+ work w ((closure));
+ submit(boost::move(w));
+ }
+//#endif
+
+ template <typename Closure>
+ void submit(BOOST_THREAD_RV_REF(Closure) closure) // Rvalue overload: moves the closure into a work object, then forwards to the virtual submit(work).
+ {
+ work w = boost::move(closure);
+ submit(boost::move(w));
+ }
+
+ /**
+ * \b Effects: tries to execute one task.
+ * \b Returns: whether a task has been executed.
+ * \b Throws: whatever the current task constructor throws or the task() throws.
+ */
+ virtual bool try_executing_one() = 0;
+
+ /**
+ * \b Requires: This must be called from a scheduled task.
+ *
+ * \b Effects: calls try_executing_one() repeatedly until pred() is true. \b Returns: false as soon as try_executing_one() returns false, true once pred() is true.
+ */
+ template <typename Pred>
+ bool reschedule_until(Pred const& pred)
+ {
+ do {
+ //schedule_one_or_yield();
+ if ( ! try_executing_one())
+ {
+ return false;
+ }
+ } while (! pred());
+ return true;
+ }
+ };
+ template <typename Executor>
+ class executor_adaptor : public executor // Implements the executor interface by forwarding every virtual call to the wrapped Executor member ex.
+ {
+ Executor ex;
+ public:
+ /// Type-erased callable used to store the work items to run.
+ typedef detail::function_wrapper work;
+
+ /// executor_adaptor is not copyable.
+ BOOST_THREAD_NO_COPYABLE(executor_adaptor)
+
+ /**
+ * Default constructor: value-initializes the wrapped executor.
+ */
+ executor_adaptor() : ex() {}
+
+ /**
+ * Forwarding constructor: constructs the wrapped executor from \c args (only when variadic templates are available).
+ */
+#if ! defined(BOOST_NO_CXX11_VARIADIC_TEMPLATES)
+ template <typename ...Args>
+ executor_adaptor(BOOST_THREAD_RV_REF(Args) ... args) : ex(boost::forward<Args>(args)...) {}
+#endif
+ /**
+ * \b Effects: closes the wrapped \c executor for submissions.
+ * The worker threads will work until there are no more closures to run.
+ */
+ void close() { ex.close(); }
+
+ /**
+ * \b Returns: whether the wrapped executor is closed for submissions.
+ */
+ bool closed() { return ex.closed(); }
+
+ /**
+ * \b Effects: The specified closure is moved to the wrapped executor, which will schedule it for execution at some point in the future.
+ * If the invoked closure throws an exception, the executor will call std::terminate, as is the case with threads.
+ *
+ * \b Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
+ *
+ * \b Throws: \c sync_queue_is_closed if the wrapped executor is closed.
+ * Whatever exception can be thrown while storing the closure.
+ */
+ void submit(BOOST_THREAD_RV_REF(work) closure) { return ex.submit(boost::move(closure)); }
+
+
+ /**
+ * \b Effects: tries to execute one task on the wrapped executor.
+ * \b Returns: whether a task has been executed.
+ * \b Throws: whatever the current task constructor throws or the task() throws.
+ */
+ bool try_executing_one() { return ex.try_executing_one(); }
+
+ };
+
+}
+
+#include <boost/config/abi_suffix.hpp>
+
+#endif

Modified: trunk/boost/thread/thread_pool.hpp
==============================================================================
--- trunk/boost/thread/thread_pool.hpp Fri Oct 4 15:02:11 2013 (r86160)
+++ trunk/boost/thread/thread_pool.hpp 2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013) (r86161)
@@ -52,6 +52,7 @@
     /// A move aware vector
     thread_vector threads;
 
+ public:
     /**
      * Effects: try to execute one task.
      * Returns: whether a task has been executed.
@@ -78,6 +79,7 @@
         return false;
       }
     }
+ private:
     /**
      * Effects: schedule one task or yields
      * Throws: whatever the current task constructor throws or the task() throws.
@@ -89,15 +91,19 @@
           this_thread::yield();
         }
     }
+
     /**
      * The main loop of the worker threads
      */
     void worker_thread()
     {
- while (!is_closed())
+ while (!closed())
       {
         schedule_one_or_yield();
       }
+ while (try_executing_one())
+ {
+ }
     }
 
   public:
@@ -105,7 +111,9 @@
     BOOST_THREAD_NO_COPYABLE(thread_pool)
 
     /**
- * Effects: creates a thread pool that runs closures on @c thread_count threads.
+ * \b Effects: creates a thread pool that runs closures on \c thread_count threads.
+ *
+ * \b Throws: Whatever exception is thrown while initializing the needed resources.
      */
     thread_pool(unsigned const thread_count = thread::hardware_concurrency())
     {
@@ -123,8 +131,9 @@
       }
     }
     /**
- * Effects: Destroys the thread pool.
- * Synchronization: The completion of all the closures happen before the completion of the thread pool destructor.
+ * \b Effects: Destroys the thread pool.
+ *
+ * \b Synchronization: The completion of all the closures happen before the completion of the \c thread_pool destructor.
      */
     ~thread_pool()
     {
@@ -134,7 +143,8 @@
     }
 
     /**
- * Effects: close the thread_pool for submissions. The worker threads will work until
+ * \b Effects: close the \c thread_pool for submissions.
+ * The worker threads will work until there is no more closures to run.
      */
     void close()
     {
@@ -142,19 +152,23 @@
     }
 
     /**
- * Returns: whether the pool is closed for submissions.
+ * \b Returns: whether the pool is closed for submissions.
      */
- bool is_closed()
+ bool closed()
     {
       return work_queue.closed();
     }
 
     /**
- * Effects: The specified function will be scheduled for execution at some point in the future.
- * If invoking closure throws an exception the thread pool will call std::terminate, as is the case with threads.
- * Synchronization: completion of closure on a particular thread happens before destruction of thread's thread local variables.
- * Throws: sync_queue_is_closed if the thread pool is closed.
+ * \b Requires: \c Closure is a model of \c Callable(void()) and a model of \c CopyConstructible/MoveConstructible.
+ *
+ * \b Effects: The specified \c closure will be scheduled for execution at some point in the future.
+ * If invoked closure throws an exception the \c thread_pool will call \c std::terminate, as is the case with threads.
      *
+ * \b Synchronization: completion of \c closure on a particular thread happens before destruction of thread's thread local variables.
+ *
+ * \b Throws: \c sync_queue_is_closed if the thread pool is closed.
+ * Whatever exception that can be throw while storing the closure.
      */
     template <typename Closure>
     void submit(Closure const& closure)
@@ -172,15 +186,21 @@
     }
 
     /**
- * This must be called from an scheduled task.
- * Effects: reschedule functions until pred()
+ * \b Requires: This must be called from an scheduled task.
+ *
+ * \b Effects: reschedule functions until pred()
      */
     template <typename Pred>
- void reschedule_until(Pred const& pred)
+ bool reschedule_until(Pred const& pred)
     {
       do {
- schedule_one_or_yield();
+ //schedule_one_or_yield();
+ if ( ! try_executing_one())
+ {
+ return false;
+ }
       } while (! pred());
+ return true;
     }
 
   };

Added: trunk/libs/thread/example/executor.cpp
==============================================================================
--- /dev/null 00:00:00 1970 (empty, because file is newly added)
+++ trunk/libs/thread/example/executor.cpp 2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013) (r86161)
@@ -0,0 +1,149 @@
+// Copyright (C) 2012-2013 Vicente Botet
+//
+// Distributed under the Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#define BOOST_THREAD_VERSION 4
+#define BOOST_THREAD_USES_LOG
+#define BOOST_THREAD_USES_LOG_THREAD_ID
+
+#include <boost/thread/detail/log.hpp>
+#include <boost/thread/thread_pool.hpp>
+#include <boost/thread/executor.hpp>
+#include <boost/assert.hpp>
+#include <string>
+
+void p1() // Sample task: streams the calling thread's id followed by " P1" to BOOST_THREAD_LOG.
+{
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " P1" << BOOST_THREAD_END_LOG;
+}
+
+void p2() // Sample task: streams the calling thread's id followed by " P2" to BOOST_THREAD_LOG.
+{
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " P2" << BOOST_THREAD_END_LOG;
+}
+
+void push(boost::container::deque<boost::detail::function_wrapper> &data_, BOOST_THREAD_RV_REF(boost::detail::function_wrapper) closure) // Debug helper: moves closure onto the back of data_, tracing each step to the log.
+{
+ try
+ {
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+ boost::detail::function_wrapper v; // only referenced by the commented-out move/forward experiments below
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+ //v = boost::move(closure);
+ //v = boost::forward<boost::detail::function_wrapper>(closure);
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+
+ data_.push_back(boost::move(closure)); // the actual work: move the wrapper into the deque
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+
+ //data_.push_back(boost::forward<boost::detail::function_wrapper>(closure));
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+
+ }
+ catch (std::exception& ex) // exceptions are logged and swallowed; nothing is rethrown to the caller
+ {
+ BOOST_THREAD_LOG
+ << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+ }
+ catch (...)
+ {
+ BOOST_THREAD_LOG
+ << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+ }
+}
+
+template <typename Closure>
+void submit(boost::container::deque<boost::detail::function_wrapper> &data_, BOOST_THREAD_FWD_REF(Closure) closure) // Debug helper: wraps closure in a function_wrapper and hands it to push(), tracing each step to the log.
+{
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+ //work w =boost::move(closure);
+ //work_queue.push(boost::move(w));
+ //push(data_, boost::detail::function_wrapper(boost::forward<Closure>(closure)));
+ boost::detail::function_wrapper v =boost::forward<Closure>(closure); // type-erase the forwarded closure
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+ push(data_, boost::move(v)); // push() takes the wrapper by rvalue reference
+
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+}
+
+int main() // Example driver. Returns 0 on success, 1 if a std::exception was caught, 2 for any other exception.
+{
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+#if 0 // disabled experiment: stores function_wrapper objects in a deque directly and via the submit() helper
+ {
+ try
+ {
+ boost::detail::function_wrapper f(&p1);
+
+ boost::container::deque<boost::detail::function_wrapper> data_;
+ data_.push_back(boost::move(f));
+ data_.push_back(boost::detail::function_wrapper(&p1));
+ submit(data_, &p1);
+ }
+ catch (std::exception& ex)
+ {
+ BOOST_THREAD_LOG
+ << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+ }
+ catch (...)
+ {
+ BOOST_THREAD_LOG
+ << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+ }
+
+ typedef boost::container::vector<boost::thread> thread_vector;
+ thread_vector threads;
+
+ }
+#endif
+#if 1 // active example: drive a thread_pool through the polymorphic executor interface
+ {
+ try
+ {
+ boost::executor_adaptor<boost::thread_pool> ea; // default-constructed adaptor wrapping a thread_pool
+ boost::executor &tp=ea; // all submissions below go through the abstract base class
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+ tp.submit(&p1);
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << " <MAIN" << BOOST_THREAD_END_LOG;
+ tp.submit(&p2);
+ tp.submit(&p1);
+ tp.submit(&p2);
+ tp.submit(&p1);
+ tp.submit(&p2);
+ tp.submit(&p1);
+ tp.submit(&p2);
+ tp.submit(&p1);
+ tp.submit(&p2);
+ } // ea goes out of scope here, at the end of the try block
+ catch (std::exception& ex)
+ {
+ BOOST_THREAD_LOG
+ << "ERRORRRRR " << ex.what() << "" << BOOST_THREAD_END_LOG;
+ return 1;
+ }
+ catch (...)
+ {
+ BOOST_THREAD_LOG
+ << " ERRORRRRR exception thrown" << BOOST_THREAD_END_LOG;
+ return 2;
+ }
+ }
+#endif
+ BOOST_THREAD_LOG
+ << boost::this_thread::get_id() << "MAIN>" << BOOST_THREAD_END_LOG;
+ return 0;
+}

Modified: trunk/libs/thread/test/Jamfile.v2
==============================================================================
--- trunk/libs/thread/test/Jamfile.v2 Fri Oct 4 15:02:11 2013 (r86160)
+++ trunk/libs/thread/test/Jamfile.v2 2013-10-04 16:05:26 EDT (Fri, 04 Oct 2013) (r86161)
@@ -719,6 +719,7 @@
           [ thread-run2 ../example/lambda_future.cpp : ex_lambda_future ]
           [ thread-run2 ../example/not_interleaved2.cpp : ex_not_interleaved2 ]
           [ thread-run2 ../example/thread_pool.cpp : ex_thread_pool ]
+ [ thread-run2 ../example/executor.cpp : ex_executor ]
 
     ;
 
@@ -798,8 +799,7 @@
           #[ thread-run test_9079_a.cpp ]
           #[ thread-run test_9079_b.cpp ]
           #[ thread-run clang_main.cpp ]
-
-
+ #[ thread-run test_8557.cpp ]
 
     ;
 


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk