Boost logo

Boost Users :

Subject: Re: [Boost-users] [mapreduce] Prim Calculator
From: Christian Henning (chhenning_at_[hidden])
Date: 2009-08-20 17:46:01


Hi Craig, I did my best in translating the is_prime problem into a
map_reduce solution. The problem is simple: Find all prime numbers for
a given range of numbers, for instance, 0...1,000. Below is the source
code that I came up with. I cannot attach files from my work, so,
please excuse the bad formatting.

The map function takes a number and emits a key/value pair of (
size_t, number ). The size_t key basically states whether or not the
number is prime. I don't quite understand how to write the reduce
function. I can see that in your word count example you accumulate the
same words to calculate the count. What I want is to push back the
prime numbers into a vector that I can print out at the end of the
program.

#include <algorithm>
#include <cmath>
#include <cstddef>
#include <iostream>
#include <iterator>
#include <numeric>
#include <utility>
#include <vector>

#include <boost/mapreduce.hpp>

namespace prime_calculator
{

/// Trial-division primality test.
///
/// @param number  value to classify.
/// @return 1 if @p number is prime, 0 otherwise.
std::size_t is_prime( std::size_t number )
{
    // 0 and 1 are not prime by definition.
    if( number < 2 )
        return 0;

    // A composite number has a divisor no larger than its square root.
    // Testing "i <= number / i" expresses that bound in pure integer
    // arithmetic: no floating-point rounding and no overflow of i * i.
    for( std::size_t i = 2; i <= number / i; ++i )
    {
        if( number % i == 0 )
            return 0;
    }

    return 1;
}

template< typename MapTask >
class number_source : boost::noncopyable
{
public:

    number_source( std::size_t start
                 , std::size_t end
                 )
    : _start( start )
    , _end ( end )
    , _current( 0 )
    {}

    const bool setup_key( typename MapTask::key_type& key ) const
    {
        if( _current < _end )
        {
            key = _current;
            return true;
        }
        else
        {
            return false;
        }
    }

    const bool get_data( typename MapTask::key_type& key
                       , typename MapTask::value_type& value
                       )
    {
        if( _current < _end )
        {
            value.first = _current;
            value.second = _current;
            _current++;

            return true;
        }
        else
        {
            return false;
        }
    }

private:

    std::size_t _start;
    std::size_t _end;
    std::size_t _current;
};

struct map_task : public boost::mapreduce::map_task< std::size_t
     // MapKey
                                                   , std::pair< std::size_t
                                                              , std::size_t
>
     // MapValue
>
{
    template<typename Runtime>
    static void map( Runtime& runtime
                   , const std::size_t& /*key*/
                   , value_type& value
                   )
    {
        runtime.emit_intermediate( is_prime( value.first ), value.first );
    }
};

std::vector< std::size_t > prime_numbers;

struct reduce_task : public boost::mapreduce::reduce_task< std::size_t
                                                         , unsigned
>
{
    template< typename Runtime
            , typename It
>
    static void reduce( Runtime& runtime
                      , const std::size_t& key
                      , It it
                      , const It ite
                      )
    {
        if( key > 0 )
        {
            copy( it, ite, back_inserter( prime_numbers ));
        }
    }
};

// Complete job type: the map and reduce tasks defined in this namespace,
// no combiner, and number_source as the data source.
typedef boost::mapreduce::job<
    map_task,
    reduce_task,
    boost::mapreduce::null_combiner,
    number_source< map_task >
> job;

} // namespace prime_calculator

int _tmain(int argc, _TCHAR* argv[])
{
    boost::mapreduce::specification spec;

    boost::mapreduce::results result;
    prime_calculator::job::datasource_type datasource( 0, 1000 );

    spec.map_tasks = 0;
    spec.reduce_tasks = std::max( 1U, boost::thread::hardware_concurrency() );

    std::cout << "\nRunning Parallel Prime_Calculator MapReduce...";
    prime_calculator::job job( datasource, spec );
    job.run< boost::mapreduce::schedule_policy::cpu_parallel<prime_calculator::job
> >( result );
    std::cout << "\nMapReduce Finished.";

        return 0;
}

Thanks,
Christian


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net