
AMDG gabe_rosser wrote:
Apologies for my ignorance but I would be extremely grateful if someone would kindly point me in the right direction.
I am trying to parallelize my Monte Carlo simulation. The bottleneck in the process is the generation of normally distributed random numbers (using the Boost.Random library), so I was hoping to substantially improve the efficiency of the process by setting 2 or 3 cores of my quad-core processor to the task of generating these numbers while one core gets on with the rest of the algorithm.
The simplest way I can envisage doing this is by allocating shared memory that is simultaneously written (by random-number-generating threads) and read (by the main thread), with some method of ensuring that the main thread doesn't read too far ahead (perhaps locking or just a simple while loop?).
Boost.Thread alone doesn't seem to be enough - an attempt to do exactly this resulted in a substantial slowdown (with or without locking; running without locks may result in undefined behaviour, but I tried it anyway to see whether locking was causing the slowdown). Do I need to investigate shared pointers? I really need some large container in shared memory which I can fill in any order, much as would be the case if I were appending to and reading from a file, but hopefully faster!
Any suggestions please? Sorry for the lack of understanding!
The synchronization is too fine-grained. Even without the overhead of locking, the cache is probably unhappy because of the false-sharing. The following is slightly faster for me using the multi threaded version. #include <boost/thread/mutex.hpp> #include <boost/thread/condition.hpp> #include <boost/spirit/home/phoenix/stl/container.hpp> #include <boost/spirit/home/phoenix/operator/logical.hpp> #include <boost/spirit/home/phoenix/core/reference.hpp> #include <boost/random.hpp> #include <boost/timer.hpp> #include <boost/noncopyable.hpp> #include <deque> template<class T> class queue_writer; template<class T> class simple_queue { public: simple_queue(T* buffer) : data(buffer), index(0), size_(0) {} void push_back(const T& arg) { data[size_++] = arg; } T& front() { return(data[index]); } void pop_front() { ++index; } bool empty() const { return(index == size_); } std::size_t size() const { return(size_ - index); } private: T* data; std::size_t index; std::size_t size_; }; template<class T> class queue_reader : boost::noncopyable { template<class T> friend class queue_writer; public: queue_reader() {} T pop() { if(local_data.empty()) { boost::unique_lock<boost::mutex> l(mutex); condition.wait(l, !boost::phoenix::empty(boost::phoenix::ref(shared_data))); std::swap(shared_data, local_data); } T result = local_data.front().front(); local_data.front().pop_front(); if(local_data.front().empty()) { local_data.pop_front(); } return(result); } private: // strong guarantee void push_impl(simple_queue<T>& arg) { if(!arg.empty()) { boost::unique_lock<boost::mutex> l(mutex); shared_data.push_back(arg); condition.notify_all(); } } boost::mutex mutex; boost::condition condition; std::deque<simple_queue<T> > shared_data; std::deque<simple_queue<T> > local_data; }; template<class T> class queue_writer { public: queue_writer(queue_reader<T>& q, std::size_t s) : data(new T[s]), index(0), impl(&q), block_size(s) {} void push(const T& arg) { data.push_back(arg); if(data.size() == 
block_size) { impl->push_impl(data); data = simple_queue<T>(new T[block_size]); } } ~queue_writer() { impl->push_impl(data); } private: simple_queue<T> data; std::size_t index; queue_reader<T>* impl; std::size_t block_size; }; static const int N = 100000000; void reader_thread(queue_reader<double>& q) { for(int i = 0; i < N; ++i) { q.pop(); } } void writer_thread(queue_reader<double>& qr, boost::mt19937& prng) { boost::variate_generator<boost::mt19937&, boost::exponential_distribution<> > gen(prng, boost::exponential_distribution<>()); queue_writer<double> q(qr, 1024); for(int i = 0; i < N / 2; ++i) { q.push(gen()); } } int main() { boost::timer timer; #if 0 boost::mt19937 prng; boost::variate_generator<boost::mt19937&, boost::exponential_distribution<> > gen(prng, boost::exponential_distribution<>()); for(int i = 0; i < N; ++i) { gen(); } #else boost::mt19937 prng1(57); boost::mt19937 prng2(38); queue_reader<double> queue; boost::thread t1(&writer_thread, boost::ref(queue), prng1); boost::thread t2(&writer_thread, boost::ref(queue), prng2); reader_thread(queue); t1.join(); t2.join(); #endif std::cout << timer.elapsed() << std::endl; } In Christ, Steven Watanabe