Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r48653 - in sandbox/SOC/2007/signals: boost/dataflow/signals/component/detail libs/dataflow/example/threadpool libs/dataflow/example/threadpool/boost/tp libs/dataflow/example/threadpool/boost/tp/detail
From: stipe_at_[hidden]
Date: 2008-09-07 13:05:42


Author: srajko
Date: 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
New Revision: 48653
URL: http://svn.boost.org/trac/boost/changeset/48653

Log:
threadpool example works with priority
Added:
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/info.hpp (contents, props changed)
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/info.hpp (contents, props changed)
Text files modified:
   sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp | 12 ++
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp | 1
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp | 3
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp | 3
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp | 4
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp | 4
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp | 1
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp | 4
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp | 1
   sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp | 196 +++++++++++++++++++++++++++++++--------
   10 files changed, 186 insertions(+), 43 deletions(-)

Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -99,7 +99,17 @@
             unfused_inherited(T1 &t1, T2 &t2)
             : Function(t1, t2)
         { }
-
+
+ template<typename T1, typename T2, typename T3>
+ unfused_inherited(const T1 &t1, const T2 &t2, const T3 &t3)
+ : Function(t1, t2, t3)
+ { }
+
+ template<typename T1, typename T2, typename T3>
+ unfused_inherited(T1 &t1, const T2 &t2, const T3 &t3)
+ : Function(t1, t2, t3)
+ { }
+
         template<typename F>
         struct result
             : public Function::template result<F> {};

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -39,6 +39,7 @@
 public:
         typedef typename queue::item item;
         typedef typename queue::iterator iterator;
+ typedef queueing_policy queue_type;
 
 private:
         bool active_;

Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/info.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/info.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -0,0 +1,21 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_INFO_H
+#define BOOST_TP_DETAIL_INFO_H
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+struct has_priority
+{};
+
+struct has_no_priority
+{};
+
+} } }
+
+#endif // BOOST_TP_DETAIL_INFO_H
+

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -13,12 +13,15 @@
 #include <boost/function.hpp>
 #include <boost/future/future.hpp>
 
+#include <boost/tp/detail/info.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
 {
 struct fifo
 {
+ typedef detail::has_no_priority priority_tag_type;
+
         template< typename Callable >
         class impl
         {

Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/info.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/info.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -0,0 +1,35 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_INFO_H
+#define BOOST_TP_INFO_H
+
+#include <boost/mpl/bool.hpp>
+#include <boost/type_traits/is_same.hpp>
+
+#include <boost/tp/detail/info.hpp>
+
+namespace boost { namespace tp
+{
+template< typename Pool >
+struct has_priority
+:
+public boost::mpl::bool_<
+ boost::is_same<
+ detail::has_priority,
+ typename Pool::queue_type::priority_tag_type
+ >::value
+>
+{};
+
+template< typename Pool >
+struct priority_type
+{
+ typedef typename Pool::queue_type::attribute_type type;
+};
+
+} }
+
+#endif // BOOST_TP_INFO_H
+

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -13,12 +13,15 @@
 #include <boost/function.hpp>
 #include <boost/future/future.hpp>
 
+#include <boost/tp/detail/info.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
 {
 struct lifo
 {
+ typedef detail::has_no_priority priority_tag_type;
+
         template< typename Callable >
         class impl
         {

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -28,6 +28,10 @@
 struct pool
 : public Strategy::template impl< Channel >
 {
+ typedef Strategy strategy_type;
+ typedef Channel channel_type;
+ typedef typename channel_type::queue_type queue_type;
+
 #define BOOST_TP_CTOR(z, n, A) \
         template< BOOST_PP_ENUM_PARAMS(n, typename A) > \
         pool(BOOST_ENUM_TP_CTOR_ARGS(n, A)) \

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -15,6 +15,7 @@
 #include <boost/multi_index/mem_fun.hpp>
 #include <boost/multi_index/ordered_index.hpp>
 
+#include <boost/tp/detail/info.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
@@ -25,6 +26,9 @@
>
 struct priority
 {
+ typedef Attr attribute_type;
+ typedef detail::has_priority priority_tag_type;
+
         template< typename Callable >
         class impl
         {

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -300,6 +300,7 @@
 public:
         typedef typename producer_queue::item item;
         typedef typename producer_queue::iterator iterator;
+ typedef queueing_policy queue_type;
 
 private:
         bool active_;

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -14,6 +14,7 @@
 #include <boost/multi_index/mem_fun.hpp>
 #include <boost/multi_index/ordered_index.hpp>
 
+#include <boost/tp/detail/info.hpp>
 #include <boost/tp/detail/interrupter.hpp>
 
 namespace boost { namespace tp
@@ -26,6 +27,9 @@
>
 struct smart
 {
+ typedef Attr attribute_type;
+ typedef detail::has_priority priority_tag_type;
+
         template< typename Callable >
         class impl
         {

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -40,6 +40,7 @@
 public:
         typedef typename queue::item item;
         typedef typename queue::iterator iterator;
+ typedef queueing_policy queue_type;
 
 private:
         bool active_;

Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -22,6 +22,7 @@
 #include <boost/dataflow/utility/bind_mem_fn.hpp>
 #include <boost/dataflow/utility/bind_functor.hpp>
 
+#include <boost/tp/info.hpp>
 #include "boost/tp/fifo.hpp"
 #include "boost/tp/lazy.hpp"
 #include "boost/tp/pool.hpp"
@@ -38,7 +39,7 @@
 namespace detail {
 
     template<typename Threadpool, typename Component>
- class async_component_impl
+ class async_component_impl_base
     // filter_base is like filter, but it doesn't come with its own signal.
     // instead, the derived class must provide a default_signal member function
     // that refers to the default signal. We will use this to return the
@@ -51,9 +52,40 @@
         typedef void result_type;
         
         template<typename T0>
- async_component_impl(Threadpool &threadpool, const T0 &t0)
+ async_component_impl_base(Threadpool &threadpool, const T0 &t0)
             : m_component(t0), m_threadpool(threadpool)
         {
+ init_m_component_function();
+ }
+
+ template<typename T0, typename T1>
+ async_component_impl_base(Threadpool &threadpool, const T0 &t0, const T1 &t1)
+ : m_component(t0, t1), m_threadpool(threadpool)
+ {
+ init_m_component_function();
+ }
+
+ // with the following, anything that connects to the async_component's default
+ // signal will actually connect to the default signal of the underlying
+ // component
+ typename Component::signal_type &default_signal()
+ {
+ namespace dataflow = boost::dataflow;
+ return dataflow::get_default_port<
+ dataflow::args::left,
+ dataflow::signals::connect_mechanism,
+ dataflow::signals::tag
+ > (m_component);
+ }
+
+ protected:
+ Component m_component;
+ Threadpool &m_threadpool;
+ boost::function<signature_type> m_component_function;
+
+ private:
+ void init_m_component_function()
+ {
             // record the appropriate operator() overload of Component into m_component_function,
             // so we can submit it as a task later
             typedef typename boost::dataflow::utility::slot_type<signature_type, Component>::type mem_fn_type;
@@ -61,6 +93,26 @@
             m_component_function = boost::dataflow::utility::bind_mem_fn<mem_fn_type, Component>
                 (static_cast<mem_fn_type>(&Component::operator()), m_component);
         }
+ };
+
+ template<typename Threadpool, typename Component, typename Enable=void>
+ class async_component_impl;
+
+ template<typename Threadpool, typename Component>
+ class async_component_impl<Threadpool, Component,
+ typename boost::disable_if<tp::has_priority<Threadpool> >::type>
+ : public async_component_impl_base<Threadpool, Component>
+ {
+ public:
+ template<typename T0>
+ async_component_impl(Threadpool &threadpool, const T0 &t0)
+ : async_component_impl_base<Threadpool, Component>(threadpool, t0)
+ {}
+
+ template<typename T0, typename T1>
+ async_component_impl(Threadpool &threadpool, const T0 &t0, const T1 &t1)
+ : async_component_impl_base<Threadpool, Component>(threadpool, t0, t1)
+ {}
         
         template <class Seq>
         void operator()(const Seq &vec_par) const
@@ -74,32 +126,55 @@
             // submit the task (the first parameter to bind is the function,
             // and the rest are the bound function arguments).
             boost::tp::task< void > t(
- m_threadpool.submit(
+ async_component_impl::m_threadpool.submit(
                     fused_bind(
                         boost::fusion::join(
- boost::fusion::make_vector(m_component_function),
+ boost::fusion::make_vector(async_component_impl::m_component_function),
                             vec_par
                     ) )
                     ));
         }
- // with this, anything that connects to the async_component's default
- // signal will actually connect to the default signal of the underlying
- // component
- typename Component::signal_type &default_signal()
+
+ };
+
+ template<typename Threadpool, typename Component>
+ class async_component_impl<Threadpool, Component,
+ typename boost::enable_if<tp::has_priority<Threadpool> >::type>
+ : public async_component_impl_base<Threadpool, Component>
+ {
+ typedef typename tp::priority_type<Threadpool>::type priority_type;
+ public:
+ template<typename T0>
+ async_component_impl(Threadpool &threadpool, const priority_type &priority, const T0 &t0)
+ : async_component_impl_base<Threadpool, Component>(threadpool, t0)
+ , m_priority(priority)
+ {}
+
+ template <class Seq>
+ void operator()(const Seq &vec_par) const
         {
- namespace dataflow = boost::dataflow;
- return dataflow::get_default_port<
- dataflow::args::left,
- dataflow::signals::connect_mechanism,
- dataflow::signals::tag
- > (m_component);
+ // add the next function as a task in the pool
+ std::cout << "adding task" << std::endl;
+ // bind_functor is just a function object that calls bind
+ // we create a fused version so we can call it with a fusion sequence
+ boost::fusion::fused<boost::dataflow::utility::bind_functor> fused_bind;
+
+ // submit the task (the first parameter to bind is the function,
+ // and the rest are the bound function arguments).
+ boost::tp::task< void > t(
+ async_component_impl::m_threadpool.submit(
+ fused_bind(
+ boost::fusion::join(
+ boost::fusion::make_vector(async_component_impl::m_component_function),
+ vec_par
+ ) ),
+ m_priority
+ ));
         }
-
     private:
- Component m_component;
- Threadpool &m_threadpool;
- boost::function<signature_type> m_component_function;
+ priority_type m_priority;
     };
+
 }
 
 
@@ -119,8 +194,33 @@
     async_component(Threadpool &threadpool, const T0 &t0)
         : base_type(threadpool, t0)
     {}
+ template<typename T0, typename T1>
+ async_component(Threadpool &threadpool, const T0 &t0, const T1 &t1)
+ : base_type(threadpool, t0, t1)
+ {}
 };
 
+// a function to submit the first task
+template<typename Threadpool, typename Component>
+void submit(Threadpool &threadpool, Component &component)
+{
+ tp::task< void > task(
+ threadpool.submit(
+ boost::bind(&Component::send, boost::ref(component))));
+}
+
+// a function to submit the first task with a priority
+template<typename Threadpool, typename Component>
+void submit(
+ Threadpool &threadpool,
+ const typename tp::priority_type<Threadpool>::type priority,
+ Component &component)
+{
+ tp::task< void > task(
+ threadpool.submit(
+ boost::bind(&Component::send, boost::ref(component)), priority));
+}
+
 // just an operation to work with
 int inc_fn(int x)
 {
@@ -128,32 +228,24 @@
     return x+1;
 }
 
-// a component that displays text, waits a second, and then sends a signal
+// a component that displays text, waits an optional period, and then sends a signal
 class printer : public boost::signals::filter<printer, void()>
 {
 public:
- printer(const std::string &text)
- : m_text(text)
+ printer(const std::string &text, unsigned wait_milliseconds=0)
+ : m_text(text), m_wait_milliseconds(wait_milliseconds)
     {}
     void operator()()
     {
         std::cout << m_text << std::endl;
- boost::this_thread::sleep(boost::posix_time::seconds(1));
+ boost::this_thread::sleep(boost::posix_time::milliseconds(m_wait_milliseconds));
         out();
     }
 private:
     std::string m_text;
+ unsigned m_wait_milliseconds;
 };
 
-// a function to submit the first task
-template<typename Threadpool, typename Next>
-void submit(Threadpool &threadpool, Next &next)
-{
- tp::task< void > task(
- threadpool.submit(
- boost::bind(&Next::send, boost::ref(next))));
-}
-
 int main( int argc, char *argv[])
 {
    typedef
@@ -178,11 +270,17 @@
     
     // our network
     // increase1 >>= increase2 >>= increase3 will be in its own thread
- // increase3 will be in its own thread
     // increase4 will be in its own thread
+ // increase5 will be in its own thread
     source
         | (increase1 >>= increase2 >>= increase3)
         | (increase4 >>= increase5);
+ // this was equivalent to:
+ // connect(source, increase1);
+ // connect(increase1, increase2);
+ // connect(increase2, increase3);
+ // connect(source, increase4);
+ // connect(increase4, increase5);
 
     // submit the first task
     submit(pool, source);
@@ -191,49 +289,63 @@
     boost::this_thread::sleep(boost::posix_time::seconds(1));
 
     // --------------------------------------------------------------
+ std::cout << "------------------------------------" << std::endl;
 
-/*
- typedef
+ typedef
         tp::pool<
             tp::fixed,
             tp::unbounded_channel< tp::priority< int > >
> priority_threadpool_type;
         
- priority_threadpool_type priority_pool(tp::max_poolsize(5));
+ priority_threadpool_type priority_pool(tp::max_poolsize(1));
     
     typedef boost::signals::storage<void()> print_starter_type;
     typedef async_component<priority_threadpool_type, printer> async_printer_type;
     
     print_starter_type print_starter;
- async_printer_type print1(priority_pool, "World");
- async_printer_type print2(priority_pool, " ");
- async_printer_type print3(priority_pool, "Hello");
- async_printer_type print4(priority_pool, "!");
+ async_printer_type print1(priority_pool, 3, "World");
+ async_printer_type print2(priority_pool, 2, "<>");
+ async_printer_type print3(priority_pool, 1, "Hello");
+ async_printer_type print4(priority_pool, 4, "!");
     
     print_starter
         | print1
         | print2
         | print3
         | print4;
+ // this was equivalent to:
+ // connect(print_starter, print1);
+ // connect(print_starter, print2);
+ // connect(print_starter, print3);
+ // connect(print_starter, print4);
+
         
     // submit the first task
- submit(priority_pool, print_starter);
+ submit(priority_pool, 1, print_starter);
 
     // wait a little
     boost::this_thread::sleep(boost::posix_time::seconds(1));
-*/
         
     // --------------------------------------------------------------
+ std::cout << "------------------------------------" << std::endl;
 
     typedef boost::signals::storage<void()> tick_starter_type;
     typedef async_component<threadpool_type, printer> ticker_type;
     
     // our components
     tick_starter_type tick_starter;
- ticker_type ticker1(pool, "tick 1..."), ticker2(pool, "tick 2..."), ticker3(pool, "tick 3...");
+ ticker_type
+ ticker1(pool, "tick 1...", 1000),
+ ticker2(pool, "tick 2...", 1000),
+ ticker3(pool, "tick 3...", 1000);
 
     // our network
     tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;
+ // this was equivalent to:
+ // connect(tick_starter, ticker1);
+ // connect(ticker1, ticker2);
+ // connect(ticker2, ticker3);
+ // connect(ticker3, ticker1);
     
     // submit the first task
     submit(pool, tick_starter);


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk