Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55510 - sandbox/libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-10 15:28:10


Author: chenderson
Date: 2009-08-10 15:28:09 EDT (Mon, 10 Aug 2009)
New Revision: 55510
URL: http://svn.boost.org/trac/boost/changeset/55510

Log:
Multi-test addition
Text files modified:
   sandbox/libs/mapreduce/test/mrtest.cpp | 271 +++++++++++++++++++++------------------
   1 files changed, 149 insertions(+), 122 deletions(-)

Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp (original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-08-10 15:28:09 EDT (Mon, 10 Aug 2009)
@@ -12,9 +12,6 @@
 
 // configuration options
 #define WORD_COUNT_MEMORY_MAP_FILE
-#define USE_WORDCOUNT_COMBINER
-#define USE_IN_MEMORY_INTERMEDIATES
-//#define WRITE_OUTPUT_FILES
 
 #if defined(_DEBUG)
 # if 0
@@ -161,33 +158,7 @@
     }
 }
 
-
-class combiner;
-
-typedef map_task<map_value_type> map_task_type;
-
-typedef
-boost::mapreduce::job<
- wordcount::map_task_type
- , wordcount::reduce_task
-#ifdef USE_WORDCOUNT_COMBINER
- , wordcount::combiner
-#else
- , boost::mapreduce::null_combiner
-#endif
- , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
-#ifdef USE_IN_MEMORY_INTERMEDIATES
- , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type, wordcount::reduce_task>
-#else
- , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
-#endif
-#if defined(USE_IN_MEMORY_INTERMEDIATES) && defined(WRITE_OUTPUT_FILES)
- , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task>
-#endif
-> job;
-
-
-
+template<typename ReduceTask>
 class combiner
 {
   public:
@@ -198,19 +169,19 @@
         intermediate_store.combine(instance);
     }
 
- void start(job::reduce_task_type::key_type const &)
+ void start(typename ReduceTask::key_type const &)
     {
         total_ = 0;
     }
 
     template<typename IntermediateStore>
- void finish(job::reduce_task_type::key_type const &key, IntermediateStore &intermediate_store)
+ void finish(typename ReduceTask::key_type const &key, IntermediateStore &intermediate_store)
     {
         if (total_ > 0)
             intermediate_store.insert(key, total_);
     }
 
- void operator()(job::reduce_task_type::value_type const &value)
+ void operator()(typename ReduceTask::value_type const &value)
     {
         total_ += value;
     }
@@ -222,9 +193,122 @@
     unsigned total_;
 };
 
+typedef map_task<map_value_type> map_task_type;
+
 } // namespace wordcount
 
 
+template<typename Job>
+void run_test(boost::mapreduce::specification spec)
+{
+ boost::mapreduce::results result;
+
+ std::cout << "\n" << typeid(Job).name() << "\n";
+
+ try
+ {
+#ifdef RUN_SEQUENTIAL_MAP_REDUCE
+ std::cout << "\nRunning Sequential MapReduce...";
+
+ spec.map_tasks = 1;
+ spec.reduce_tasks = 1;
+
+ Job job(datasource, spec);
+ job.run<boost::mapreduce::schedule_policy::sequential<Job> >(result);
+ std::cout << "\nSequential MapReduce Finished.";
+#else
+ std::cout << "\nRunning CPU Parallel MapReduce...";
+
+ // this method can be called, but since we want access to the result data,
+ // we need to have a job object to interrogate
+ //boost::mapreduce::run<Job>(spec, result);
+
+ typename Job::datasource_type datasource(spec);
+ Job job(datasource, spec);
+ job.run<boost::mapreduce::schedule_policy::cpu_parallel<Job> >(result);
+ std::cout << "\nCPU Parallel MapReduce Finished.\n";
+#endif
+
+ std::cout << "\nMapReduce statistics:";
+ std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
+ std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
+ std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
+ std::cout << "\n\n Map:";
+ std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
+ std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
+ std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
+ std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
+ std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
+
+ std::cout << "\n\n Reduce:";
+ std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
+ std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
+ std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
+ std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+ std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
+ if (result.reduce_times.size() > 0)
+ {
+ std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
+ }
+
+ typename Job::const_result_iterator it = job.begin_results();
+ typename Job::const_result_iterator ite = job.end_results();
+ if (it != ite)
+ {
+ typedef std::list<typename Job::keyvalue_t> frequencies_t;
+ frequencies_t frequencies;
+ frequencies.push_back(*it);
+ frequencies_t::reverse_iterator it_smallest = frequencies.rbegin();
+ for (++it; it!=ite; ++it)
+ {
+ if (frequencies.size() < 10) // show top 10
+ {
+ frequencies.push_back(*it);
+ if (it->second < it_smallest->second)
+ it_smallest = frequencies.rbegin();
+ }
+ else if (it->second > it_smallest->second)
+ {
+ *it_smallest = *it;
+ it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<typename Job::keyvalue_t>);
+ }
+ }
+
+ frequencies.sort(boost::mapreduce::detail::greater_2nd<typename Job::keyvalue_t>);
+ std::cout << "\n\nMapReduce results:";
+ for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
+ std::cout << "\n" << freq->first << "\t" << freq->second;
+ }
+ }
+ catch (std::exception &e)
+ {
+ std::cout << std::endl << "Error running MapReduce: " << e.what();
+ }
+}
+
+//typedef
+//boost::mapreduce::job<
+// wordcount::map_task_type
+// , wordcount::reduce_task
+//#ifdef USE_WORDCOUNT_COMBINER
+// , wordcount::combiner<wordcount::reduce_task>
+//#else
+// , boost::mapreduce::null_combiner
+//#endif
+// , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
+//#ifdef USE_IN_MEMORY_INTERMEDIATES
+// , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type, wordcount::reduce_task>
+//#else
+// , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
+//#endif
+//#if defined(USE_IN_MEMORY_INTERMEDIATES) && defined(WRITE_OUTPUT_FILES)
+// , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task>
+//#endif
+//> job;
 
 int main(int argc, char **argv)
 {
@@ -241,27 +325,8 @@
     }
 
     boost::mapreduce::specification spec;
- boost::mapreduce::results result;
 
     spec.input_directory = argv[1];
- wordcount::job::datasource_type datasource(spec);
-
- std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
- std::cout << "\n" << typeid(wordcount::job).name() << "\n";
-
-#ifdef RUN_SEQUENTIAL_MAP_REDUCE
- std::cout << "\nRunning Sequential MapReduce...";
-
- spec.map_tasks = 1;
- spec.reduce_tasks = 1;
-
- wordcount::job job(datasource, spec);
- try
- {
- job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
- std::cout << "\nSequential MapReduce Finished.";
-#else
- std::cout << "\nRunning CPU Parallel MapReduce...";
 
     if (argc > 2)
         spec.map_tasks = atoi(argv[2]);
@@ -271,76 +336,38 @@
     else
         spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
 
- // this method can be called, but since we want access to the result data,
- // we need to have a job object to interrogate
- //boost::mapreduce::run<wordcount::job>(spec, result);
-
- wordcount::job job(datasource, spec);
- try
- {
- job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
- std::cout << "\nCPU Parallel MapReduce Finished.\n";
-#endif
- }
- catch (std::exception &e)
- {
- std::cout << std::endl << "Error running MapReduce: " << e.what();
- }
-
- std::cout << "\nMapReduce statistics:";
- std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
- std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
- std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
- std::cout << "\n\n Map:";
- std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
- std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
- std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
- std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
- std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
-
- std::cout << "\n\n Reduce:";
- std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
- std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
- std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
- std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
- std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
- if (result.reduce_times.size() > 0)
- {
- std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
- }
-
- wordcount::job::const_result_iterator it = job.begin_results();
- wordcount::job::const_result_iterator ite = job.end_results();
- if (it != ite)
- {
- typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
- frequencies_t frequencies;
- frequencies.push_back(*it);
- frequencies_t::reverse_iterator it_smallest = frequencies.rbegin();
- for (++it; it!=ite; ++it)
- {
- if (frequencies.size() < 10) // show top 10
- {
- frequencies.push_back(*it);
- if (it->second < it_smallest->second)
- it_smallest = frequencies.rbegin();
- }
- else if (it->second > it_smallest->second)
- {
- *it_smallest = *it;
- it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
- }
- }
-
- frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
- std::cout << "\n\nMapReduce results:";
- for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
- std::cout << "\n" << freq->first << "\t" << freq->second;
- }
+ std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
+ run_test<
+ boost::mapreduce::job<
+ wordcount::map_task_type
+ , wordcount::reduce_task>
+ >(spec);
+
+ run_test<
+ boost::mapreduce::job<
+ wordcount::map_task_type
+ , wordcount::reduce_task
+ , wordcount::combiner<wordcount::reduce_task> >
+ >(spec);
+
+ run_test<
+ boost::mapreduce::job<
+ wordcount::map_task_type
+ , wordcount::reduce_task
+ , wordcount::combiner<wordcount::reduce_task>
+ , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
+ , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type, wordcount::reduce_task>
+ , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task> >
+ >(spec);
+
+ run_test<
+ boost::mapreduce::job<
+ wordcount::map_task_type
+ , wordcount::reduce_task
+ , wordcount::combiner<wordcount::reduce_task>
+ , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
+ , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task> >
+ >(spec);
 
     return 0;
 }


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk