
Subject: Re: [Boost-users] [Thread] (Newb) shared memory in multithread process
From: Steven Watanabe (watanabesj_at_[hidden])
Date: 2009-07-09 16:33:39


AMDG

gabe_rosser wrote:
> Apologies for my ignorance but I would be extremely grateful if someone
> would kindly point me in the right direction.
>
> I am trying to parallelize my Monte Carlo simulation. The bottleneck in the
> process is the generation of Normally distributed random numbers (using
> boost.random libraries), so I was hoping to substantially improve the
> efficiency of the process by setting 2 or 3 cores of my quad core processor
> to the task of generating these numbers while one core gets on with the
> rest of the algorithm.
>
> The simplest way I can envisage doing this is by allocating shared memory
> that is simultaneously written (by rand number generating threads) and read
> (by main thread), with some method of ensuring that the main thread doesn't
> read too far ahead (perhaps locking or just a simple while loop?).
>
> Boost.thread alone doesn't seem to be enough - an attempt to do exactly this
> resulted in a substantial slowdown (with or without locking - without may
> result in undefined behaviour but I tried it anyway to see if it was causing
> the slowdown). Do I need to investigate shared pointers? I really need
> some large container in shared memory which I can fill in any order, much as
> would be the case if I were appending to and reading from a file, but
> hopefully faster!
>
> Any suggestions please? Sorry for the lack of understanding!
>
>

The synchronization is too fine-grained. Even without the
overhead of locking, the cache is probably unhappy because of
false sharing: handing the values over one at a time means the
reader and the writers keep touching the same cache lines.
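
For contrast, here is roughly the kind of per-element handoff that
produces that slowdown (a minimal sketch of my own, not your actual
code): every push and pop takes the mutex and touches the same deque,
so the threads spend most of their time synchronizing and bouncing
cache lines between cores.

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <deque>

template<class T>
class naive_queue {
public:
    void push(const T& arg) {
        // one lock acquisition per element
        boost::unique_lock<boost::mutex> l(mutex);
        data.push_back(arg);
        condition.notify_one();
    }
    T pop() {
        // one lock acquisition per element
        boost::unique_lock<boost::mutex> l(mutex);
        while(data.empty()) {
            condition.wait(l);
        }
        T result = data.front();
        data.pop_front();
        return(result);
    }
private:
    boost::mutex mutex;
    boost::condition condition;
    std::deque<T> data;
};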

With the code below, the multithreaded version is slightly faster for me than the single-threaded baseline. The numbers are handed over in blocks of 1024, so the lock is taken only once per block rather than once per value.

#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/spirit/home/phoenix/stl/container.hpp>
#include <boost/spirit/home/phoenix/operator/logical.hpp>
#include <boost/spirit/home/phoenix/core/reference.hpp>
#include <boost/random.hpp>
#include <boost/timer.hpp>
#include <boost/noncopyable.hpp>
#include <boost/ref.hpp>
#include <deque>
#include <iostream>
#include <cstddef>

template<class T>
class queue_writer;

// A fixed-capacity block of values backed by a raw buffer. The queue
// does not free the buffer itself; the reader frees it once the block
// has been fully consumed.
template<class T>
class simple_queue {
public:
    simple_queue(T* buffer) : data(buffer), index(0), size_(0) {}
    void push_back(const T& arg) {
        data[size_++] = arg;
    }
    T& front() { return(data[index]); }
    void pop_front() { ++index; }
    bool empty() const { return(index == size_); }
    std::size_t size() const { return(size_ - index); }
    T* buffer() { return(data); }
private:
    T* data;
    std::size_t index;
    std::size_t size_;
};

// The consumer side. Blocks are taken from the shared deque in bulk
// under a single lock and then consumed without further synchronization.
template<class T>
class queue_reader : boost::noncopyable {
    template<class U>
    friend class queue_writer;
public:
    queue_reader() {}
    T pop() {
        if(local_data.empty()) {
            boost::unique_lock<boost::mutex> l(mutex);
            // wait (using a Phoenix lazy predicate) until the writers
            // have published at least one block, then grab all of them
            // with a single swap
            condition.wait(l,
                !boost::phoenix::empty(boost::phoenix::ref(shared_data)));
            std::swap(shared_data, local_data);
        }
        T result = local_data.front().front();
        local_data.front().pop_front();
        if(local_data.front().empty()) {
            // this block is exhausted; free its buffer and drop it
            delete [] local_data.front().buffer();
            local_data.pop_front();
        }
        return(result);
    }
private:
    // strong guarantee
    void push_impl(simple_queue<T>& arg) {
        if(!arg.empty()) {
            boost::unique_lock<boost::mutex> l(mutex);
            shared_data.push_back(arg);
            condition.notify_all();
        }
    }

    boost::mutex mutex;
    boost::condition condition;
    std::deque<simple_queue<T> > shared_data;

    std::deque<simple_queue<T> > local_data;
};

// The producer side. Values are buffered locally and handed to the
// reader one full block at a time, so the lock is taken only once per
// block_size elements.
template<class T>
class queue_writer {
public:
    queue_writer(queue_reader<T>& q, std::size_t s)
        : data(new T[s]), index(0), impl(&q), block_size(s) {}
    void push(const T& arg) {
        data.push_back(arg);
        if(data.size() == block_size) {
            // the block is full: publish it and start a fresh buffer
            impl->push_impl(data);
            data = simple_queue<T>(new T[block_size]);
        }
    }
    ~queue_writer() {
        // hand the final, possibly partial, block to the reader.
        // push_impl ignores an empty block, so free its buffer here.
        if(data.empty()) {
            delete [] data.buffer();
        } else {
            impl->push_impl(data);
        }
    }
private:
    simple_queue<T> data;
    std::size_t index;
    queue_reader<T>* impl;
    std::size_t block_size;
};

static const int N = 100000000;

// the consumer: pop all N values on the main thread
void reader_thread(queue_reader<double>& q) {
    for(int i = 0; i < N; ++i) {
        q.pop();
    }
}

// each writer produces half of the N values, handing them over
// in blocks of 1024
void writer_thread(queue_reader<double>& qr, boost::mt19937& prng) {
    boost::variate_generator<boost::mt19937&,
        boost::exponential_distribution<> >
        gen(prng, boost::exponential_distribution<>());
    queue_writer<double> q(qr, 1024);

    for(int i = 0; i < N / 2; ++i) {
        q.push(gen());
    }
}

int main() {
    boost::timer timer;
#if 0
    // single-threaded baseline: one generator produces all N numbers
    boost::mt19937 prng;
    boost::variate_generator<boost::mt19937&,
        boost::exponential_distribution<> >
        gen(prng, boost::exponential_distribution<>());
    for(int i = 0; i < N; ++i) {
        gen();
    }
#else
    // multithreaded version: two writer threads produce the numbers
    // while the main thread consumes them
    boost::mt19937 prng1(57);
    boost::mt19937 prng2(38);
    queue_reader<double> queue;
    boost::thread t1(&writer_thread, boost::ref(queue), prng1);
    boost::thread t2(&writer_thread, boost::ref(queue), prng2);
    reader_thread(queue);
    t1.join();
    t2.join();
#endif
    std::cout << timer.elapsed() << std::endl;
}
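
Since your original question was about normally distributed numbers,
only the distribution changes; a writer for that case would look like
this (the function name is just for illustration):

void normal_writer_thread(queue_reader<double>& qr, boost::mt19937& prng) {
    // same pattern as writer_thread above, but with a standard
    // normal distribution (mean 0, sigma 1)
    boost::variate_generator<boost::mt19937&,
        boost::normal_distribution<> >
        gen(prng, boost::normal_distribution<>(0.0, 1.0));
    queue_writer<double> q(qr, 1024);

    for(int i = 0; i < N / 2; ++i) {
        q.push(gen());
    }
}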

In Christ,
Steven Watanabe

