Boost logo

Boost Users :

Subject: Re: [Boost-users] [mapreduce] Prim Calculator
From: Craig Henderson (cdm.henderson_at_[hidden])
Date: 2009-08-21 15:52:46


> -----Original Message-----
> From: boost-users-bounces_at_[hidden] [mailto:boost-users-
> bounces_at_[hidden]] On Behalf Of Christian Henning
> Sent: 21 August 2009 15:05
> To: boost-users_at_[hidden]
> Subject: Re: [Boost-users] [mapreduce] Prim Calculator
>
> Hi Craig, what you're suggesting is exactly what I do. For reasons I
> don't understand I'm getting the following assertion:
>
> Running Parallel Prime_Calculator MapReduce...Assertion failed:
> map_key != typename map_task_type::key_type(), file
> c:\chh\boost\boost\mapreduce\job.hpp, line 210
>
> I don't understand why that happens? Can you help me out here.
>

Hi Christian
The assertion is an incorrect error condition which meant that the key could
not be a default constructed valued (in the case of your numeric key, zero).
I have updated the sandbox - thanks for finding this. Get the latest version
of job.hpp & in_memory.hpp from the sandbox.

There are a couple other problems in your implementation, too.
* number_source constructor should initialize _current to be start, not
zero.
* using a global to hold prime_numbers isn't thread-safe, and adding locking
will introduce contention. The MapReduce library provides mechanisms to
avoid this. In reduce_task::reduce, replace the line
    copy( it, ite, back_inserter( prime_numbers ));
with
    for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0));
which will emit the final results.

To display the results:
    for (prime_calculator::job::const_result_iterator
it=job.begin_results(); it!=job.end_results(); ++it)
        std::cout << it->first << " ";

For performance, the datasource should return a range of integers so each
task tests many prime numbers rather than one at a time. This will reduce
lock contention in retrieving map keys.

Finally, 0 & 1 are not prime numbers - this is a bug in your is_prime
function.

Regards
-- Craig

#include <algorithm>
#include <cmath>
#include <numeric>

#include <boost/mapreduce.hpp>

namespace prime_calculator
{

std::size_t is_prime( std::size_t number ) {
    long n = static_cast<long>( number );

    if( number == 0 )
        return 1;

    n = std::abs( n );
    std::size_t sqrt_number = static_cast< std::size_t >( std::sqrt(
static_cast< double >( n )));

    for( std::size_t i = 2; i <= sqrt_number; i++ )
    {
        if( n % i == 0 )
            return 0;
    }

    return 1;
}

template< typename MapTask >
class number_source : boost::noncopyable {
public:

    number_source( std::size_t start
                 , std::size_t end
                 )
    : _start( start )
    , _end ( end )
    , _current( start ) // CH
    {}

    const bool setup_key( typename MapTask::key_type& key ) const
    {
        if( _current < _end )
        {
            key = _current;
            return true;
        }
        else
        {
            return false;
        }
    }

    const bool get_data( typename MapTask::key_type& key
                       , typename MapTask::value_type& value
                       )
    {
        if( _current < _end )
        {
            value.first = _current;
            value.second = _current;
            _current++;

            return true;
        }
        else
        {
            return false;
        }
    }

private:

    std::size_t _start;
    std::size_t _end;
    std::size_t _current;
};

struct map_task : public boost::mapreduce::map_task< std::size_t
     // MapKey
                                                   , std::pair< std::size_t
                                                              , std::size_t
>
     // MapValue
> {
    template<typename Runtime>
    static void map( Runtime& runtime
                   , const std::size_t& /*key*/
                   , value_type& value
                   )
    {
        runtime.emit_intermediate( is_prime( value.first ), value.first );
    }
};

//std::vector< std::size_t > prime_numbers; // CH

struct reduce_task : public boost::mapreduce::reduce_task< std::size_t
                                                         , unsigned
> {
    template< typename Runtime
            , typename It
>
    static void reduce( Runtime& runtime
                      , const std::size_t& key
                      , It it
                      , const It ite
                      )
    {
        if( key > 0 )
        {
// copy( it, ite, back_inserter( prime_numbers )); // CH
            for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0));
// CH
        }
    }
};

typedef
boost::mapreduce::job< prime_calculator::map_task
                     , prime_calculator::reduce_task
                     , boost::mapreduce::null_combiner
                     , prime_calculator::number_source<
prime_calculator::map_task >
> job;

} // namespace prime_calculator

int main(int argc, char* argv[])
{
    boost::mapreduce::specification spec;

    boost::mapreduce::results result;
    prime_calculator::job::datasource_type datasource( 0, 1000 );

    spec.map_tasks = 0;
    spec.reduce_tasks = std::max( 1U, boost::thread::hardware_concurrency()
);

    std::cout << "\nRunning Parallel Prime_Calculator MapReduce...";
    prime_calculator::job job( datasource, spec );
    job.run<
boost::mapreduce::schedule_policy::cpu_parallel<prime_calculator::job> >(
result );
    std::cout << "\nMapReduce Finished.";

    for (prime_calculator::job::const_result_iterator
it=job.begin_results(); it!=job.end_results(); ++it)
        std::cout << it->first << " ";

        return 0;
}


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net