Subject: Re: [boost] Proposal: MapReduce library (single machine)
From: Craig Henderson (cdm.henderson_at_[hidden])
Date: 2009-06-17 15:34:30


> Joel Falcou wrote:
> how do you generate the workload for each processor?

The current implementation is rudimentary: it starts n map tasks, each on
its own thread (at most one per CPU), and each map task is fed data to
process. It does not set thread affinity and does not use any knowledge of
the data, such as its size or locality (to reduce disk access contention,
for example). The code for the CPU Parallel scheduler is below. Scheduling
is a particular interest of mine and will be the basis of a lot of future
work. This is just the start.

// Map Tasks
time_t start_time = time(NULL);
boost::thread_group map_threads;
unsigned const map_tasks =
    (spec.map_tasks == 0) ? num_cpus : std::min(num_cpus, spec.map_tasks);
for (unsigned loop=0; loop<map_tasks; ++loop)
{
    boost::shared_ptr<results> this_result(new results);
    all_results.push_back(this_result);

    boost::thread *thread = new boost::thread(
        detail::run_next_map_task,
        boost::ref(job), boost::ref(*this_result), boost::ref(m));
    map_threads.add_thread(thread);
}
map_threads.join_all();
result.map_runtime = time(NULL) - start_time;

// Reduce Tasks
start_time = time(NULL);
boost::thread_group reduce_threads;
unsigned const reduce_tasks =
    std::min<unsigned>(num_cpus, job.number_of_partitions());
unsigned partition = 0;
for (unsigned loop=0; loop<reduce_tasks; ++loop)
{
    boost::shared_ptr<results> this_result(new results);
    all_results.push_back(this_result);

    boost::thread *thread = new boost::thread(
        detail::run_next_reduce_task,
        boost::ref(job), boost::ref(partition), boost::ref(*this_result),
        boost::ref(m));
    reduce_threads.add_thread(thread);
}
reduce_threads.join_all();
result.reduce_runtime = time(NULL) - start_time;
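
To illustrate the pull model described above, here is a minimal,
self-contained sketch in which each worker thread repeatedly takes the next
item from a shared queue until none are left. It is not the library's code:
work_queue, run_worker and process_all are hypothetical names, std::thread
stands in for boost::thread, and it assumes that detail::run_next_map_task
(not shown in this post) behaves roughly like run_worker.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the job: hands out input items one at a time.
class work_queue {
public:
    explicit work_queue(std::vector<int> items) : items_(std::move(items)) {}

    // Copies the next unprocessed item into 'out'.
    // Returns false once every item has been handed out.
    bool next(int &out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (next_ == items_.size())
            return false;
        out = items_[next_++];
        return true;
    }

private:
    std::mutex mutex_;
    std::vector<int> items_;
    std::size_t next_ = 0;
};

// Assumed shape of a "run next task" function: keep pulling work until the
// queue is empty, accumulating into this worker's own result slot.
void run_worker(work_queue &queue, long &result) {
    int item;
    while (queue.next(item))
        result += item;  // placeholder for the real map function
}

// Starts the workers, waits for them, and combines the per-worker results.
// requested_workers == 0 means "one worker per hardware thread".
long process_all(std::vector<int> items, unsigned requested_workers) {
    unsigned const hw = std::max(1u, std::thread::hardware_concurrency());
    unsigned const workers =
        (requested_workers == 0) ? hw : std::min(hw, requested_workers);
    assert(workers > 0);

    work_queue queue(std::move(items));
    std::vector<long> results(workers, 0);  // one slot per worker, never resized
    std::vector<std::thread> threads;
    for (unsigned i = 0; i < workers; ++i)
        threads.emplace_back(run_worker, std::ref(queue), std::ref(results[i]));
    for (std::thread &t : threads)
        t.join();

    long total = 0;
    for (long r : results)
        total += r;
    return total;
}
```

Calling process_all(items, 0) uses one worker per hardware thread, which
mirrors the spec.map_tasks==0 case in the scheduler above. Because workers
pull items on demand, a slow item delays only the thread that took it.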

-- Craig

