Subject: Re: [Boost-users] [mapreduce] Prim Calculator
From: Craig Henderson (cdm.henderson_at_[hidden])
Date: 2009-08-21 15:52:46
> -----Original Message-----
> From: boost-users-bounces_at_[hidden] [mailto:boost-users-
> bounces_at_[hidden]] On Behalf Of Christian Henning
> Sent: 21 August 2009 15:05
> To: boost-users_at_[hidden]
> Subject: Re: [Boost-users] [mapreduce] Prim Calculator
>
> Hi Craig, what you're suggesting is exactly what I do. For reasons I
> don't understand I'm getting the following assertion:
>
> Running Parallel Prime_Calculator MapReduce...Assertion failed:
> map_key != typename map_task_type::key_type(), file
> c:\chh\boost\boost\mapreduce\job.hpp, line 210
>
> I don't understand why that happens? Can you help me out here.
>
Hi Christian
The assertion is an incorrect error condition: it meant that the key could
not be a default-constructed value (zero, in the case of your numeric key).
I have updated the sandbox - thanks for finding this. Get the latest versions
of job.hpp & in_memory.hpp from the sandbox.
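To illustrate why the old check fired for this job: a default-constructed std::size_t is zero, so a perfectly valid first key of 0 compares equal to key_type(). The following is a minimal sketch that does not use the library; the names equals_default_key and demonstrate_zero_key_clash are made up for illustration.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper with the same shape as the old check:
// returns true when 'key' compares equal to a default-constructed Key.
template< typename Key >
bool equals_default_key( const Key& key )
{
    return key == Key();
}

// Shows the clash: a valid key of 0 is indistinguishable from "no key".
inline void demonstrate_zero_key_clash()
{
    assert(  equals_default_key< std::size_t >( 0 ) );  // key 0 looks unset
    assert( !equals_default_key< std::size_t >( 1 ) );  // any other key is fine
}
```

A datasource that starts at 0, as yours does, therefore hits the condition on its very first key.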
There are a couple of other problems in your implementation, too.
* The number_source constructor should initialize _current to start, not
  zero.
* Using a global to hold prime_numbers isn't thread-safe, and adding locking
  would introduce contention. The MapReduce library provides mechanisms to
  avoid this. In reduce_task::reduce, replace the line
      copy( it, ite, back_inserter( prime_numbers ));
  with
      for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0));
  which will emit the final results.
To display the results:
    for (prime_calculator::job::const_result_iterator
             it=job.begin_results(); it!=job.end_results(); ++it)
        std::cout << it->first << " ";
For performance, the datasource should return a range of integers so that
each map task tests many candidate numbers rather than one at a time. This
will reduce lock contention in retrieving map keys.
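The idea of handing out ranges can be sketched independently of the library. The class below is hypothetical (range_dispenser is not part of MapReduce, and it does not implement the datasource interface); it assumes C++11 std::mutex, where boost::mutex would serve the same purpose. Each call returns a half-open range [first, second), so the lock is taken once per chunk rather than once per number.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>

// Hypothetical sketch: hands out half-open ranges of candidate numbers.
class range_dispenser
{
public:
    range_dispenser( std::size_t start, std::size_t end, std::size_t chunk )
        : _end( end ), _chunk( chunk ), _current( start )
    {
        assert( chunk > 0 );   // a zero-sized chunk would never make progress
    }

    // Fills 'range' and returns true while numbers remain; returns false
    // once the whole interval has been handed out.
    bool next( std::pair< std::size_t, std::size_t >& range )
    {
        std::lock_guard< std::mutex > lock( _mutex );
        if( _current >= _end )
            return false;
        range.first  = _current;
        range.second = _current + std::min( _chunk, _end - _current );
        _current     = range.second;
        return true;
    }

private:
    std::mutex  _mutex;
    std::size_t _end;
    std::size_t _chunk;
    std::size_t _current;
};
```

With the interval 0 to 1000 and a chunk of 100, ten calls hand out all the work, compared with a thousand key requests when each task receives a single number.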
Finally, 0 & 1 are not prime numbers - this is a bug in your is_prime
function.
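One possible corrected test is sketched below. The name is_prime_checked and the bool return type are choices made for illustration only; the attached code returns std::size_t because the result is used as the intermediate key. The loop condition i <= number / i is an assumption-free replacement for the square root and avoids overflow in i * i.

```cpp
#include <cassert>
#include <cstddef>

// Sketch of a primality test that rejects 0 and 1.
inline bool is_prime_checked( std::size_t number )
{
    if( number < 2 )
        return false;                               // 0 and 1 are not prime
    for( std::size_t i = 2; i <= number / i; ++i )  // i.e. while i * i <= number
    {
        if( number % i == 0 )
            return false;                           // found a divisor
    }
    return true;
}

// Small self-check covering the two cases mentioned above.
inline void check_small_cases()
{
    assert( !is_prime_checked( 0 ) );
    assert( !is_prime_checked( 1 ) );
    assert(  is_prime_checked( 2 ) );
}
```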
Regards
-- Craig
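#include <algorithm>
#include <cmath>
#include <cstdlib>    // std::abs( long )
#include <iostream>   // std::cout
#include <numeric>
#include <utility>    // std::pair
#include <boost/bind.hpp>
#include <boost/mapreduce.hpp>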
namespace prime_calculator
{
std::size_t is_prime( std::size_t number ) {
    if( number < 2 )
        return 0;   // 0 and 1 are not prime

    long n = static_cast<long>( number );
    n = std::abs( n );
    std::size_t sqrt_number = static_cast< std::size_t >(
        std::sqrt( static_cast< double >( n )));
    for( std::size_t i = 2; i <= sqrt_number; i++ )
    {
        if( n % i == 0 )
            return 0;
    }
    return 1;
}
template< typename MapTask >
class number_source : boost::noncopyable {
public:
    number_source( std::size_t start
                 , std::size_t end
                 )
    : _start( start )
    , _end ( end )
    , _current( start ) // CH
    {}

    const bool setup_key( typename MapTask::key_type& key ) const
    {
        if( _current < _end )
        {
            key = _current;
            return true;
        }
        else
        {
            return false;
        }
    }

    const bool get_data( typename MapTask::key_type& key
                       , typename MapTask::value_type& value
                       )
    {
        if( _current < _end )
        {
            value.first = _current;
            value.second = _current;
            _current++;
            return true;
        }
        else
        {
            return false;
        }
    }

private:
    std::size_t _start;
    std::size_t _end;
    std::size_t _current;
};
struct map_task : public boost::mapreduce::map_task<
                      std::size_t                  // MapKey
                    , std::pair< std::size_t
                               , std::size_t
                               >                   // MapValue
                    > {
    template<typename Runtime>
    static void map( Runtime& runtime
                   , const std::size_t& /*key*/
                   , value_type& value
                   )
    {
        runtime.emit_intermediate( is_prime( value.first ), value.first );
    }
};
//std::vector< std::size_t > prime_numbers; // CH
struct reduce_task : public boost::mapreduce::reduce_task< std::size_t
                                                          , unsigned
                                                          > {
    template< typename Runtime
            , typename It
            >
    static void reduce( Runtime& runtime
                      , const std::size_t& key
                      , It it
                      , const It ite
                      )
    {
        if( key > 0 )
        {
            // copy( it, ite, back_inserter( prime_numbers )); // CH
            for_each(it, ite, boost::bind(&Runtime::emit, &runtime, _1, 0)); // CH
        }
    }
};
typedef
boost::mapreduce::job< prime_calculator::map_task
                     , prime_calculator::reduce_task
                     , boost::mapreduce::null_combiner
                     , prime_calculator::number_source< prime_calculator::map_task >
                     > job;
} // namespace prime_calculator
int main(int argc, char* argv[])
{
    boost::mapreduce::specification spec;
    boost::mapreduce::results result;
    prime_calculator::job::datasource_type datasource( 0, 1000 );
    spec.map_tasks = 0;
    spec.reduce_tasks = std::max( 1U, boost::thread::hardware_concurrency() );
    std::cout << "\nRunning Parallel Prime_Calculator MapReduce...";
    prime_calculator::job job( datasource, spec );
    job.run< boost::mapreduce::schedule_policy::cpu_parallel<prime_calculator::job> >(
        result );
    std::cout << "\nMapReduce Finished.";
    for (prime_calculator::job::const_result_iterator
             it=job.begin_results(); it!=job.end_results(); ++it)
        std::cout << it->first << " ";
    return 0;
}