Boost logo

Boost :

From: Yuval Ronen (ronen_yuval_at_[hidden])
Date: 2007-05-06 12:31:29


Here are a few classes which I think are very useful:

1. A thread_pool class. I think there's not much I can tell you that you
don't know about it. It's something that has threads, and can accept
tasks to be pushed into a queue, and executed in turn when a thread is
available. It can also be suspended (stop handling tasks, but keep the
threads alive) and resumed.

2. Two 'dispatcher' classes, one for plain threads, and one for
thread_pools. They accept a single task, or several tasks, to be
executed, either by dedicated threads (the thread_dispatcher class), or
by a thread_pool (the thread_pool_dispatcher class). These classes
provide a wait() method that block until all tasks are done.

IMO, these classes fill a hole in the Boost.Thread library, and can be a
basic for the future<> template (I have to admit I didn't follow the
future<> discussion, so I hope there's nothing here to contradict any
consensus reached in that discussion, if any).

A few notes:

1. The semahpore class is there because it was needed for the
thread_pool_dispather, but I think it will be a good general purpose
addition to Boost.Thread. If I'm the only one thinking so, it can be
moved as implementation detail of thread_pool_dispather.

2. The thread_task typedef should really be removed. Instead, a 'task'
typedef should be added to boost::thread. So all occurances of
'thread_task' should be replaced with 'thread::task'.

3. I'm not sure about the name 'dispatcher', but couldn't find anything
I'm happy with. Something that catches the essence of the classes. Any
suggestions are welcome.

4. The thread_pool class can benefit from two additional features:

4.A. add_threads(size_t) and remove_threads(size_t). I think I can add
those if found useful.

4.B. pre_loop and post_loop hooks to be executed by each of the worker
threads in the thread_pool, immediately after creation, and before
termination. I actually have a use case for those, but I don't want to
distract the discussion from the main things.

5. The asserts in thread_pool::~thread_pool, thread_pool::suspend,
thread_pool::resume, thread_dispatcher::~thread_dispatcher,
thread_pool_dispatcher::~thread_pool_dispatcher are supposed to catch
user errors. These asserts are questionable. I'm not sure whether those
use cases are indeed errors, and if so, should assert be the right way
to deal with them. I chose to be a hard-liner for now, with an option to
soften.

6. The other asserts are there to catch errors in my code.

7. Some or maybe all of the asserts should be replaced by BOOST_ASSERT.
I'm not sure of the Boost convention here.

I hope it will be found useful!


#ifndef BOOST_THREAD_SEMAPHORE_HPP
#define BOOST_THREAD_SEMAPHORE_HPP

#include <cstdlib>
#include <cassert>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>

namespace boost {

class semaphore
{
public:
        semaphore(std::size_t count) : m_count(count), m_mutex(), m_condition() { }

        void lock()
        {
                mutex::scoped_lock sl(m_mutex);
                while (m_count == 0)
                        m_condition.wait(sl);
                assert(m_count > 0);
                m_count--;
        }

        void unlock()
        {
                mutex::scoped_lock sl(m_mutex);
                m_count++;
                m_condition.notify_one();
        }

private:
        std::size_t m_count;
        mutex m_mutex;
        condition m_condition;
};

} // namespace boost

#endif


#ifndef BOOST_THREAD_DISPATCHER_HPP
#define BOOST_THREAD_DISPATCHER_HPP

#include <cassert>
#include <set>
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/thread_task.hpp>

namespace boost {

class thread_dispatcher : private noncopyable
{
public:
        thread_dispatcher(const thread_task &t) :
                m_threads()
        {
                create_thread(t);
        }

        template <typename FwdIter>
        thread_dispatcher(FwdIter task_begin, FwdIter task_end) :
                m_threads()
        {
                std::for_each(task_begin, task_end, bind(&thread_dispatcher::create_thread, this, _1));
        }

        ~thread_dispatcher()
        {
                assert(m_threads.empty());
        }

        void wait()
        {
                std::for_each(m_threads.begin(), m_threads.end(), bind(&thread::join, _1));
                m_threads.clear();
        }

private:
        std::set< shared_ptr<thread> > m_threads;

        void create_thread(const thread_task &t)
        {
                m_threads.insert(shared_ptr<thread>(new thread(t)));
        }
};

} // namespace boost

#endif


#ifndef BOOST_THREAD_POOL_HPP
#define BOOST_THREAD_POOL_HPP

#include <cstdlib>
#include <cassert>
#include <set>
#include <deque>
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/thread_task.hpp>

namespace boost {

class thread_pool : private noncopyable
{
        enum state { active_, suspended_, terminating_ };

public:
        typedef function<void ()> task;

        thread_pool(std::size_t thread_count) :
                m_mutex(),
                m_condition(),
                m_threads(),
                m_tasks(),
                m_state(active_)
        {
                thread_task f = bind(&thread_pool::thread_func, this);
                while (thread_count > 0)
                {
                        m_threads.insert(shared_ptr<thread>(new thread(f)));
                        thread_count--;
                }
        }

        ~thread_pool()
        {
                {
                        mutex::scoped_lock sl(m_mutex);
                        assert(m_state == active_);
                        m_state = terminating_;
                        m_condition.notify_all();
                }
                std::for_each(m_threads.begin(), m_threads.end(), bind(&thread::join, _1));
        }

        bool suspended() const
        {
                mutex::scoped_lock sl(m_mutex);
                assert(m_state == active_ || m_state == suspended_);
                return m_state == suspended_;
        }

        void suspend()
        {
                mutex::scoped_lock sl(m_mutex);
                assert(m_state == active_);
                m_state = suspended_;
        }

        void resume()
        {
                mutex::scoped_lock sl(m_mutex);
                assert(m_state == suspended_);
                m_state = active_;
                m_condition.notify_all();
        }

        void enqueue(const task &t)
        {
                mutex::scoped_lock sl(m_mutex);
                assert(m_state == active_ || m_state == suspended_);
                m_tasks.push_back(t);
                m_condition.notify_one();
        }

        void discard_pending_tasks()
        {
                mutex::scoped_lock sl(m_mutex);
                assert(m_state == active_ || m_state == suspended_);
                m_tasks.clear();
        }

private:
        mutable mutex m_mutex;
        condition m_condition;
        std::set< shared_ptr<thread> > m_threads;
        std::deque< task > m_tasks;
        state m_state;

        void thread_func()
        {
                task t;
                while (t = get_task(), t)
                {
                        t();
                        t.clear();
                }
        }

        task get_task()
        {
                mutex::scoped_lock sl(m_mutex);
                while (m_state == suspended_ || m_state == active_ && m_tasks.empty())
                        m_condition.wait(sl);
                assert(m_state != suspended_);
                if (!m_tasks.empty())
                {
                        task t = m_tasks.front();
                        m_tasks.pop_front();
                        return t;
                }
                assert(m_state == terminating_ && m_tasks.empty());
                return task();
        }
};

} // namespace boost

#endif


#ifndef BOOST_THREAD_POOL_DISPATCHER_HPP
#define BOOST_THREAD_POOL_DISPATCHER_HPP

#include <cstdlib>
#include <cassert>
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <boost/bind.hpp>
#include <boost/thread/semaphore.hpp>
#include <boost/thread/thread_pool.hpp>

namespace boost {

class thread_pool_dispatcher : private noncopyable
{
public:
        thread_pool_dispatcher(const thread_pool::task &t, thread_pool &pool) :
                m_task_count(0), m_sem(0)
        {
                enqueue(t, pool);
        }

        template <typename FwdIter>
        thread_pool_dispatcher(FwdIter task_begin, FwdIter task_end, thread_pool &pool) :
                m_task_count(0), m_sem(0)
        {
                std::for_each(task_begin, task_end, bind(&thread_pool_dispatcher::enqueue, this, _1, ref(pool)));
        }

        ~thread_pool_dispatcher()
        {
                assert(m_task_count == 0);
        }

        void wait()
        {
                while (m_task_count > 0)
                {
                        m_sem.lock();
                        m_task_count--;
                }
        }

private:
        std::size_t m_task_count;
        semaphore m_sem;

        void enqueue(const thread_pool::task &t, thread_pool &pool)
        {
                m_task_count++;
                pool.enqueue(bind(&thread_pool_dispatcher::execute_and_notify, this, t));
        }

        void execute_and_notify(const thread_pool::task &t)
        {
                t();
                m_sem.unlock();
        }
};

} // namespace boost

#endif


#ifndef BOOST_THREAD_TASK_HPP
#define BOOST_THREAD_TASK_HPP

#include <boost/function.hpp>

namespace boost {

typedef function<void ()> thread_task;

} // namespace boost

#endif


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk