|
Boost : |
Subject: Re: [boost] Boost.Pipeline -- scheduling of segments
From: christophe.j.henry_at_[hidden]
Date: 2014-07-08 17:00:20
>Maybe I'm making it too easy for myself, but I'd see every pipeline stage as a
>scheduler, say, for Asynchronous a stealing threadpool scheduler (with one
>or more threads), every stage getting a job transforming input data and
>posting to the queue of the next scheduler a functor doing the next stage
>transformation, etc.
>Then I'd create a composite in one line of code to make sure work-stealing
>happens and that would be it for the infrastructure.
Purely for fun, it took me a few minutes to write such a pipeline: a
simple version using a thread for every stage, then one with work stealing.
There is a ton of stuff to improve (for example, strings should be moved),
but I hope you get the idea.
Now one "just" needs to write the syntactic sugar to have beautiful
pipelines.
Cheers,
Christophe
The simple version is:
#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>
#include <boost/asynchronous/scheduler/threadpool_scheduler.hpp>
#include <boost/asynchronous/queue/lockfree_queue.hpp>
#include <boost/asynchronous/scheduler_shared_proxy.hpp>
#include <boost/asynchronous/post.hpp>
#include <boost/algorithm/string/trim.hpp>
using namespace std;
// Three-stage text pipeline: grep -> trim -> print.
// Every stage has its own single-threaded scheduler; a stage's job, when it
// produces output, posts the job for the next stage to the next scheduler.
struct pipeline
{
    // Feeds every line of `input` into the pipeline.
    // Each line is copied into the job posted to the first stage, so `input`
    // itself is only read during this call.
    void process(std::vector<std::string> const& input)
    {
        // create a scheduler for every stage, use only one thread
        // (declaration order kept as written: stage 1, 2, 3)
        auto scheduler1 =
            boost::asynchronous::create_shared_scheduler_proxy(
                new boost::asynchronous::threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>
                >(1));
        auto scheduler2 =
            boost::asynchronous::create_shared_scheduler_proxy(
                new boost::asynchronous::threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>
                >(1));
        auto scheduler3 =
            boost::asynchronous::create_shared_scheduler_proxy(
                new boost::asynchronous::threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>
                >(1));

        // Stage 1 job: keep `item` only if it matches the regex `re`.
        // NOTE(review): std::regex_match requires the WHOLE string to match,
        // so lines with leading whitespace (e.g. "\tError: abc") are dropped
        // here and never reach the trim stage; use std::regex_search if
        // grep-like semantics are intended.
        auto grep = [scheduler2, scheduler3](std::string const& re,
                                             std::string const& item)
        {
            std::regex regex(re);
            if (std::regex_match(item, regex))
            {
                // Stage 2 job: strip leading/trailing whitespace.
                auto trim = [scheduler3](std::string const& line)
                {
                    std::string tc(boost::algorithm::trim_copy(line));
                    // Stage 3 job: print the trimmed line.
                    boost::asynchronous::post_future(scheduler3,
                        [tc](){ std::cout << "->" << tc << std::endl; });
                };
                // `item` is a const reference: std::move would only copy it,
                // so pass it plainly; std::bind stores its own copy for the
                // asynchronous job.
                auto trim_ = std::bind(trim, item);
                boost::asynchronous::post_future(scheduler2, trim_);
            }
        };

        for (auto const& s : input)
        {
            // std::bind stores a copy of `s`, so the job does not depend on
            // `input` staying alive.
            auto grep_error = std::bind(grep, "Error.*", s);
            boost::asynchronous::post_future(scheduler1, grep_error);
        }
    }
};
// Demo driver: pushes a few log-like lines through the thread-per-stage pipeline.
int main()
{
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
    };
    pipeline p;
    p.process(input);
    // We are going to shut down; the schedulers will all block until
    // completely done.
    return 0;
}
And the stealing version is:
#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>
#include <boost/asynchronous/scheduler/stealing_multiqueue_threadpool_scheduler.hpp>
#include <boost/asynchronous/scheduler/composite_threadpool_scheduler.hpp>
#include <boost/asynchronous/queue/lockfree_queue.hpp>
#include <boost/asynchronous/scheduler_shared_proxy.hpp>
#include <boost/asynchronous/post.hpp>
#include <boost/algorithm/string/trim.hpp>
using namespace std;
// Three-stage text pipeline (grep -> trim -> print) built on three
// single-threaded stealing pools combined into one composite scheduler.
// Jobs are posted to the composite with an index (1, 2, 3) that, per the
// comments below, selects the pool for that stage.
struct pipeline
{
    // Creates the three per-stage pools and the composite that owns them.
    pipeline()
    {
        auto scheduler1 =
            boost::asynchronous::create_shared_scheduler_proxy(
                new
                boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>
                >(1));
        auto scheduler2 =
            boost::asynchronous::create_shared_scheduler_proxy(
                new
                boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>
                >(1));
        auto scheduler3 =
            boost::asynchronous::create_shared_scheduler_proxy(
                new
                boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                    boost::asynchronous::lockfree_queue<>
                >(1));
        // composite pool made of the previous pools
        // keeping it alive will ensure automatic work-stealing between pools.
        m_composite = boost::asynchronous::create_shared_scheduler_proxy(
            new
            boost::asynchronous::composite_threadpool_scheduler<>
            (scheduler1, scheduler2, scheduler3));
    }

    // Feeds every line of `input` into the pipeline. Each line is copied
    // into the job posted to the first stage.
    // NOTE(review): the jobs capture `this` and read m_composite from pool
    // threads. They must not outlive *this -- confirm that destroying
    // m_composite in ~pipeline waits for them and that posting through the
    // member while it is being destroyed is safe.
    void process(std::vector<std::string> const& input)
    {
        // Stage 1 job: keep `item` only if it fully matches `re`
        // (std::regex_match is anchored at both ends).
        auto grep = [this](std::string const& re, std::string const& item)
        {
            std::regex regex(re);
            if (std::regex_match(item, regex))
            {
                // Stage 2 job: strip leading/trailing whitespace.
                auto trim = [this](std::string const& line)
                {
                    std::string tc(boost::algorithm::trim_copy(line));
                    // post to third pool of composite
                    boost::asynchronous::post_future(m_composite,
                        [tc](){ std::cout << "->" << tc << std::endl; }, "", 3);
                };
                // `item` is a const reference: std::move would only copy it;
                // std::bind stores its own copy for the asynchronous job.
                auto trim_ = std::bind(trim, item);
                // post to second pool of composite
                boost::asynchronous::post_future(m_composite, trim_, "", 2);
            }
        };
        for (auto const& s : input)
        {
            // std::bind stores a copy of `s` for the job.
            auto grep_error = std::bind(grep, "Error.*", s);
            // post to first pool of composite
            boost::asynchronous::post_future(m_composite, grep_error, "", 1);
        }
    }
private:
    // Composite of the three stage pools; holding it keeps them alive.
    boost::asynchronous::any_shared_scheduler_proxy<> m_composite;
};
// Demo driver: pushes a few log-like lines through the work-stealing pipeline.
int main()
{
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
    };
    pipeline p;
    p.process(input);
    // We are going to shut down; the schedulers will all block until
    // completely done.
    return 0;
}
Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk