|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r48653 - in sandbox/SOC/2007/signals: boost/dataflow/signals/component/detail libs/dataflow/example/threadpool libs/dataflow/example/threadpool/boost/tp libs/dataflow/example/threadpool/boost/tp/detail
From: stipe_at_[hidden]
Date: 2008-09-07 13:05:42
Author: srajko
Date: 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
New Revision: 48653
URL: http://svn.boost.org/trac/boost/changeset/48653
Log:
threadpool example works with priority
Added:
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/info.hpp (contents, props changed)
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/info.hpp (contents, props changed)
Text files modified:
sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp | 12 ++
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp | 1
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp | 3
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp | 3
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp | 4
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp | 4
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp | 1
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp | 4
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp | 1
sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp | 196 +++++++++++++++++++++++++++++++--------
10 files changed, 186 insertions(+), 43 deletions(-)
Modified: sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp
==============================================================================
--- sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp (original)
+++ sandbox/SOC/2007/signals/boost/dataflow/signals/component/detail/unfused_inherited.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -99,7 +99,17 @@
unfused_inherited(T1 &t1, T2 &t2)
: Function(t1, t2)
{ }
-
+
+ template<typename T1, typename T2, typename T3> // three-argument constructor; every argument is taken by const reference
+ unfused_inherited(const T1 &t1, const T2 &t2, const T3 &t3)
+ : Function(t1, t2, t3) // arguments are handed unchanged to the Function base
+ { }
+
+ template<typename T1, typename T2, typename T3> // three-argument overload whose first argument is taken by non-const reference
+ unfused_inherited(T1 &t1, const T2 &t2, const T3 &t3)
+ : Function(t1, t2, t3) // arguments are handed unchanged to the Function base
+ { }
+
template<typename F>
struct result
: public Function::template result<F> {};
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/bounded_channel.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -39,6 +39,7 @@
public:
typedef typename queue::item item;
typedef typename queue::iterator iterator;
+ typedef queueing_policy queue_type;
private:
bool active_;
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/info.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/detail/info.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -0,0 +1,21 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_DETAIL_INFO_H
+#define BOOST_TP_DETAIL_INFO_H
+
+namespace boost { namespace tp {
+namespace detail
+{
+
+struct has_priority // empty tag type: a queueing policy exposes it as priority_tag_type to mark itself priority-aware
+{};
+
+struct has_no_priority // empty tag type: exposed as priority_tag_type by policies without a priority (fifo and lifo use it)
+{};
+
+} } }
+
+#endif // BOOST_TP_DETAIL_INFO_H
+
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/fifo.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -13,12 +13,15 @@
#include <boost/function.hpp>
#include <boost/future/future.hpp>
+#include <boost/tp/detail/info.hpp>
#include <boost/tp/detail/interrupter.hpp>
namespace boost { namespace tp
{
struct fifo
{
+ typedef detail::has_no_priority priority_tag_type;
+
template< typename Callable >
class impl
{
Added: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/info.hpp
==============================================================================
--- (empty file)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/info.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -0,0 +1,35 @@
+// Copyright (c) 2008 Oliver Kowalke. Distributed under the Boost
+// Software License, Version 1.0. (See accompanying file
+// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+
+#ifndef BOOST_TP_INFO_H
+#define BOOST_TP_INFO_H
+
+#include <boost/mpl/bool.hpp>
+#include <boost/type_traits/is_same.hpp>
+
+#include <boost/tp/detail/info.hpp>
+
+namespace boost { namespace tp
+{
+template< typename Pool > // Pool must provide a nested queue_type that in turn provides priority_tag_type
+struct has_priority // boolean metafunction, usable with enable_if / disable_if
+:
+public boost::mpl::bool_< // true exactly when the tag below is detail::has_priority
+ boost::is_same<
+ detail::has_priority,
+ typename Pool::queue_type::priority_tag_type
+ >::value
+>
+{};
+
+template< typename Pool > // Pool must provide a nested queue_type that in turn provides attribute_type
+struct priority_type // instantiating this for a Pool whose queue_type lacks attribute_type does not compile
+{
+ typedef typename Pool::queue_type::attribute_type type; // re-exports the queueing policy's attribute_type
+};
+
+} }
+
+#endif // BOOST_TP_INFO_H
+
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/lifo.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -13,12 +13,15 @@
#include <boost/function.hpp>
#include <boost/future/future.hpp>
+#include <boost/tp/detail/info.hpp>
#include <boost/tp/detail/interrupter.hpp>
namespace boost { namespace tp
{
struct lifo
{
+ typedef detail::has_no_priority priority_tag_type;
+
template< typename Callable >
class impl
{
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/pool.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -28,6 +28,10 @@
struct pool
: public Strategy::template impl< Channel >
{
+ typedef Strategy strategy_type;
+ typedef Channel channel_type;
+ typedef typename channel_type::queue_type queue_type;
+
#define BOOST_TP_CTOR(z, n, A) \
template< BOOST_PP_ENUM_PARAMS(n, typename A) > \
pool(BOOST_ENUM_TP_CTOR_ARGS(n, A)) \
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/priority.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -15,6 +15,7 @@
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/ordered_index.hpp>
+#include <boost/tp/detail/info.hpp>
#include <boost/tp/detail/interrupter.hpp>
namespace boost { namespace tp
@@ -25,6 +26,9 @@
>
struct priority
{
+ typedef Attr attribute_type;
+ typedef detail::has_priority priority_tag_type;
+
template< typename Callable >
class impl
{
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/rendezvous_channel.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -300,6 +300,7 @@
public:
typedef typename producer_queue::item item;
typedef typename producer_queue::iterator iterator;
+ typedef queueing_policy queue_type;
private:
bool active_;
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/smart.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -14,6 +14,7 @@
#include <boost/multi_index/mem_fun.hpp>
#include <boost/multi_index/ordered_index.hpp>
+#include <boost/tp/detail/info.hpp>
#include <boost/tp/detail/interrupter.hpp>
namespace boost { namespace tp
@@ -26,6 +27,9 @@
>
struct smart
{
+ typedef Attr attribute_type;
+ typedef detail::has_priority priority_tag_type;
+
template< typename Callable >
class impl
{
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/boost/tp/unbounded_channel.hpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -40,6 +40,7 @@
public:
typedef typename queue::item item;
typedef typename queue::iterator iterator;
+ typedef queueing_policy queue_type;
private:
bool active_;
Modified: sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp
==============================================================================
--- sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp (original)
+++ sandbox/SOC/2007/signals/libs/dataflow/example/threadpool/threadpool_component_example.cpp 2008-09-07 13:05:40 EDT (Sun, 07 Sep 2008)
@@ -22,6 +22,7 @@
#include <boost/dataflow/utility/bind_mem_fn.hpp>
#include <boost/dataflow/utility/bind_functor.hpp>
+#include <boost/tp/info.hpp>
#include "boost/tp/fifo.hpp"
#include "boost/tp/lazy.hpp"
#include "boost/tp/pool.hpp"
@@ -38,7 +39,7 @@
namespace detail {
template<typename Threadpool, typename Component>
- class async_component_impl
+ class async_component_impl_base
// filter_base is like filter, but it doesn't come with its own signal.
// instead, the derived class must provide a default_signal member function
// that refers to the default signal. We will use this to return the
@@ -51,9 +52,40 @@
typedef void result_type;
template<typename T0>
- async_component_impl(Threadpool &threadpool, const T0 &t0)
+ async_component_impl_base(Threadpool &threadpool, const T0 &t0)
: m_component(t0), m_threadpool(threadpool)
{
+ init_m_component_function();
+ }
+
+ template<typename T0, typename T1> // two-argument variant: both t0 and t1 go to Component's constructor
+ async_component_impl_base(Threadpool &threadpool, const T0 &t0, const T1 &t1)
+ : m_component(t0, t1), m_threadpool(threadpool) // the pool is held by reference, the component by value
+ {
+ init_m_component_function(); // m_component must already be constructed when this runs
+ }
+
+ // with the following, anything that connects to the async_component's default
+ // signal will actually connect to the default signal of the underlying
+ // component
+ typename Component::signal_type &default_signal()
+ {
+ namespace dataflow = boost::dataflow; // local alias to shorten the qualified names below
+ return dataflow::get_default_port<
+ dataflow::args::left,
+ dataflow::signals::connect_mechanism,
+ dataflow::signals::tag
+ > (m_component); // the port is looked up on the wrapped component, not on this object
+ }
+
+ protected:
+ Component m_component;
+ Threadpool &m_threadpool;
+ boost::function<signature_type> m_component_function;
+
+ private:
+ void init_m_component_function()
+ {
// record the appropriate operator() overload of Component into m_component_function,
// so we can submit it as a task later
typedef typename boost::dataflow::utility::slot_type<signature_type, Component>::type mem_fn_type;
@@ -61,6 +93,26 @@
m_component_function = boost::dataflow::utility::bind_mem_fn<mem_fn_type, Component>
(static_cast<mem_fn_type>(&Component::operator()), m_component);
}
+ };
+
+ template<typename Threadpool, typename Component, typename Enable=void>
+ class async_component_impl;
+
+ template<typename Threadpool, typename Component>
+ class async_component_impl<Threadpool, Component,
+ typename boost::disable_if<tp::has_priority<Threadpool> >::type>
+ : public async_component_impl_base<Threadpool, Component>
+ {
+ public:
+ template<typename T0>
+ async_component_impl(Threadpool &threadpool, const T0 &t0)
+ : async_component_impl_base<Threadpool, Component>(threadpool, t0)
+ {}
+
+ template<typename T0, typename T1>
+ async_component_impl(Threadpool &threadpool, const T0 &t0, const T1 &t1)
+ : async_component_impl_base<Threadpool, Component>(threadpool, t0, t1)
+ {}
template <class Seq>
void operator()(const Seq &vec_par) const
@@ -74,32 +126,55 @@
// submit the task (the first parameter to bind is the function,
// and the rest are the bound function arguments).
boost::tp::task< void > t(
- m_threadpool.submit(
+ async_component_impl::m_threadpool.submit(
fused_bind(
boost::fusion::join(
- boost::fusion::make_vector(m_component_function),
+ boost::fusion::make_vector(async_component_impl::m_component_function),
vec_par
) )
));
}
- // with this, anything that connects to the async_component's default
- // signal will actually connect to the default signal of the underlying
- // component
- typename Component::signal_type &default_signal()
+
+ };
+
+ template<typename Threadpool, typename Component>
+ class async_component_impl<Threadpool, Component,
+ typename boost::enable_if<tp::has_priority<Threadpool> >::type>
+ : public async_component_impl_base<Threadpool, Component>
+ {
+ typedef typename tp::priority_type<Threadpool>::type priority_type;
+ public:
+ template<typename T0>
+ async_component_impl(Threadpool &threadpool, const priority_type &priority, const T0 &t0)
+ : async_component_impl_base<Threadpool, Component>(threadpool, t0)
+ , m_priority(priority)
+ {}
+
+ template <class Seq>
+ void operator()(const Seq &vec_par) const
{
- namespace dataflow = boost::dataflow;
- return dataflow::get_default_port<
- dataflow::args::left,
- dataflow::signals::connect_mechanism,
- dataflow::signals::tag
- > (m_component);
+ // add the next function as a task in the pool
+ std::cout << "adding task" << std::endl;
+ // bind_functor is just a function object that calls bind
+ // we create a fused version so we can call it with a fusion sequence
+ boost::fusion::fused<boost::dataflow::utility::bind_functor> fused_bind;
+
+ // submit the task (the first parameter to bind is the function,
+ // and the rest are the bound function arguments).
+ boost::tp::task< void > t(
+ async_component_impl::m_threadpool.submit(
+ fused_bind(
+ boost::fusion::join(
+ boost::fusion::make_vector(async_component_impl::m_component_function),
+ vec_par
+ ) ),
+ m_priority
+ ));
}
-
private:
- Component m_component;
- Threadpool &m_threadpool;
- boost::function<signature_type> m_component_function;
+ priority_type m_priority;
};
+
}
@@ -119,8 +194,33 @@
async_component(Threadpool &threadpool, const T0 &t0)
: base_type(threadpool, t0)
{}
+ template<typename T0, typename T1> // two extra arguments after the pool; how they are interpreted is decided by base_type
+ async_component(Threadpool &threadpool, const T0 &t0, const T1 &t1)
+ : base_type(threadpool, t0, t1) // NOTE(review): presumably base_type is detail::async_component_impl, so for a priority pool t0 is the priority - confirm
+ {}
};
+// a function to submit the first task: queues a call to component.send() on the pool
+template<typename Threadpool, typename Component>
+void submit(Threadpool &threadpool, Component &component)
+{
+ tp::task< void > task( // the task handle returned by the pool is not used after submission
+ threadpool.submit(
+ boost::bind(&Component::send, boost::ref(component)))); // boost::ref: the bound call refers to the caller's component rather than a copy
+}
+
+// a function to submit the first task with a priority: queues component.send() on a priority pool
+template<typename Threadpool, typename Component>
+void submit(
+ Threadpool &threadpool,
+ const typename tp::priority_type<Threadpool>::type priority, // not deduced from this argument; Threadpool comes from the first parameter
+ Component &component)
+{
+ tp::task< void > task( // the task handle returned by the pool is not used after submission
+ threadpool.submit(
+ boost::bind(&Component::send, boost::ref(component)), priority)); // priority is passed as the second argument of the pool's submit
+}
+
// just an operation to work with
int inc_fn(int x)
{
@@ -128,32 +228,24 @@
return x+1;
}
-// a component that displays text, waits a second, and then sends a signal
+// a component that displays text, waits an optional period, and then sends a signal
class printer : public boost::signals::filter<printer, void()>
{
public:
- printer(const std::string &text)
- : m_text(text)
+ printer(const std::string &text, unsigned wait_milliseconds=0)
+ : m_text(text), m_wait_milliseconds(wait_milliseconds)
{}
void operator()()
{
std::cout << m_text << std::endl;
- boost::this_thread::sleep(boost::posix_time::seconds(1));
+ boost::this_thread::sleep(boost::posix_time::milliseconds(m_wait_milliseconds));
out();
}
private:
std::string m_text;
+ unsigned m_wait_milliseconds;
};
-// a function to submit the first task
-template<typename Threadpool, typename Next>
-void submit(Threadpool &threadpool, Next &next)
-{
- tp::task< void > task(
- threadpool.submit(
- boost::bind(&Next::send, boost::ref(next))));
-}
-
int main( int argc, char *argv[])
{
typedef
@@ -178,11 +270,17 @@
// our network
// increase1 >>= increase2 >>= increase3 will be in its own thread
- // increase3 will be in its own thread
// increase4 will be in its own thread
+ // increase5 will be in its own thread
source
| (increase1 >>= increase2 >>= increase3)
| (increase4 >>= increase5);
+ // this was equivalent to:
+ // connect(source, increase1);
+ // connect(increase1, increase2);
+ // connect(increase2, increase3);
+ // connect(source, increase4);
+ // connect(increase4, increase5);
// submit the first task
submit(pool, source);
@@ -191,49 +289,63 @@
boost::this_thread::sleep(boost::posix_time::seconds(1));
// --------------------------------------------------------------
+ std::cout << "------------------------------------" << std::endl;
-/*
- typedef
+ typedef
tp::pool<
tp::fixed,
tp::unbounded_channel< tp::priority< int > >
> priority_threadpool_type;
- priority_threadpool_type priority_pool(tp::max_poolsize(5));
+ priority_threadpool_type priority_pool(tp::max_poolsize(1));
typedef boost::signals::storage<void()> print_starter_type;
typedef async_component<priority_threadpool_type, printer> async_printer_type;
print_starter_type print_starter;
- async_printer_type print1(priority_pool, "World");
- async_printer_type print2(priority_pool, " ");
- async_printer_type print3(priority_pool, "Hello");
- async_printer_type print4(priority_pool, "!");
+ async_printer_type print1(priority_pool, 3, "World");
+ async_printer_type print2(priority_pool, 2, "<>");
+ async_printer_type print3(priority_pool, 1, "Hello");
+ async_printer_type print4(priority_pool, 4, "!");
print_starter
| print1
| print2
| print3
| print4;
+ // this was equivalent to:
+ // connect(print_starter, print1);
+ // connect(print_starter, print2);
+ // connect(print_starter, print3);
+ // connect(print_starter, print4);
+
// submit the first task
- submit(priority_pool, print_starter);
+ submit(priority_pool, 1, print_starter);
// wait a little
boost::this_thread::sleep(boost::posix_time::seconds(1));
-*/
// --------------------------------------------------------------
+ std::cout << "------------------------------------" << std::endl;
typedef boost::signals::storage<void()> tick_starter_type;
typedef async_component<threadpool_type, printer> ticker_type;
// our components
tick_starter_type tick_starter;
- ticker_type ticker1(pool, "tick 1..."), ticker2(pool, "tick 2..."), ticker3(pool, "tick 3...");
+ ticker_type
+ ticker1(pool, "tick 1...", 1000),
+ ticker2(pool, "tick 2...", 1000),
+ ticker3(pool, "tick 3...", 1000);
// our network
tick_starter >>= ticker1 >>= ticker2 >>= ticker3 >>= ticker1;
+ // this was equivalent to:
+ // connect(tick_starter, ticker1);
+ // connect(ticker1, ticker2);
+ // connect(ticker2, ticker3);
+ // connect(ticker3, ticker1);
// submit the first task
submit(pool, tick_starter);
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk