|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r48654 - in sandbox/SOC/2007/signals: boost/dataflow/signals/component libs/dataflow/doc libs/dataflow/doc/signals libs/dataflow/example/threadpool
From: stipe_at_[hidden]
Date: 2008-09-07 16:53:06
Author: srajko
Date: 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
New Revision: 48654
URL: http://svn.boost.org/trac/boost/changeset/48654
Log:
updated threading docs
Text files modified:
sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp | 2
sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 | 1
sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk | 5 +
sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk | 25 +++++++
sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk | 136 ++++++++++++++++++++++++++++++++++++++++
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp | 83 ++++++++++++++++++++----
6 files changed, 236 insertions(+), 16 deletions(-)
Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/timed_generator.hpp 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -70,6 +70,8 @@
cond.notify_all();
delete thread_object;
}
+ /// If the timed_generator was enabled with a signal count, this function
+ /// will wait until the specified number of signals have been emitted.
void wait_until_completed()
{
boost::mutex::scoped_lock lock(m_completion_mutex);
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/Jamfile.v2 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -23,6 +23,7 @@
[ glob ../../../boost/dataflow/signals/component/multiplexer.hpp ]
[ glob ../../../boost/dataflow/signals/component/mutex.hpp ]
[ glob ../../../boost/dataflow/signals/component/condition.hpp ]
+ [ glob ../../../boost/dataflow/signals/component/timed_generator.hpp ]
[ glob ../../../boost/dataflow/signals/component/function.hpp ]
[ glob ../../../boost/dataflow/signals/component/socket_sender.hpp ]
[ glob ../../../boost/dataflow/signals/component/socket_receiver.hpp ]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/dataflow.qbk 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -93,12 +93,12 @@
[template instantiator[] [link dataflow.signals.components.generic.instantiator instantiator]]
[template modifier[] [link dataflow.signals.components.generic.modifier modifier]]
[template storage[] [link dataflow.signals.components.properties.storage storage]]
-[template timed_storage[] [link dataflow.signals.components.properties.timed_storage timed_storage]]
[template counter[] [link dataflow.signals.components.properties.counter counter]]
[template junction[] [link dataflow.signals.components.flow.junction junction]]
[template multiplexer[] [link dataflow.signals.components.flow.multiplexer multiplexer]]
[template mutex[] [link dataflow.signals.components.threading.mutex mutex]]
[template condition[] [link dataflow.signals.components.threading.condition condition]]
+[template timed_generator[] [link dataflow.signals.components.threading.timed_generator timed_generator]]
[template function[] [link dataflow.signals.components.adapters.function function]]
[template socket_sender[] [link dataflow.signals.components.network.socket_sender socket_sender]]
[template socket_receiver[] [link dataflow.signals.components.network.socket_receiver socket_receiver]]
@@ -114,6 +114,8 @@
[template BoostFusion[] [@http://spirit.sourceforge.net/dl_more/fusion_v2/libs/fusion/doc/html/index.html Boost.Fusion]]
[template BoostIostreams[] [@http://www.boost.org/libs/iostreams/doc/index.html]]
[template BoostSignals[] [@http://www.boost.org/doc/html/signals.html Boost.Signals]]
+[template ThreadSafeSignals[] [@http://www.comedi.org/projects/thread_safe_signals/libs/thread_safe_signals/doc/html/index.html thread_safe_signals]]
+[template Threadpool[] [@http://www.boostpro.com/vault/index.php?&direction=0&order=&directory=Concurrent%20Programming Threadpool]]
[template BoostThread[] [@http://www.boost.org/doc/html/thread.html Boost.Thread]]
[template BoostPhoenix2[] [@http://spirit.sourceforge.net/ Boost.Phoenix2]]
[template BoostPhoenix[] [@http://spirit.sourceforge.net/ Boost.Phoenix]]
@@ -192,6 +194,7 @@
[import ../example/signals/conditional_example.cpp]
[import ../example/signals/instantiator_example.cpp]
[import ../example/signals/modifier_example.cpp]
+[import ../example/threadpool/threadpool_component_example.cpp]
[import ../../../boost/dataflow/vtk/support.hpp]
[import ../example/VTK/Cone.cxx]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/components.qbk 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -12,8 +12,9 @@
[[[counter]][Counts the number of signals passing through.]]
[[[junction]][Convenient when multiple producers need to be connected to the same set of consumers. Also has gate functionality.]]
[[[multiplexer]][Allows selection of which of the input ports is forwarded]]
- [[[mutex]][Provides mutexing on incoming signals for multithreaded environments]]
+ [[[mutex]][Provides mutex locking on incoming signals for multithreaded environments]]
[[[condition]][Signals a threading condition whenever a signal is received]]
+ [[[timed_generator]][Periodically generates signals in its own thread]]
[[[function]][Allows any Boost.Function object to be applied to a passing signal]]
[[[chain]][Chains a number of components together into a new component]]
[[[socket_sender] and [socket_receiver]][Allow a signal dataflow network to
@@ -299,6 +300,28 @@
[endsect][/condition]
+[section timed_generator]
+[*See also]: [classref boost::signals::timed_generator timed_generator class reference.]
+
+[heading Model of]
+* [SignalFilterComponent]
+
+[heading Inherits]
+* [storage]
+
+[heading Description]
+
+The [timed_generator] class periodically emits stored values through a signal,
+from within its own thread.
+
+[heading Example]
+
+See the example provided for the [instantiator] component for an example
+of using [timed_generator].
+
+[endsect][/timed_generator]
+
+
[endsect][/threading]
[section:adapters Adapters]
Modified: sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/doc/signals/tutorial.qbk 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -132,5 +132,141 @@
[endsect]
+[section Threading]
+
+[DataflowSignals] can be used in multi-threaded environments. If you do so,
+it is recommended you use [ThreadSafeSignals] rather than
+[BoostSignals]. [ThreadSafeSignals] can be installed as a drop-in replacement
+for [BoostSignals], but to use it with [DataflowSignals], you currently
+need to use the following `#define`s before including any [DataflowSignals]
+or [ThreadSafeSignals]/[BoostSignals] headers:
+
+ #define signalslib signals
+ #define signals signals
+
+This will cause [ThreadSafeSignals] to use the `boost::signals` namespace, rather
+than the `boost::signalslib` namespace it uses by default.
+
+[heading Provided components]
+
+[DataflowSignals] provides some components specifically intended for
+multi-threaded applications. These are:
+
+* [mutex] (provides mutex locking on incoming signals)
+* [condition] (notifies a threading condition whenever a signal is received)
+* [timed_generator] (periodically generates signals in its own thread)
+
+The documentation pages for each of these components provide more information.
+
+[heading Example of Creating a New Threadpool Component]
+
+Here is an example of developing a new component specifically intended for
+threading. The component makes use of the [Threadpool] library by
+Oliver Kowalke. We will call it `async_component`.
+
+The purpose of the component is to make a signal call asynchronous. Instead
+of the consumer component processing the signal immediately, the signal call
+will be added to a thread pool and processed later. `async_component` is
+a class template, templated on the threadpool type, and the type of the
+underlying component that will process the asynchronous signal.
+
+Let's begin with the user code that this component will allow us to write. The
+three examples provided are:
+
+* mixing synchronous and asynchronous signals
+* assigning priorities to tasks / signals
+* using asynchronous signals for cyclic networks
+
+[heading mixing synchronous and asynchronous signals]
+
+Our
+first example will be just a simple network that involves both synchronous
+and asynchronous filters. The filters will use a simple function that
+performs an increment:
+
+[threadpool_component_example_use_inc_fn]
+
+Here is the simple example:
+
+[threadpool_component_example_use_simple]
+
+And a sample output of a run (output contains `adding task` whenever a new
+task has been added to the pool):
+
+[pre
+adding task
+adding task
+filter: 1
+filter: 2
+filter: 3
+filter: 1
+adding task
+filter: 2
+]
+
+[heading assigning priorities to tasks / signals]
+
+The second example involves a priority pool that allows us to specify the
+priority of a task. This example and the next will use the following filter
+component, which just prints some text, waits a little, and forwards the signal.
+This example only uses the text printing capability of the component:
+
+[threadpool_component_example_use_printer]
+
+The example uses the priorities to enforce an order of execution that prints
+a particular message:
+
+[threadpool_component_example_use_priority]
+
+The output is:
+
+[pre
+adding task
+adding task
+adding task
+adding task
+Hello
+<>
+World
+!
+]
+
+[heading using asynchronous signals for cyclic networks]
+
+The final example demonstrates the use of a threadpool and asynchronous signals
+to allow cyclic networks. The following example induces a cycle consisting
+of `ticker1`, `ticker2`, and `ticker3`.
+
+[threadpool_component_example_use_cyclic]
+
+[pre
+adding task
+tick 1...
+adding task
+tick 2...
+adding task
+tick 3...
+adding task
+tick 1...
+adding task
+tick 2...
+adding task
+]
+
+[heading async_component implementation]
+
+Finally, for those interested, here is the implementation of the
+`async_component` class template. Since it operates on an underlying
+component, the implementation is not trivial. Nonetheless, the [DataflowSignals]
+building blocks still allow the implementation to be rather concise, considering
+that:
+
+* `async_component` can be used on a number of underlying components, regardless
+ of signal signature.
+* `async_component` works with both priority and non-priority thread pools.
+
+[threadpool_component_example_implementation]
+
+[endsect]
[endsect][/tutorial]
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp 2008-09-07 16:53:05 EDT (Sun, 07 Sep 2008)
@@ -1,3 +1,5 @@
+//[ threadpool_component_example_implementation
+
// Copyright 2008 Stjepan Rajko.
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
@@ -11,10 +13,9 @@
#include <boost/fusion/include/fused.hpp>
#include <boost/fusion/include/join.hpp>
#include <boost/fusion/include/make_vector.hpp>
+
#define signalslib signals
#define signals signals
-#include <boost/thread_safe_signal.hpp>
-//#include <boost/signals.hpp>
#include <boost/dataflow/signals/component/storage.hpp>
#include <boost/dataflow/signals/component/function.hpp>
@@ -38,6 +39,8 @@
namespace detail {
+ // async_component_impl_base will contain the implementation details
+ // shared between the priority and non-priority versions
template<typename Threadpool, typename Component>
class async_component_impl_base
// filter_base is like filter, but it doesn't come with it's own signal.
@@ -51,6 +54,8 @@
typedef typename Component::signal_type signal_type;
typedef void result_type;
+ // the constructors will just initialize the threadpool reference,
+ // the underlying component, and m_component_function
template<typename T0>
async_component_impl_base(Threadpool &threadpool, const T0 &t0)
: m_component(t0), m_threadpool(threadpool)
@@ -84,10 +89,10 @@
boost::function<signature_type> m_component_function;
private:
+ // record the appropriate operator() overload of Component into m_component_function,
+ // so we can submit it as a task later
void init_m_component_function()
{
- // record the appropriate operator() overload of Component into m_component_function,
- // so we can submit it as a task later
typedef typename boost::dataflow::utility::slot_type<signature_type, Component>::type mem_fn_type;
m_component_function = boost::dataflow::utility::bind_mem_fn<mem_fn_type, Component>
@@ -98,12 +103,14 @@
template<typename Threadpool, typename Component, typename Enable=void>
class async_component_impl;
+ // the non-priority version (for non-priority thread pools)
template<typename Threadpool, typename Component>
class async_component_impl<Threadpool, Component,
typename boost::disable_if<tp::has_priority<Threadpool> >::type>
: public async_component_impl_base<Threadpool, Component>
{
public:
+ // forwarding constructors
template<typename T0>
async_component_impl(Threadpool &threadpool, const T0 &t0)
: async_component_impl_base<Threadpool, Component>(threadpool, t0)
@@ -114,6 +121,8 @@
: async_component_impl_base<Threadpool, Component>(threadpool, t0, t1)
{}
+ // when receiving the signal (which we get in a Fusion sequence)
+ // we will submit the task
template <class Seq>
void operator()(const Seq &vec_par) const
{
@@ -144,12 +153,15 @@
{
typedef typename tp::priority_type<Threadpool>::type priority_type;
public:
+ // constructor stores the priority, and forwards the rest
template<typename T0>
async_component_impl(Threadpool &threadpool, const priority_type &priority, const T0 &t0)
: async_component_impl_base<Threadpool, Component>(threadpool, t0)
, m_priority(priority)
{}
+ // when receiving the signal (which we get in a Fusion sequence)
+ // we will submit the task with the stored priority.
template <class Seq>
void operator()(const Seq &vec_par) const
{
@@ -180,6 +192,10 @@
// our new async_component class - it will create a new task for the underlying
// component when it's operator() is called.
+// unfused_inherited is an adaptor provided by Dataflow.Signals,
+// which will allow the above implementation (which uses fused signals)
+// to work with unfused signal signatures (fused / unfused in the Boost.Fusion
+// sense)
template<typename Threadpool, typename Component>
class async_component : public boost::fusion::unfused_inherited<
detail::async_component_impl<Threadpool,Component>,
@@ -190,6 +206,7 @@
typename boost::function_types::parameter_types<typename Component::signature_type>::type>
base_type;
public:
+ // just forwarding constructors
template<typename T0>
async_component(Threadpool &threadpool, const T0 &t0)
: base_type(threadpool, t0)
@@ -221,6 +238,10 @@
boost::bind(&Component::send, boost::ref(component)), priority));
}
+//]
+
+//[ threadpool_component_example_use_inc_fn
+
// just an operation to work with
int inc_fn(int x)
{
@@ -228,7 +249,11 @@
return x+1;
}
-// a component that displays text, waits an optional period, and then sends a signal
+//]
+
+//[ threadpool_component_example_use_printer
+
+// a component that displays text, waits an optional period, and then forwards the signal
class printer : public boost::signals::filter<printer, void()>
{
public:
@@ -246,22 +271,34 @@
unsigned m_wait_milliseconds;
};
+//]
+
int main( int argc, char *argv[])
{
- typedef
+//[ threadpool_component_example_use_simple
+ // our Threadpool type - simple FIFO thread pool.
+ typedef
tp::pool<
tp::fixed,
tp::unbounded_channel< tp::fifo >
> threadpool_type;
-
+
+ // we limit the number of threads to 5
threadpool_type pool(tp::max_poolsize(5));
+ // the component types:
+ // The source will store an int
typedef boost::signals::storage<void(int)> source_type;
+ // A filter will process the int
typedef boost::signals::function<void(int), int(int)> filter_type;
+ // This filter will process the int asynchronously
typedef async_component<threadpool_type, filter_type> async_filter_type;
- // our components
- source_type source(1);
+ // our components:
+ // We start with a 0
+ source_type source(0);
+
+ // an assortment of synchronous and asynchronous increase-filters
async_filter_type increase1(pool, inc_fn);
filter_type increase2(inc_fn);
filter_type increase3(inc_fn);
@@ -284,30 +321,39 @@
// submit the first task
submit(pool, source);
-
+//]
// wait a little
boost::this_thread::sleep(boost::posix_time::seconds(1));
// --------------------------------------------------------------
std::cout << "------------------------------------" << std::endl;
+//[ threadpool_component_example_use_priority
+
+ // a pool that uses priorities for tasks
typedef
tp::pool<
tp::fixed,
tp::unbounded_channel< tp::priority< int > >
> priority_threadpool_type;
-
+
+ // we limit the pool to one thread
priority_threadpool_type priority_pool(tp::max_poolsize(1));
+ // components:
typedef boost::signals::storage<void()> print_starter_type;
typedef async_component<priority_threadpool_type, printer> async_printer_type;
+ // this will start the execution
print_starter_type print_starter;
+ // this will print the text - we assign priorities so that the print
+ // order is "Hello","<>","World","!":
async_printer_type print1(priority_pool, 3, "World");
async_printer_type print2(priority_pool, 2, "<>");
async_printer_type print3(priority_pool, 1, "Hello");
async_printer_type print4(priority_pool, 4, "!");
+ // our network:
print_starter
| print1
| print2
@@ -322,24 +368,29 @@
// submit the first task
submit(priority_pool, 1, print_starter);
-
+//]
// wait a little
boost::this_thread::sleep(boost::posix_time::seconds(1));
// --------------------------------------------------------------
std::cout << "------------------------------------" << std::endl;
+//[ threadpool_component_example_use_cyclic
+
+ // our components:
typedef boost::signals::storage<void()> tick_starter_type;
typedef async_component<threadpool_type, printer> ticker_type;
- // our components
+ // this will start the execution
tick_starter_type tick_starter;
+ // each component will print a line and wait a second before
+ // forwarding the signal
ticker_type
ticker1(pool, "tick 1...", 1000),
ticker2(pool, "tick 2...", 1000),
ticker3(pool, "tick 3...", 1000);
- // our network
+ // our network (note the ticker1, ticker2, ticker3 cycle)
tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;
// this was equivalent to:
// connect(tick_starter, ticker1);
@@ -353,6 +404,10 @@
// wait a little
boost::this_thread::sleep(boost::posix_time::seconds(5));
+ // shutdown the pool (no more new tasks will be processed)
pool.shutdown();
+
+ //]
return 0;
}
+
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk