Boost logo

Boost :

Subject: Re: [boost] Boost.Pipeline -- scheduling of segments
From: christophe.j.henry_at_[hidden]
Date: 2014-07-08 17:00:20


>Maybe I'm making it myself too easy but I'd see every pipeline stage as a
>scheduler, say, for Asynchronous a stealing threadpool scheduler(with one
>or more threads), every stage getting a job transforming input data and
>posting to the queue of the next scheduler a functor doing the next stage
>transformation, etc.
>Then I'd create a composite in one line of code to make sure work-stealing
>happens and that would be it for the infrastructure.

Purely for fun, it took me a few minutes to write such a pipeline: first a
simple version using a thread for every stage, then one with work stealing.
There is a ton of stuff to improve (for example, strings should be moved),
but I hope you get the idea.
Now one "just" needs to write the syntactic sugar to have beautiful
pipelines.

Cheers,
Christophe

The simple version is:

#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>

#include <boost/asynchronous/scheduler/threadpool_scheduler.hpp>
#include <boost/asynchronous/queue/lockfree_queue.hpp>
#include <boost/asynchronous/scheduler_shared_proxy.hpp>
#include <boost/asynchronous/post.hpp>
#include <boost/algorithm/string/trim.hpp>

using namespace std;

struct pipeline
{
    void process(std::vector<std::string> const& input)
    {
        // create a scheduler for every stage, use only one thread
        auto scheduler1 =
boost::asynchronous::create_shared_scheduler_proxy(
                            new boost::asynchronous::threadpool_scheduler<
                                    boost::asynchronous::lockfree_queue<>
>(1));

        auto scheduler2 =
boost::asynchronous::create_shared_scheduler_proxy(
                            new boost::asynchronous::threadpool_scheduler<
                                    boost::asynchronous::lockfree_queue<>
>(1));

        auto scheduler3 =
boost::asynchronous::create_shared_scheduler_proxy(
                            new boost::asynchronous::threadpool_scheduler<
                                    boost::asynchronous::lockfree_queue<>
>(1));

        // job for first stage
        auto grep = [scheduler2,scheduler3](std::string const&
re,std::string const& item)
        {
            std::regex regex(re);
            if (std::regex_match(item, regex))
            {
                // job for second stage
                auto trim = [scheduler3](std::string const& item)
                {
                    std::string tc(boost::algorithm::trim_copy(item));
                    // 3rd stage job, cout
                    boost::asynchronous::post_future(scheduler3,
[tc](){std::cout << "->" << tc << endl;});
                };
                auto trim_ = std::bind(trim, std::move(item));
                boost::asynchronous::post_future(scheduler2, trim_);
            }
        };

        for(auto s : input)
        {
            auto grep_error = std::bind(grep, "Error.*", std::move(s));
            boost::asynchronous::post_future(scheduler1, grep_error);
        }
    }
};

int main()
{
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
      };

    pipeline p;
    p.process(input);

    // we are going to shutdown, schedulers will all block until completely
done
    return 0;
}

And the stealing version is:

#include <iostream>
#include <string>
#include <vector>
#include <regex>
#include <functional>

#include <boost/asynchronous/scheduler/stealing_multiqueue_threadpool_scheduler.hpp>
#include <boost/asynchronous/scheduler/composite_threadpool_scheduler.hpp>
#include <boost/asynchronous/queue/lockfree_queue.hpp>
#include <boost/asynchronous/scheduler_shared_proxy.hpp>
#include <boost/asynchronous/post.hpp>
#include <boost/algorithm/string/trim.hpp>

using namespace std;

struct pipeline
{
    pipeline()
    {
        auto scheduler1 =
boost::asynchronous::create_shared_scheduler_proxy(
                            new
boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                                    boost::asynchronous::lockfree_queue<>
>(1));

        auto scheduler2 =
boost::asynchronous::create_shared_scheduler_proxy(
                            new
boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                                    boost::asynchronous::lockfree_queue<>
>(1));

        auto scheduler3 =
boost::asynchronous::create_shared_scheduler_proxy(
                            new
boost::asynchronous::stealing_multiqueue_threadpool_scheduler<
                                    boost::asynchronous::lockfree_queue<>
>(1));

        // composite pool made of the previous pools
        // keeping it alive will ensure automatic work-stealing between
pools.
        m_composite = boost::asynchronous::create_shared_scheduler_proxy(
                            new
boost::asynchronous::composite_threadpool_scheduler<>
(scheduler1,scheduler2,scheduler3));
    }
    void process(std::vector<std::string> const& input)
    {
        auto grep = [this](std::string const& re,std::string const& item)
        {
            std::regex regex(re);
            if (std::regex_match(item, regex))
            {
                auto trim = [this](std::string const& item)
                {
                    std::string tc(boost::algorithm::trim_copy(item));
                    // post to third pool of composite
                    boost::asynchronous::post_future(m_composite,
[tc](){std::cout << "->" << tc << endl;},"",3);
                };
                auto trim_ = std::bind(trim, std::move(item));
                // post to second pool of composite
                boost::asynchronous::post_future(m_composite, trim_,"",2);
            }
        };

        for(auto s : input)
        {
            auto grep_error = std::bind(grep, "Error.*", std::move(s));
            // post to first pool of composite
            boost::asynchronous::post_future(m_composite, grep_error,"",1);
        }
    }

private:
    boost::asynchronous::any_shared_scheduler_proxy<> m_composite;
};

int main()
{
    std::vector<std::string> input = {
        "Error: foobar",
        "Error. foo",
        " Warning: barbaz",
        "Notice: qux",
        "\tError: abc"
      };

    pipeline p;
    p.process(input);

    // we are going to shutdown, schedulers will all block until completely
done
    return 0;
}


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk