Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r48654 - in sandbox/SOC/2007/signals: boost/dataflow/signals/component libs/dataflow/doc libs/dataflow/doc/signals libs/dataflow/example/threadpool
From: stipe_at_[hidden]
Date: 2008-09-07 16:53:06


Author: srajko
Date: 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
New Revision: 48654
URL: http://svn.boost.org/trac/boost/changeset/48654

Log:
updated threading docs
Text files modified:
   sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp | 2
   sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 | 1
   sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk | 5 +
   sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk | 25 +++++++
   sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk | 136 ++++++++++++++++++++++++++++++++++++++++
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp | 83 ++++++++++++++++++++----
   6 files changed, 236 insertions(+), 16 deletions(-)

Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -70,6 +70,8 @@
                 cond.notify_all();
                 delete thread_object;
         }
+ /// If the timed_generator was enabled with a signal count, this function
+ /// will wait until the specified number of signals has been emitted.
     void wait_until_completed()
     {
         boost::mutex::scoped_lock lock(m_completion_mutex);

Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -23,6 +23,7 @@
       [ glob ../../../boost/dataflow/signals/component/multiplexer.hpp ]
       [ glob ../../../boost/dataflow/signals/component/mutex.hpp ]
       [ glob ../../../boost/dataflow/signals/component/condition.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/timed_generator.hpp ]
       [ glob ../../../boost/dataflow/signals/component/function.hpp ]
       [ glob ../../../boost/dataflow/signals/component/socket_sender.hpp ]
       [ glob ../../../boost/dataflow/signals/component/socket_receiver.hpp ]

Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -93,12 +93,12 @@
 [template instantiator[] [link dataflow.signals.components.generic.instantiator instantiator]]
 [template modifier[] [link dataflow.signals.components.generic.modifier modifier]]
 [template storage[] [link dataflow.signals.components.properties.storage storage]]
-[template timed_storage[] [link dataflow.signals.components.properties.timed_storage timed_storage]]
 [template counter[] [link dataflow.signals.components.properties.counter counter]]
 [template junction[] [link dataflow.signals.components.flow.junction junction]]
 [template multiplexer[] [link dataflow.signals.components.flow.multiplexer multiplexer]]
 [template mutex[] [link dataflow.signals.components.threading.mutex mutex]]
 [template condition[] [link dataflow.signals.components.threading.condition condition]]
+[template timed_generator[] [link dataflow.signals.components.threading.timed_generator timed_generator]]
 [template function[] [link dataflow.signals.components.adapters.function function]]
 [template socket_sender[] [link dataflow.signals.components.network.socket_sender socket_sender]]
 [template socket_receiver[] [link dataflow.signals.components.network.socket_receiver socket_receiver]]
@@ -114,6 +114,8 @@
 [template BoostFusion[] [@http://spirit.sourceforge.net/dl_more/fusion_v2/libs/fusion/doc/html/index.html Boost.Fusion]]
 [template BoostIostreams[] [@http://www.boost.org/libs/iostreams/doc/index.html]]
 [template BoostSignals[] [@http://www.boost.org/doc/html/signals.html Boost.Signals]]
+[template ThreadSafeSignals[] [@http://www.comedi.org/projects/thread_safe_signals/libs/thread_safe_signals/doc/html/index.html thread_safe_signals]]
+[template Threadpool[] [@http://www.boostpro.com/vault/index.php?&direction=0&order=&directory=Concurrent%20Programming Threadpool]]
 [template BoostThread[] [@http://www.boost.org/doc/html/thread.html Boost.Thread]]
 [template BoostPhoenix2[] [@http://spirit.sourceforge.net/ Boost.Phoenix2]]
 [template BoostPhoenix[] [@http://spirit.sourceforge.net/ Boost.Phoenix]]
@@ -192,6 +194,7 @@
 [import ../example/signals/conditional_example.cpp]
 [import ../example/signals/instantiator_example.cpp]
 [import ../example/signals/modifier_example.cpp]
+[import ../example/threadpool/threadpool_component_example.cpp]
 
 [import ../../../boost/dataflow/vtk/support.hpp]
 [import ../example/VTK/Cone.cxx]

Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -12,8 +12,9 @@
     [[[counter]][Counts the number of signals passing through.]]
     [[[junction]][Convenient when multiple producers need to be connected to the same set of consumers. Also has gate functionality.]]
     [[[multiplexer]][Allows selection of which of the input ports is forwarded]]
- [[[mutex]][Provides mutexing on incoming signals for multithreaded environments]]
+ [[[mutex]][Provides mutex locking on incoming signals for multithreaded environments]]
     [[[condition]][Signals a threading condition whenever a signal is received]]
+ [[[timed_generator]][Periodically generates signals in its own thread]]
     [[[function]][Allows any Boost.Function object to be applied to a passing signal]]
     [[[chain]][Chains a number of components together into a new component]]
     [[[socket_sender] and [socket_receiver]][Allow a signal dataflow network to
@@ -299,6 +300,28 @@
 
 [endsect][/condition]
 
+[section timed_generator]
+[*See also]: [classref boost::signals::timed_generator timed_generator class reference.]
+
+[heading Model of]
+* [SignalFilterComponent]
+
+[heading Inherits]
+* [storage]
+
+[heading Description]
+
+The [timed_generator] class periodically emits stored values through a signal,
+in its own thread.
+
+[heading Example]
+
+See the example provided for the [instantiator] component for an example
+of using [timed_generator].
+
+[endsect][/timed_generator]
+
+
 [endsect][/threading]
 
 [section:adapters Adapters]

Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -132,5 +132,141 @@
 
 [endsect]
 
+[section Threading]
+
+[DataflowSignals] can be used in multi-threaded environments. If you do so,
+it is recommended you use [ThreadSafeSignals] rather than
+[BoostSignals]. [ThreadSafeSignals] can be installed as a drop-in replacement
+for [BoostSignals], but to use it with [DataflowSignals], you currently
+need to use the following `#define`s before including any [DataflowSignals]
+or [ThreadSafeSignals]/[BoostSignals] headers:
+
+ #define signalslib signals
+ #define signals signals
+
+This will cause [ThreadSafeSignals] to use the `boost::signals` namespace, rather
+than the `boost::signalslib` namespace it uses by default.
+
+[heading Provided components]
+
+[DataflowSignals] provides some components specifically intended for
+multi-threaded applications. These are:
+
+* [mutex] (provides mutex locking on incoming signals)
+* [condition] (notifies a threading condition whenever a signal is received)
+* [timed_generator] (periodically generates signals in its own thread)
+
+The documentation pages for each of these components provide more information.
+
+[heading Example of Creating a New Threadpool Component]
+
+Here is an example of developing a new component specifically intended for
+threading. The component makes use of the [Threadpool] library by
+Oliver Kowalke. We will call it `async_component`.
+
+The purpose of the component is to make a signal call asynchronous. Instead
+of the consumer component processing the signal immediately, the signal call
+will be added to a thread pool and processed later. `async_component` is
+a class template, templated on the threadpool type, and the type of the
+underlying component that will process the asynchronous signal.
+
+Let's begin with the client code that this component will allow us to write. The
+three examples provided are:
+
+* mixing synchronous and asynchronous signals
+* assigning priorities to tasks / signals
+* using asynchronous signals for cyclic networks
+
+[heading mixing synchronous and asynchronous signals]
+
+Our
+first example will be just a simple network that involves both synchronous
+and asynchronous filters. The filters will use a simple function that
+performs an increment:
+
+[threadpool_component_example_use_inc_fn]
+
+Here is the simple example:
+
+[threadpool_component_example_use_simple]
+
+And a sample output of a run (output contains `adding task` whenever a new
+task has been added to the pool):
+
+[pre
+adding task
+adding task
+filter: 1
+filter: 2
+filter: 3
+filter: 1
+adding task
+filter: 2
+]
+
+[heading assigning priorities to tasks / signals]
+
+The second example involves a priority pool that allows us to specify the
+priority of a task. This example and the next will use the following filter
+component, which just prints some text, waits a little, and forwards the signal.
+This example only uses the text printing capability of the component:
+
+[threadpool_component_example_use_printer]
+
+The example uses the priorities to enforce an order of execution that prints
+a particular message:
+
+[threadpool_component_example_use_priority]
+
+The output is:
+
+[pre
+adding task
+adding task
+adding task
+adding task
+Hello
+<>
+World
+!
+]
+
+[heading using asynchronous signals for cyclic networks]
+
+The final example demonstrates the use of a threadpool and asynchronous signals
+to allow cyclic networks. The following example induces a cycle consisting
+of `ticker1`, `ticker2`, and `ticker3`.
+
+[threadpool_component_example_use_cyclic]
+
+[pre
+adding task
+tick 1...
+adding task
+tick 2...
+adding task
+tick 3...
+adding task
+tick 1...
+adding task
+tick 2...
+adding task
+]
+
+[heading async_component implementation]
+
+Finally, for those interested, here is the implementation of the
+`async_component` class template. Since it operates on an underlying
+component, the implementation is not trivial. Nonetheless, the [DataflowSignals]
+building blocks still allow the implementation to be rather concise, considering
+that:
+
+* `async_component` can be used on a number of underlying components, regardless
+ of signal signature.
+* `async_component` works with both priority and non-priority thread pools.
+
+[threadpool_component_example_implementation]
+
+[endsect]
 
 [endsect][/tutorial]

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -1,3 +1,5 @@
+//[ threadpool_component_example_implementation
+
 // Copyright 2008 Stjepan Rajko.
 // Distributed under the Boost Software License, Version 1.0. (See
 // accompanying file LICENSE_1_0.txt or copy at
@@ -11,10 +13,9 @@
 #include <boost/fusion/include/fused.hpp>
 #include <boost/fusion/include/join.hpp>
 #include <boost/fusion/include/make_vector.hpp>
+
 #define signalslib signals
 #define signals signals
-#include <boost/thread_safe_signal.hpp>
-//#include <boost/signals.hpp>
 
 #include <boost/dataflow/signals/component/storage.hpp>
 #include <boost/dataflow/signals/component/function.hpp>
@@ -38,6 +39,8 @@
 
 namespace detail {
 
+ // async_component_impl_base will contain the implementation details
+ // shared between the priority and non-priority versions
     template<typename Threadpool, typename Component>
     class async_component_impl_base
     // filter_base is like filter, but it doesn't come with it's own signal.
@@ -51,6 +54,8 @@
         typedef typename Component::signal_type signal_type;
         typedef void result_type;
         
+ // the constructors will just initialize the threadpool reference,
+ // the underlying component, and m_component_function
         template<typename T0>
         async_component_impl_base(Threadpool &threadpool, const T0 &t0)
             : m_component(t0), m_threadpool(threadpool)
@@ -84,10 +89,10 @@
         boost::function<signature_type> m_component_function;
         
     private:
+ // record the appropriate operator() overload of Component into m_component_function,
+ // so we can submit it as a task later
         void init_m_component_function()
         {
- // record the appropriate operator() overload of Component into m_component_function,
- // so we can submit it as a task later
             typedef typename boost::dataflow::utility::slot_type<signature_type, Component>::type mem_fn_type;
 
             m_component_function = boost::dataflow::utility::bind_mem_fn<mem_fn_type, Component>
@@ -98,12 +103,14 @@
     template<typename Threadpool, typename Component, typename Enable=void>
     class async_component_impl;
     
+ // the non-priority version (for non-priority thread pools)
     template<typename Threadpool, typename Component>
     class async_component_impl<Threadpool, Component,
         typename boost::disable_if<tp::has_priority<Threadpool> >::type>
         : public async_component_impl_base<Threadpool, Component>
     {
     public:
+ // forwarding constructors
         template<typename T0>
         async_component_impl(Threadpool &threadpool, const T0 &t0)
             : async_component_impl_base<Threadpool, Component>(threadpool, t0)
@@ -114,6 +121,8 @@
             : async_component_impl_base<Threadpool, Component>(threadpool, t0, t1)
         {}
         
+ // when receiving the signal (which we get in a Fusion sequence)
+ // we will submit the task
         template <class Seq>
         void operator()(const Seq &vec_par) const
         {
@@ -144,12 +153,15 @@
     {
         typedef typename tp::priority_type<Threadpool>::type priority_type;
     public:
+ // constructor stores the priority, and forwards the rest
         template<typename T0>
         async_component_impl(Threadpool &threadpool, const priority_type &priority, const T0 &t0)
             : async_component_impl_base<Threadpool, Component>(threadpool, t0)
             , m_priority(priority)
         {}
         
+ // when receiving the signal (which we get in a Fusion sequence)
+ // we will submit the task with the stored priority.
         template <class Seq>
         void operator()(const Seq &vec_par) const
         {
@@ -180,6 +192,10 @@
 
 // our new async_component class - it will create a new task for the underlying
 // component when it's operator() is called.
+// unfused_inherited is an adaptor provided by Dataflow.Signals,
+// which will allow the above implementation (which uses fused signals)
+// to work with unfused signal signatures (fused / unfused in the Boost.Fusion
+// sense)
 template<typename Threadpool, typename Component>
 class async_component : public boost::fusion::unfused_inherited<
     detail::async_component_impl<Threadpool,Component>,
@@ -190,6 +206,7 @@
         typename boost::function_types::parameter_types<typename Component::signature_type>::type>
         base_type;
 public:
+ // just forwarding constructors
     template<typename T0>
     async_component(Threadpool &threadpool, const T0 &t0)
         : base_type(threadpool, t0)
@@ -221,6 +238,10 @@
             boost::bind(&Component::send, boost::ref(component)), priority));
 }
 
+//]
+
+//[ threadpool_component_example_use_inc_fn
+
 // just an operation to work with
 int inc_fn(int x)
 {
@@ -228,7 +249,11 @@
     return x+1;
 }
 
-// a component that displays text, waits an optional period, and then sends a signal
+//]
+
+//[ threadpool_component_example_use_printer
+
+// a component that displays text, waits an optional period, and then forwards the signal
 class printer : public boost::signals::filter<printer, void()>
 {
 public:
@@ -246,22 +271,34 @@
     unsigned m_wait_milliseconds;
 };
 
+//]
+
 int main( int argc, char *argv[])
 {
- typedef
+//[ threadpool_component_example_use_simple
+ // our Threadpool type - simple FIFO thread pool.
+ typedef
         tp::pool<
             tp::fixed,
             tp::unbounded_channel< tp::fifo >
> threadpool_type;
-
+
+ // we limit the number of threads to 5
     threadpool_type pool(tp::max_poolsize(5));
 
+ // the component types:
+ // The source will store an int
     typedef boost::signals::storage<void(int)> source_type;
+ // A filter will process the int
     typedef boost::signals::function<void(int), int(int)> filter_type;
+ // This filter will process the int asynchronously
     typedef async_component<threadpool_type, filter_type> async_filter_type;
 
- // our components
- source_type source(1);
+ // our components:
+ // We start with a 0
+ source_type source(0);
+
+ // an assortment of synchronous and asynchronous increase-filters
     async_filter_type increase1(pool, inc_fn);
     filter_type increase2(inc_fn);
     filter_type increase3(inc_fn);
@@ -284,30 +321,39 @@
 
     // submit the first task
     submit(pool, source);
-
+//]
     // wait a little
     boost::this_thread::sleep(boost::posix_time::seconds(1));
 
     // --------------------------------------------------------------
     std::cout << "------------------------------------" << std::endl;
 
+//[ threadpool_component_example_use_priority
+
+ // a pool that uses priorities for tasks
     typedef
         tp::pool<
             tp::fixed,
             tp::unbounded_channel< tp::priority< int > >
> priority_threadpool_type;
-
+
+ // we limit the pool to one thread
     priority_threadpool_type priority_pool(tp::max_poolsize(1));
     
+ // components:
     typedef boost::signals::storage<void()> print_starter_type;
     typedef async_component<priority_threadpool_type, printer> async_printer_type;
     
+ // this will start the execution
     print_starter_type print_starter;
+ // this will print the text - we assign priorities so that the print
+ // order is "Hello","<>","World","!":
     async_printer_type print1(priority_pool, 3, "World");
     async_printer_type print2(priority_pool, 2, "<>");
     async_printer_type print3(priority_pool, 1, "Hello");
     async_printer_type print4(priority_pool, 4, "!");
     
+ // our network:
     print_starter
         | print1
         | print2
@@ -322,24 +368,29 @@
         
     // submit the first task
     submit(priority_pool, 1, print_starter);
-
+//]
     // wait a little
     boost::this_thread::sleep(boost::posix_time::seconds(1));
         
     // --------------------------------------------------------------
     std::cout << "------------------------------------" << std::endl;
 
+//[ threadpool_component_example_use_cyclic
+
+ // our components:
     typedef boost::signals::storage<void()> tick_starter_type;
     typedef async_component<threadpool_type, printer> ticker_type;
     
- // our components
+ // this will start the execution
     tick_starter_type tick_starter;
+ // each component will print a line and wait a second before
+ // forwarding the signal
     ticker_type
         ticker1(pool, "tick 1...", 1000),
         ticker2(pool, "tick 2...", 1000),
         ticker3(pool, "tick 3...", 1000);
 
- // our network
+ // our network (note the ticker1, ticker2, ticker3 cycle)
     tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;
     // this was equivalent to:
     // connect(tick_starter, ticker1);
@@ -353,6 +404,10 @@
     // wait a little
     boost::this_thread::sleep(boost::posix_time::seconds(5));
 
+ // shutdown the pool (no more new tasks will be processed)
     pool.shutdown();
+
+ //]
     return 0;
 }
+


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk