Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r48636 - in sandbox/SOC/2007/signals: boost/dataflow/signals/component boost/dataflow/utility libs/dataflow/build/xcodeide/dataflow.xcodeproj libs/dataflow/example/threadpool
From: stipe_at_[hidden]
Date: 2008-09-06 16:05:08


Author: srajko
Date: 2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
New Revision: 48636
URL: http://svn.boost.org/trac/boost/changeset/48636

Log:
new threadpool example
Added:
   sandbox/SOC/2007/signals/boost/dataflow/utility/bind_functor.hpp (contents, props changed)
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp (contents, props changed)
Text files modified:
   sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp | 4 ++--
   sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj | 4 ++++
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile | 1 +
   3 files changed, 7 insertions(+), 2 deletions(-)

Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/filter_base.hpp 2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -11,7 +11,7 @@
 #include <boost/dataflow/support/port/port_adapter.hpp>
 
 #include <boost/signal.hpp>
-
+#include <boost/mpl/vector.hpp>
 
 namespace boost { namespace signals {
 
@@ -136,7 +136,7 @@
     };
 };
 
-template<typename Filter, typename Signals, typename InSignatures>
+template<typename Filter, typename Signals, typename InSignatures = mpl::vector<> >
 class filter_base : public dataflow::component<filter_component_traits<Filter, Signals, InSignatures> >
 {
 public:

Added: sandbox/SOC/2007/signals/boost/dataflow/utility/bind_functor.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/boost/dataflow/utility/bind_functor.hpp 2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -0,0 +1,54 @@
+// Copyright 2008 Stjepan Rajko.
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_DATAFLOW__UTILITY__BIND_FUNCTOR_HPP
+#define BOOST_DATAFLOW__UTILITY__BIND_FUNCTOR_HPP
+
+
+#include <boost/bind.hpp>
+#include <boost/bind/placeholders.hpp>
+#include <boost/function.hpp>
+
+
+namespace boost { namespace dataflow {
+
+namespace utility {
+
+ struct bind_functor
+ {
+ typedef boost::function<void()> result_type;
+
+ template<typename T0>
+ result_type operator()(const T0 &t0)
+ {
+ return boost::bind(t0);
+ }
+
+ template<typename T0, typename T1>
+ result_type operator()(const T0 &t0, const T1 &t1)
+ {
+ return boost::bind(t0, t1);
+ }
+
+ template<typename T0, typename T1, typename T2>
+ result_type operator()(const T0 &t0, const T1 &t1, const T2 &t2)
+ {
+ return boost::bind(t0, t1, t2);
+ }
+
+ template<typename T0, typename T1, typename T2, typename T3>
+ result_type operator()(const T0 &t0, const T1 &t1, const T2 &t2, const T3 &t3)
+ {
+ return boost::bind(t0, t1, t2, t3);
+ }
+
+ //...
+ };
+
+} // namespace utility
+
+} } // namespace boost::dataflow
+
+#endif // BOOST_DATAFLOW__UTILITY__BIND_FUNCTOR_HPP

Modified: sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/build/xcodeide/dataflow.xcodeproj/project.pbxproj 2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -143,6 +143,7 @@
                 089C098C0E0B11FF00397123 /* test_network.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_network.cpp; sourceTree = "<group>"; };
                 089CDA940D832AD200731C70 /* tutorial.qbk */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text; path = tutorial.qbk; sourceTree = "<group>"; };
                 089CDAA90D8333CC00731C70 /* unary_operation.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = unary_operation.hpp; sourceTree = "<group>"; };
+ 089DE1580E730C100038115E /* bind_functor.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = bind_functor.hpp; sourceTree = "<group>"; };
                 089E78C20E132E220008C0BB /* dynamic_multi_port.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = dynamic_multi_port.hpp; sourceTree = "<group>"; };
                 089F845A0E2CFAB900F6B668 /* lazy_sequence.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = lazy_sequence.hpp; sourceTree = "<group>"; };
                 089F845E0E2D032E00F6B668 /* test_lazy_sequence.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_lazy_sequence.cpp; sourceTree = "<group>"; };
@@ -200,6 +201,7 @@
                 08A364C10E129DC7001E6002 /* test_dynamic_port.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = test_dynamic_port.cpp; sourceTree = "<group>"; };
                 08A438900E295AE7009845FD /* component.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = component.cpp; sourceTree = "<group>"; };
                 08A439B80E295B8B009845FD /* Jamfile */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.jam; path = Jamfile; sourceTree = "<group>"; };
+ 08A6948E0E71D7100065ECFD /* threadpool_component_example.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = threadpool_component_example.cpp; sourceTree = "<group>"; };
                 08A6B2650E25A566005539F2 /* blueprint_bank.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = blueprint_bank.cpp; sourceTree = "<group>"; };
                 08A6B2660E25A566005539F2 /* blueprint_bank.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; path = blueprint_bank.hpp; sourceTree = "<group>"; };
                 08A6B2670E25A566005539F2 /* blueprint_component.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = blueprint_component.cpp; sourceTree = "<group>"; };
@@ -757,6 +759,7 @@
                                 08E228DF0E6207EC00D1C2AF /* boost */,
                                 08E228FE0E62084100D1C2AF /* Jamfile */,
                                 08E229020E6208AC00D1C2AF /* threadpool_example.cpp */,
+ 08A6948E0E71D7100065ECFD /* threadpool_component_example.cpp */,
                         );
                         path = threadpool;
                         sourceTree = "<group>";
@@ -1052,6 +1055,7 @@
                                 089AE6390D79D95C00AB9DA8 /* member_function_signature.hpp */,
                                 08C9D7DA0D83C5EB00354FF8 /* bind_mem_fn_overload.hpp */,
                                 08AD8AC40D84E3A9008A9764 /* has_call_operator.hpp */,
+ 089DE1580E730C100038115E /* bind_functor.hpp */,
                         );
                         path = utility;
                         sourceTree = "<group>";

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/Jamfile 2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -13,3 +13,4 @@
     ;
 
 exe threadpool_example : threadpool_example.cpp ;
+exe threadpool_component_example : threadpool_component_example.cpp ;

Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp 2008-09-06 16:05:07 EDT (Sat, 06 Sep 2008)
@@ -0,0 +1,246 @@
+// Copyright 2008 Stjepan Rajko.
+// Distributed under the Boost Software License, Version 1.0. (See
+// accompanying file LICENSE_1_0.txt or copy at
+// http://www.boost.org/LICENSE_1_0.txt)
+
+#include <iostream>
+#include <cstdlib>
+#include <stdexcept>
+
+#include <boost/future/future.hpp>
+#include <boost/fusion/include/fused.hpp>
+#include <boost/fusion/include/join.hpp>
+#include <boost/fusion/include/make_vector.hpp>
+#define signalslib signals
+#define signals signals
+#include <boost/thread_safe_signal.hpp>
+//#include <boost/signals.hpp>
+
+#include <boost/dataflow/signals/component/storage.hpp>
+#include <boost/dataflow/signals/component/function.hpp>
+#include <boost/dataflow/signals/connection/operators.hpp>
+#include <boost/dataflow/utility/bind_mem_fn.hpp>
+#include <boost/dataflow/utility/bind_functor.hpp>
+
+#include "boost/tp/fifo.hpp"
+#include "boost/tp/lazy.hpp"
+#include "boost/tp/pool.hpp"
+#include "boost/tp/poolsize.hpp"
+#include "boost/tp/bounded_channel.hpp"
+#include "boost/tp/priority.hpp"
+
+
+namespace tp = boost::tp;
+
+template<typename Threadpool, typename Component>
+class async_component;
+
+namespace detail {
+
+ template<typename Threadpool, typename Component>
+ class async_component_impl
+ // filter_base is like filter, but it doesn't come with it's own signal.
+ // instead, the derived class must provide a default_signal member function
+ // that refers to the default signal. We will use this to return the
+ // default signal of the underlying component.
+ : public boost::signals::filter_base<async_component<Threadpool, Component>, typename Component::signal_type >
+ {
+ public:
+ typedef typename Component::signature_type signature_type;
+ typedef typename Component::signal_type signal_type;
+ typedef void result_type;
+
+ template<typename T0>
+ async_component_impl(Threadpool &threadpool, const T0 &t0)
+ : m_component(t0), m_threadpool(threadpool)
+ {
+ // record the appropriate operator() overload of Component into m_component_function,
+ // so we can submit it as a task later
+ typedef typename boost::dataflow::utility::slot_type<signature_type, Component>::type mem_fn_type;
+
+ m_component_function = boost::dataflow::utility::bind_mem_fn<mem_fn_type, Component>
+ (static_cast<mem_fn_type>(&Component::operator()), m_component);
+ }
+
+ template <class Seq>
+ void operator()(const Seq &vec_par) const
+ {
+ // add the next function as a task in the pool
+ std::cout << "adding task" << std::endl;
+ // bind_functor is just a function object that calls bind
+ // we create a fused version so we can call it with a fusion sequence
+ boost::fusion::fused<boost::dataflow::utility::bind_functor> fused_bind;
+
+ // submit the task (the first parameter to bind is the function,
+ // and the rest are the bound function arguments).
+ boost::tp::task< void > t(
+ m_threadpool.submit(
+ fused_bind(
+ boost::fusion::join(
+ boost::fusion::make_vector(m_component_function),
+ vec_par
+ ) )
+ ));
+ }
+ // with this, anything that connects to the async_component's default
+ // signal will actually connect to the default signal of the underlying
+ // component
+ typename Component::signal_type &default_signal()
+ {
+ namespace dataflow = boost::dataflow;
+ return dataflow::get_default_port<
+ dataflow::args::left,
+ dataflow::signals::connect_mechanism,
+ dataflow::signals::tag
+ > (m_component);
+ }
+
+ private:
+ Component m_component;
+ Threadpool &m_threadpool;
+ boost::function<signature_type> m_component_function;
+ };
+}
+
+
+// our new async_component class - it will create a new task for the underlying
+// component when it's operator() is called.
+template<typename Threadpool, typename Component>
+class async_component : public boost::fusion::unfused_inherited<
+ detail::async_component_impl<Threadpool,Component>,
+ typename boost::function_types::parameter_types<typename Component::signature_type>::type >
+{
+ typedef boost::fusion::unfused_inherited<
+ detail::async_component_impl<Threadpool,Component>,
+ typename boost::function_types::parameter_types<typename Component::signature_type>::type>
+ base_type;
+public:
+ template<typename T0>
+ async_component(Threadpool &threadpool, const T0 &t0)
+ : base_type(threadpool, t0)
+ {}
+};
+
+// just an operation to work with
+int inc_fn(int x)
+{
+ std::cout << "filter: " << x+1 << std::endl;
+ return x+1;
+}
+
+// a component that displays text, waits a second, and then sends a signal
+class printer : public boost::signals::filter<printer, void()>
+{
+public:
+ printer(const std::string &text)
+ : m_text(text)
+ {}
+ void operator()()
+ {
+ std::cout << m_text << std::endl;
+ boost::this_thread::sleep(boost::posix_time::seconds(1));
+ out();
+ }
+private:
+ std::string m_text;
+};
+
+// a function to submit the first task
+template<typename Threadpool, typename Next>
+void submit(Threadpool &threadpool, Next &next)
+{
+ tp::task< void > task(
+ threadpool.submit(
+ boost::bind(&Next::send, boost::ref(next))));
+}
+
+int main( int argc, char *argv[])
+{
+ typedef
+ tp::pool<
+ tp::fixed,
+ tp::unbounded_channel< tp::fifo >
+ > threadpool_type;
+
+ threadpool_type pool(tp::max_poolsize(5));
+
+ typedef boost::signals::storage<void(int)> source_type;
+ typedef boost::signals::function<void(int), int(int)> filter_type;
+ typedef async_component<threadpool_type, filter_type> async_filter_type;
+
+ // our components
+ source_type source(1);
+ async_filter_type increase1(pool, inc_fn);
+ filter_type increase2(inc_fn);
+ filter_type increase3(inc_fn);
+ async_filter_type increase4(pool, inc_fn);
+ async_filter_type increase5(pool, inc_fn);
+
+ // our network
+ // increase1 >>= increase2 >>= increase3 will be in its own thread
+ // increase3 will be in its own thread
+ // increase4 will be in its own thread
+ source
+ | (increase1 >>= increase2 >>= increase3)
+ | (increase4 >>= increase5);
+
+ // submit the first task
+ submit(pool, source);
+
+ // wait a little
+ boost::this_thread::sleep(boost::posix_time::seconds(1));
+
+ // --------------------------------------------------------------
+
+/*
+ typedef
+ tp::pool<
+ tp::fixed,
+ tp::unbounded_channel< tp::priority< int > >
+ > priority_threadpool_type;
+
+ priority_threadpool_type priority_pool(tp::max_poolsize(5));
+
+ typedef boost::signals::storage<void()> print_starter_type;
+ typedef async_component<priority_threadpool_type, printer> async_printer_type;
+
+ print_starter_type print_starter;
+ async_printer_type print1(priority_pool, "World");
+ async_printer_type print2(priority_pool, " ");
+ async_printer_type print3(priority_pool, "Hello");
+ async_printer_type print4(priority_pool, "!");
+
+ print_starter
+ | print1
+ | print2
+ | print3
+ | print4;
+
+ // submit the first task
+ submit(priority_pool, print_starter);
+
+ // wait a little
+ boost::this_thread::sleep(boost::posix_time::seconds(1));
+*/
+
+ // --------------------------------------------------------------
+
+ typedef boost::signals::storage<void()> tick_starter_type;
+ typedef async_component<threadpool_type, printer> ticker_type;
+
+ // our components
+ tick_starter_type tick_starter;
+ ticker_type ticker1(pool, "tick 1..."), ticker2(pool, "tick 2..."), ticker3(pool, "tick 3...");
+
+ // our network
+ tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;
+
+ // submit the first task
+ submit(pool, tick_starter);
+
+ // wait a little
+ boost::this_thread::sleep(boost::posix_time::seconds(5));
+
+ pool.shutdown();
+ return 0;
+}


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk