Boost logo

Boost Users :

Subject: [Boost-users] Using boost::asio::io_service with boost::packaged_task
From: Kelvin Chung (kelvSYC_at_[hidden])
Date: 2011-12-05 20:32:51


I'm trying to create a thread pool / task queue like system via
boost::asio::io_service and boost::packaged_task rather than attempt to
make 800+ threads for 800+ tasks that I have to make. (I'm not sure of
the various concurrency issues that I might have, so reducing thread
creation overhead is probably one of my big refactorings in my project.)

A cursory glance from various sources reveals that I could do something
like the following:

template <class Ret>
class TaskWrapper {
        // Wrapper around a boost::shared_ptr<boost::packaged_task<Ret>>, as
boost::asio::io_service::post()
        // requires copyable input
        boost::shared_ptr<boost::packaged_task<Ret>> task;
        
        boost::unique_future<Ret> future;
public:
        TaskWrapper(boost::shared_ptr<boost::packaged_task<Ret>>& task_) :
task_(task) {}
        
        void operator()() { (*task)(); }
        
        boost::unique_future<Ret> get_future() { return future; }
};

class TaskQueue {
        boost::asio::io_service io_service;
        boost::asio::io_service work;
        boost::thread_group threads;
        
        void run() { io_service.run(); }
public:
        TaskQueue(std::size_t size = boost::thread::hardware_concurrency()) :
work(io_service) {
                for (std::size_t i = 0; i < size; ++i) {
                        // Create threads in pool
                        threads.create_thread(boost::bind(&TaskQueue::run, this));
                }
        }
        
        ~TaskQueue() {
                io_service.stop();
                threads.join_all();
        }
        
        template<class Ret>
        boost::unique_future<Ret> schedule(TaskWrapper<Ret>& task) {
                io_service.post(boost::bind(&TaskWrapper<Ret>::operator(),
boost::ref(task)));
                return task.get_future();
        }
};

There are a few questions that I'd like to ask with this:
* Would you recommend this?
* To retrieve the results, I'd still have to extract all the futures
(from TaskQueue::schedule() returns) into a list of
boost::shared_futures, and then call boost::wait_for_all() on the list,
right?
* If a TaskWrapper itself posts other tasks, would a priority inversion
or resource starvation issue occur (I keep thinking that it will, for
some reason). How should I address this?


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net