Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55452 - sandbox/libs/mapreduce/examples/wordcount
From: cdm.henderson_at_[hidden]
Date: 2009-08-07 17:11:29


Author: chenderson
Date: 2009-08-07 17:11:28 EDT (Fri, 07 Aug 2009)
New Revision: 55452
URL: http://svn.boost.org/trac/boost/changeset/55452

Log:
Added output of Top 10 most popular words
Text files modified:
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp | 89 ++++++++++++++++++++++++++-------------
   1 file changed, 60 insertions(+), 29 deletions(-)

Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp (original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp 2009-08-07 17:11:28 EDT (Fri, 07 Aug 2009)
@@ -147,6 +147,7 @@
     std::cout << "\n" << typeid(wordcount::job).name() << "\n";
 
     boost::mapreduce::results result;
+ wordcount::job::datasource_type datasource(spec);
     try
     {
         if (argc > 2)
@@ -157,40 +158,70 @@
         else
             spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
 
- std::cout << "\nRunning CPU Parallel MapReduce...";
- boost::mapreduce::run<wordcount::job>(spec, result);
- std::cout << "\nCPU Parallel MapReduce Finished.";
+ std::cout << "\nRunning Parallel WordCount MapReduce...";
+ wordcount::job job(datasource, spec);
+ job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
+ std::cout << "\nMapReduce Finished.";
+
+ std::cout << std::endl << "\nMapReduce statistics:";
+ std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
+ std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
+ std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
+ std::cout << "\n\n Map:";
+ std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
+ std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
+ std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
+ std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
+ std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+
+ std::cout << "\n\n Reduce:";
+ std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
+ std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
+ std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
+ std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+ std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
+ if (result.reduce_times.size() > 0)
+ {
+ std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ }
+
+ wordcount::job::const_result_iterator it = job.begin_results();
+ wordcount::job::const_result_iterator ite = job.end_results();
+ if (it != ite)
+ {
+ typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
+ frequencies_t frequencies;
+ frequencies.push_back(*it);
+ frequencies_t::reverse_iterator it_smallest = frequencies.rbegin();
+ for (++it; it!=ite; ++it)
+ {
+ if (frequencies.size() < 10) // show top 10
+ {
+ frequencies.push_back(*it);
+ if (it->second < it_smallest->second)
+ it_smallest = frequencies.rbegin();
+ }
+ else if (it->second > it_smallest->second)
+ {
+ *it_smallest = *it;
+ it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
+ }
+ }
+
+ frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
+ std::cout << "\n\nMapReduce results:";
+ for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
+ std::cout << "\n" << freq->first << "\t" << freq->second;
+ }
     }
     catch (std::exception &e)
     {
         std::cout << std::endl << "Error: " << e.what();
     }
 
- std::cout << std::endl << "\nMapReduce statistics:";
- std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
- std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
- std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
- std::cout << "\n\n Map:";
- std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
- std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
- std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
- std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
- std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
-
- std::cout << "\n\n Reduce:";
- std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
- std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
- std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
- std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
- std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
- if (result.reduce_times.size() > 0)
- {
- std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
- }
-
     return 0;
 }


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk