Subject: Re: [boost] Boost.Pipeline -- scheduling of segments
From: Hartmut Kaiser (hartmut.kaiser_at_[hidden])
Date: 2014-07-09 12:21:41


> >Maybe I'm making it too easy for myself, but I'd see every pipeline stage
> >as a scheduler, say, for Asynchronous, a stealing threadpool scheduler
> >(with one or more threads), every stage getting a job transforming input
> >data and posting to the queue of the next scheduler a functor doing the
> >next stage transformation, etc.
> >Then I'd create a composite in one line of code to make sure work stealing
> >happens, and that would be it for the infrastructure.
>
> Purely for fun, it took me a few minutes to write such a pipeline: a
> simple version using a thread for every stage, then one with work
> stealing.
> There is tons of stuff to improve, for example strings should be moved,
> but I hope you get the idea.
> Now one "just" needs to write the syntactic sugar to have beautiful
> pipelines.

Purely for fun as well and for the sake of completeness (as HPX was
mentioned here before), here is Christophe's code in HPX:

#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <iostream>
#include <string>
#include <vector>
#include <regex>

#include <boost/algorithm/string/trim.hpp>

struct pipeline
{
    static void process(std::vector<std::string> const& input)
    {
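        // First stage: keep only the items that match the regex in full.
        auto grep = [](std::string const& re, std::string const& item)
        {
            std::regex regex(re);
            if (std::regex_match(item, regex))
            {
                // Second stage: strip leading and trailing whitespace.
                auto trim = [](std::string const& s)
                {
                    return boost::algorithm::trim_copy(s);
                };

                // 'item' is copied into the trim task; the continuation
                // prints the result once trimming has finished.
                hpx::async(trim, item)
                    .then(hpx::util::unwrapped(
                        [](std::string const& tc)
                        {
                            std::cout << "->" << tc << std::endl;
                        }));
            }
        };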

        // Launch one grep task per input line.
        std::vector<hpx::future<void> > tasks;
        for(auto s: input)
        {
            tasks.push_back(hpx::async(grep, "Error.*", std::move(s)));
        }

        // Wait for all grep tasks to finish.
        hpx::wait_all(tasks);
    }
};

int main()
{
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
      };

    pipeline::process(input);

    return 0;
}
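
For readers without HPX at hand, roughly the same two-stage flow (grep, then trim) can be sketched with only the standard library. This is a minimal sketch under stated assumptions, not a port of the HPX code: std::async stands in for hpx::async, the names run_pipeline and trim_whitespace are hypothetical, trim_whitespace replaces boost::algorithm::trim_copy, and the results are collected in input order instead of being printed from a continuation.

```cpp
#include <cctype>
#include <cstddef>
#include <future>
#include <regex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper standing in for boost::algorithm::trim_copy:
// returns a copy of 's' without leading and trailing whitespace.
std::string trim_whitespace(std::string const& s)
{
    auto is_space = [](unsigned char c) { return std::isspace(c) != 0; };

    std::size_t first = 0;
    while (first < s.size() && is_space(s[first]))
        ++first;

    std::size_t last = s.size();
    while (last > first && is_space(s[last - 1]))
        --last;

    return s.substr(first, last - first);
}

// Hypothetical driver: one task per input item. Each task applies the
// grep stage (full regex match) and, on success, the trim stage.
std::vector<std::string> run_pipeline(
    std::vector<std::string> const& input, std::string const& pattern)
{
    // Compiled once; concurrent read-only use of a const regex is fine.
    std::regex const regex(pattern);

    std::vector<std::future<std::pair<bool, std::string>>> tasks;
    tasks.reserve(input.size());

    for (auto const& item : input)
    {
        // Reference captures are safe here: every future is consumed
        // below, before 'regex' and 'input' go out of scope.
        tasks.push_back(std::async(
            [&regex, &item]() -> std::pair<bool, std::string>
            {
                if (!std::regex_match(item, regex))
                    return {false, std::string()};
                return {true, trim_whitespace(item)};
            }));
    }

    // Gather the matches in input order.
    std::vector<std::string> results;
    for (auto& task : tasks)
    {
        auto r = task.get();
        if (r.first)
            results.push_back(std::move(r.second));
    }
    return results;
}
```

Calling run_pipeline(input, "Error.*") with the five strings from main() above yields "Error: foobar" and "Error. foo". The tab-prefixed line is rejected because std::regex_match requires the whole string to match, so trimming after the grep stage never sees it.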

Regards Hartmut
---------------
http://boost-spirit.com
http://stellar.cct.lsu.edu

