Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55451 - in sandbox: boost boost/mapreduce boost/mapreduce/intermediates boost/mapreduce/schedule_policy libs/mapreduce/examples/wordcount libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-07 16:19:46


Author: chenderson
Date: 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
New Revision: 55451
URL: http://svn.boost.org/trac/boost/changeset/55451

Log:
Added iteration over results
Other minor changes and optimisations
Text files modified:
   sandbox/boost/mapreduce.hpp | 2
   sandbox/boost/mapreduce/datasource.hpp | 7 -
   sandbox/boost/mapreduce/intermediates.hpp | 2
   sandbox/boost/mapreduce/intermediates/in_memory.hpp | 134 ++++++++++++++++++++++++++++++++-
   sandbox/boost/mapreduce/intermediates/local_disk.hpp | 157 ++++++++++++++++++++++++++++++++++-----
   sandbox/boost/mapreduce/job.hpp | 53 ++++++++++--
   sandbox/boost/mapreduce/mergesort.hpp | 6 +
   sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp | 22 ++++-
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp | 47 ++++++-----
   sandbox/libs/mapreduce/test/mrtest.cpp | 122 ++++++++++++++++++++----------
   10 files changed, 439 insertions(+), 113 deletions(-)

Modified: sandbox/boost/mapreduce.hpp
==============================================================================
--- sandbox/boost/mapreduce.hpp (original)
+++ sandbox/boost/mapreduce.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -39,7 +39,7 @@
 
     specification()
       : map_tasks(0),
- reduce_tasks(10),
+ reduce_tasks(1),
         max_file_segment_size(1048576L), // default 1Gb
         output_filespec("mapreduce_")
     {

Modified: sandbox/boost/mapreduce/datasource.hpp
==============================================================================
--- sandbox/boost/mapreduce/datasource.hpp (original)
+++ sandbox/boost/mapreduce/datasource.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -126,16 +126,13 @@
     if (it->second->offset == it->second->size)
         data_->current_file.clear();
 
- ///!!! parameterise finding a word boundary
- char ch = std::toupper(*value.second);
- while ((ch == '\'' || (ch >= 'A' && ch <= 'Z')) && it->second->offset != it->second->size)
+ // break on a line boundary; test the bound first so *value.second is never read once offset reaches size
+ while (it->second->offset != it->second->size && *value.second != '\n' && *value.second != '\r')
     {
         ++value.second;
         ++it->second->offset;
- ch = std::toupper(*value.second);
     }
 
-//std::cout<<"\nget_data(): next offset will be " << it->second->offset;
     return true;
 }
 

Modified: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates.hpp (original)
+++ sandbox/boost/mapreduce/intermediates.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -11,5 +11,5 @@
 //
  
 #include <boost/mapreduce/hash_partitioner.hpp>
-#include <boost/mapreduce/intermediates/local_disk.hpp>
 #include <boost/mapreduce/intermediates/in_memory.hpp>
+#include <boost/mapreduce/intermediates/local_disk.hpp>

Modified: sandbox/boost/mapreduce/intermediates/in_memory.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/in_memory.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/in_memory.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -13,6 +13,8 @@
 #ifndef BOOST_MAPREDUCE_IN_MEMORY_INTERMEDIATES_HPP
 #define BOOST_MAPREDUCE_IN_MEMORY_INTERMEDIATES_HPP
 
+#include <boost/iterator/iterator_facade.hpp>
+
 namespace boost {
 
 namespace mapreduce {
@@ -23,6 +25,9 @@
 class reduce_null_output
 {
   public:
+ reduce_null_output()
+ { }
+
     reduce_null_output(std::string const &/*output_filespec*/,
                        unsigned const /*partition*/,
                        unsigned const /*num_partitions*/)
@@ -54,21 +59,131 @@
     typedef MapTask map_task_type;
     typedef ReduceTask reduce_task_type;
     typedef reduce_null_output<MapTask, ReduceTask> store_result_type;
+ typedef
+ std::pair<
+ typename reduce_task_type::key_type,
+ typename reduce_task_type::value_type>
+ keyvalue_t; // one (key, value) result as exposed by const_result_iterator
+
+ class const_result_iterator // read-only iterator merging the per-partition maps, yielding the smallest current element first
+ : public boost::iterator_facade<
+ const_result_iterator,
+ keyvalue_t const,
+ boost::forward_traversal_tag>
+ {
+ friend class boost::iterator_core_access;
+
+ protected:
+ explicit const_result_iterator(in_memory const *outer) // index_/value_ are not set here; begin() or end() must be called before use
+ : outer_(outer)
+ {
+ BOOST_ASSERT(outer_);
+ iterators_.resize(outer_->num_partitions_);
+ }
+
+ void increment(void) // advance the partition that supplied the current element, then re-select the smallest
+ {
+ if (iterators_[index_] != outer_->intermediates_[index_].end())
+ ++iterators_[index_];
+ set_current();
+ }
+
+ bool const equal(const_result_iterator const &other) const // NOTE(review): compares cached values only; an element equal to keyvalue_t() would compare equal to end()
+ {
+ return value_ == other.value_;
+ }
+
+ const_result_iterator &begin(void) // position every partition at its first element
+ {
+ for (unsigned loop=0; loop<outer_->num_partitions_; ++loop)
+ iterators_[loop] = outer_->intermediates_[loop].begin();
+ set_current();
+ return *this;
+ }
+
+ const_result_iterator &end(void) // sentinel state: no partition iterators, default-constructed value
+ {
+ index_ = 0;
+ value_ = keyvalue_t();
+ iterators_.clear();
+ return *this;
+ }
+
+ keyvalue_t const &dereference(void) const // the element cached by set_current()
+ {
+ return value_;
+ }
+
+ void set_current(void) // pick the non-exhausted partition whose current entry is smallest; become end() if all are exhausted
+ {
+ index_ = 0;
+ current_ = iterators_.begin();
+ for (; index_<outer_->num_partitions_ && iterators_[index_] == outer_->intermediates_[index_].end(); ++index_) // skip exhausted partitions
+ ++current_;
+
+ for (unsigned loop=index_+1; loop<outer_->num_partitions_; ++loop)
+ {
+ if (iterators_[loop] != outer_->intermediates_[loop].end() && **current_ > *iterators_[loop])
+ {
+ index_ = loop;
+ current_ = iterators_.begin()+loop;
+ }
+ }
+
+ if (index_ == outer_->num_partitions_)
+ end();
+ else
+ {
+ BOOST_ASSERT((*current_)->second.size() == 1); // expects exactly one value per key here (presumably the reduced results) - confirm
+ value_ = std::make_pair((*current_)->first, *(*current_)->second.begin());
+ }
+ }
+
+ private:
+ typedef
+ std::vector<typename intermediates_t::value_type::const_iterator>
+ iterators_t;
+
+ in_memory const *outer_; // parent container
+ iterators_t iterators_; // iterator group
+ unsigned index_; // index of current element
+ keyvalue_t value_; // value of current element
+ typename iterators_t::const_iterator current_; // iterator of current element
+
+ friend class in_memory;
+ };
+ friend class const_result_iterator;
 
- in_memory(unsigned const num_partitions)
+ in_memory(unsigned const num_partitions=1) // defaults to a single partition
       : num_partitions_(num_partitions)
     {
         intermediates_.resize(num_partitions_);
     }
 
+ const_result_iterator begin_results(void) const // first merged result across all partitions
+ {
+ return const_result_iterator(this).begin();
+ }
+
+ const_result_iterator end_results(void) const // past-the-end sentinel for begin_results()
+ {
+ return const_result_iterator(this).end();
+ }
+
+ void swap(in_memory &other) // NOTE(review): exchanges intermediates_ only; num_partitions_ is not exchanged, so both stores should share a partition count
+ {
+ std::swap(intermediates_, other.intermediates_);
+ }
+
     template<typename Callback>
- void reduce(unsigned const partition, Callback &callback, results &result)
+ void reduce(unsigned const partition, Callback &callback) // calls callback(key, values_begin, values_end) for each key of the partition
     {
- typename intermediates_t::value_type &map = intermediates_[partition];
+ typename intermediates_t::value_type map;
+ std::swap(map, intermediates_[partition]); // take the partition's data so results inserted by the callback land in an emptied slot
+
         for (typename intermediates_t::value_type::const_iterator it1=map.begin(); it1!=map.end(); ++it1)
         {
             callback(it1->first, it1->second.begin(), it1->second.end());
- ++result.counters.reduce_keys_executed;
         }
     }
 
@@ -99,11 +214,20 @@
         other.intermediates_.clear();
     }
 
+ template<typename StoreResult>
+ bool const insert(typename reduce_task_type::key_type const &key, // pass the pair to store_result, then keep it in this store
+ typename reduce_task_type::value_type const &value,
+ StoreResult &store_result)
+ {
+ store_result(key, value);
+ return insert(key, value);
+ }
 
     bool const insert(typename reduce_task_type::key_type const &key,
                       typename reduce_task_type::value_type const &value)
     {
- typename intermediates_t::value_type &map = intermediates_[partitioner_(key, num_partitions_)];
+ unsigned const partition = (num_partitions_ == 1)? 0 : partitioner_(key, num_partitions_);
+ typename intermediates_t::value_type &map = intermediates_[partition];
 
         map.insert(
             make_pair(

Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -141,6 +141,98 @@
     typedef MapTask map_task_type;
     typedef ReduceTask reduce_task_type;
     typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
+ typedef
+ std::pair<
+ typename reduce_task_type::key_type,
+ typename reduce_task_type::value_type>
+ keyvalue_t; // one (key, value) record as exposed by const_result_iterator
+
+ class const_result_iterator // read-only iterator merging the per-partition files, yielding the smallest current record first
+ : public boost::iterator_facade<
+ const_result_iterator,
+ keyvalue_t const,
+ boost::forward_traversal_tag> // NOTE(review): copies share the same ifstreams (shared_ptr), so in practice this is single-pass
+ {
+ friend class boost::iterator_core_access;
+
+ protected:
+ explicit const_result_iterator(local_disk const *outer)
+ : outer_(outer), index_(0) // index_ initialised so copying a fresh iterator never reads an indeterminate value
+ {
+ BOOST_ASSERT(outer_);
+ kvlist_.resize(outer_->num_partitions_);
+ }
+
+ void increment(void) // read the next record from the stream that supplied the current one, then re-select the smallest
+ {
+ if (!kvlist_[index_].first->eof())
+ read_record(*kvlist_[index_].first, kvlist_[index_].second.first, kvlist_[index_].second.second);
+ set_current();
+ }
+
+ bool const equal(const_result_iterator const &other) const // both at end(), or both positioned on equal records
+ {
+ return (kvlist_.size() == 0 && other.kvlist_.size() == 0)
+ || (kvlist_.size() > 0
+ && other.kvlist_.size() > 0
+ && kvlist_[index_].second == other.kvlist_[other.index_].second); // each side uses its own current index
+ }
+
+ const_result_iterator &begin(void) // open every partition's file and read its first record
+ {
+ for (unsigned loop=0; loop<outer_->num_partitions_; ++loop)
+ {
+ kvlist_[loop] = std::make_pair(boost::shared_ptr<std::ifstream>(new std::ifstream), keyvalue_t());
+ kvlist_[loop].first->open(outer_->intermediate_files_.find(loop)->second.first.c_str()); // assumes every partition has an entry - confirm
+ BOOST_ASSERT(kvlist_[loop].first->is_open());
+ read_record(*kvlist_[loop].first, kvlist_[loop].second.first, kvlist_[loop].second.second);
+ }
+ set_current();
+ return *this;
+ }
+
+ const_result_iterator &end(void) // sentinel state: no streams
+ {
+ index_ = 0;
+ kvlist_.clear();
+ return *this;
+ }
+
+ keyvalue_t const &dereference(void) const // current record of the selected partition
+ {
+ return kvlist_[index_].second;
+ }
+
+ void set_current(void) // pick the non-eof stream holding the smallest record; become end() once every stream is at eof
+ {
+ index_ = 0;
+ while (index_<outer_->num_partitions_ && kvlist_[index_].first->eof())
+ ++index_;
+
+ for (unsigned loop=index_+1; loop<outer_->num_partitions_; ++loop)
+ {
+ if (!kvlist_[loop].first->eof() && !kvlist_[index_].first->eof() && kvlist_[index_].second > kvlist_[loop].second)
+ index_ = loop;
+ }
+
+ if (index_ == outer_->num_partitions_)
+ end();
+ }
+
+ private:
+ local_disk const *outer_; // parent container
+ unsigned index_; // index of current element
+ typedef
+ std::vector<
+ std::pair<
+ boost::shared_ptr<std::ifstream>,
+ keyvalue_t> >
+ kvlist_t;
+ kvlist_t kvlist_;
+
+ friend class local_disk;
+ };
+ friend class const_result_iterator;
 
     local_disk(unsigned const num_partitions)
       : num_partitions_(num_partitions)
@@ -163,6 +255,25 @@
         }
     }
 
+ const_result_iterator begin_results(void) const // opens each partition's file and positions on the smallest record
+ {
+ return const_result_iterator(this).begin();
+ }
+
+ const_result_iterator end_results(void) const // past-the-end sentinel for begin_results()
+ {
+ return const_result_iterator(this).end();
+ }
+
+ template<typename StoreResult>
+ bool const insert(typename reduce_task_type::key_type const &key, // pass the pair to store_result, then keep it in this store
+ typename reduce_task_type::value_type const &value,
+ StoreResult &store_result)
+ {
+ store_result(key, value);
+ return insert(key, value);
+ }
+
     bool const insert(typename reduce_task_type::key_type const &key,
                       typename reduce_task_type::value_type const &value)
     {
@@ -206,25 +317,22 @@
             std::swap(infilename, outfilename);
 
             std::string key, last_key;
+ typename reduce_task_type::value_type value;
             std::ifstream infile(infilename.c_str());
- while (!infile.eof())
+ while (read_record(infile, key, value))
             {
- typename reduce_task_type::value_type value;
- if (read_record(infile, key, value))
+ if (key != last_key && key.length() > 0)
                 {
- if (key != last_key && key.length() > 0)
+ if (last_key.length() > 0)
+ fn_obj.finish(last_key, *this);
+ if (key.length() > 0)
                     {
- if (last_key.length() > 0)
- fn_obj.finish(last_key, *this);
- if (key.length() > 0)
- {
- fn_obj.start(key);
- std::swap(key, last_key);
- }
+ fn_obj.start(key);
+ std::swap(key, last_key);
                     }
-
- fn_obj(value);
                 }
+
+ fn_obj(value);
             }
 
             if (last_key.length() > 0)
@@ -275,18 +383,19 @@
     }
 
     template<typename Callback>
- void reduce(unsigned const partition, Callback &callback, results &result)
+ void reduce(unsigned const partition, Callback &callback)
     {
+ intermediates_t::iterator it = intermediate_files_.find(partition);
+ BOOST_ASSERT(it != intermediate_files_.end());
+
+ std::string filename;
+ std::swap(filename, it->second.first);
+ intermediate_files_.erase(it);
+
         typename reduce_task_type::key_type key;
         typename reduce_task_type::key_type last_key;
         typename reduce_task_type::value_type value;
         std::list<typename reduce_task_type::value_type> values;
-
- std::list<std::string> filenames;
- intermediates_t::const_iterator it = intermediate_files_.find(partition);
- BOOST_ASSERT(it != intermediate_files_.end());
-
- std::string const &filename = it->second.first;
         std::ifstream infile(filename.c_str());
         while (read_record(infile, key, value))
         {
@@ -294,7 +403,6 @@
             {
                 if (length(last_key) > 0)
                 {
- ++result.counters.reduce_keys_executed;
                     callback(last_key, values.begin(), values.end());
                     values.clear();
                 }
@@ -307,12 +415,13 @@
 
         if (length(last_key) > 0)
         {
- ++result.counters.reduce_keys_executed;
             callback(last_key, values.begin(), values.end());
         }
 
         infile.close();
         boost::filesystem::remove(filename.c_str());
+
+ intermediate_files_.find(partition)->second.second->close();
     }
 
   protected:
@@ -365,11 +474,15 @@
     void close_files(void)
     {
         for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
+ {
             if (it->second.second && it->second.second->is_open())
                 it->second.second->close();
+ }
     }
 
   private:
+ typedef enum { map_phase, reduce_phase } phase_t;
+
     unsigned const num_partitions_;
     intermediates_t intermediate_files_;
     PartitionFn partitioner_;

Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -49,7 +49,14 @@
     typedef Datasource datasource_type;
     typedef IntermediateStore intermediate_store_type;
     typedef Combiner combiner_type;
- typedef std::list<std::string> filenames_t;
+
+ typedef
+ typename intermediate_store_type::const_result_iterator
+ const_result_iterator;
+
+ typedef
+ typename intermediate_store_type::keyvalue_t
+ keyvalue_t;
 
   public:
     class map_task_runner : boost::noncopyable
@@ -93,27 +100,42 @@
     {
       public:
         reduce_task_runner(
- std::string const &output_filespec,
- unsigned const partition,
- unsigned const num_partitions)
- : store_result_(output_filespec, partition, num_partitions)
+ std::string const &output_filespec,
+ unsigned const &partition,
+ unsigned const num_partitions,
+ intermediate_store_type &intermediate_store,
+ results &result)
+ : partition_(partition),
+ result_(result),
+ intermediate_store_(intermediate_store),
+ store_result_(output_filespec, partition, num_partitions)
+ {
+ }
+
+ void reduce(void)
         {
+ intermediate_store_.reduce(partition_, *this);
         }
 
         void emit(typename reduce_task_type::key_type const &key,
                   typename reduce_task_type::value_type const &value)
         {
- store_result_(key, value);
+ intermediate_store_.insert(key, value, store_result_);
         }
 
         template<typename It>
         void operator()(typename reduce_task_type::key_type const &key, It it, It ite)
         {
+ ++result_.counters.reduce_keys_executed;
             reduce_task_type::reduce(*this, key, it, ite);
+ ++result_.counters.reduce_keys_completed;
         }
 
       private:
- StoreResult store_result_;
+ unsigned const &partition_;
+ results &result_;
+ intermediate_store_type &intermediate_store_;
+ StoreResult store_result_;
     };
 
     job(datasource_type &datasource, specification const &spec)
@@ -123,6 +145,16 @@
      {
      }
 
+ const_result_iterator begin_results(void) const // delegates to the intermediate store's merged result iteration
+ {
+ return intermediate_store_.begin_results();
+ }
+
+ const_result_iterator end_results(void) const // past-the-end sentinel for begin_results()
+ {
+ return intermediate_store_.end_results();
+ }
+
     bool const get_next_map_key(void *&key)
     {
         std::auto_ptr<typename map_task_type::key_type> next_key(new typename map_task_type::key_type);
@@ -218,9 +250,10 @@
             reduce_task_runner runner(
                 specification_.output_filespec,
                 partition,
- number_of_partitions());
-
- intermediate_store_.reduce(partition, runner, result);
+ number_of_partitions(),
+ intermediate_store_,
+ result);
+ runner.reduce();
         }
         catch (std::exception &e)
         {

Modified: sandbox/boost/mapreduce/mergesort.hpp
==============================================================================
--- sandbox/boost/mapreduce/mergesort.hpp (original)
+++ sandbox/boost/mapreduce/mergesort.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -42,6 +42,12 @@
     return first.second < second.second;
 }
 
+template<typename T>
+bool const greater_2nd(T const &first, T const &second) // true when first.second > second.second; comparator for descending sorts on .second
+{
+ return first.second > second.second;
+}
+
 struct key_offset_compare
 {
   public:

Modified: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -24,7 +24,7 @@
 namespace detail {
 
 template<typename Job>
-inline void run_next_map_task(Job &job, results &result, boost::mutex &m1, boost::mutex &m2)
+inline void run_next_map_task(Job &job, boost::mutex &m1, boost::mutex &m2, results &result)
 {
     try
     {
@@ -48,11 +48,22 @@
 }
 
 template<typename Job>
-inline void run_next_reduce_task(Job &job, unsigned &partition, results &result)
+inline void run_next_reduce_task(Job &job, unsigned &partition, boost::mutex &mutex, results &result)
 {
     try
     {
- job.run_reduce_task(partition, result);
+ while (1)
+ {
+ boost::mutex::scoped_lock l(mutex);
+ unsigned part = partition++;
+ if (part < job.number_of_partitions())
+ {
+ l.unlock();
+ job.run_reduce_task(part, result);
+ }
+ else
+ break;
+ }
     }
     catch (std::exception &e)
     {
@@ -90,9 +101,9 @@
                 new boost::thread(
                     detail::run_next_map_task<Job>,
                     boost::ref(job),
- boost::ref(*this_result),
                     boost::ref(m1),
- boost::ref(m2));
+ boost::ref(m2),
+ boost::ref(*this_result));
             map_threads.add_thread(thread);
         }
         map_threads.join_all();
@@ -116,6 +127,7 @@
                     detail::run_next_reduce_task<Job>,
                     boost::ref(job),
                     boost::ref(partition),
+ boost::ref(m1),
                     boost::ref(*this_result));
             reduce_threads.add_thread(thread);
         }

Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp (original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -152,6 +152,11 @@
         if (argc > 2)
             spec.map_tasks = atoi(argv[2]);
 
+ if (argc > 3)
+ spec.reduce_tasks = atoi(argv[3]);
+ else
+ spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
+
         std::cout << "\nRunning CPU Parallel MapReduce...";
         boost::mapreduce::run<wordcount::job>(spec, result);
         std::cout << "\nCPU Parallel MapReduce Finished.";
@@ -161,30 +166,30 @@
         std::cout << std::endl << "Error: " << e.what();
     }
 
- std::cout << std::endl << "\n" << "MapReduce statistics:";
- std::cout << "\n " << "MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
- std::cout << "\n " << " Map phase runtime : " << result.map_runtime << " seconds";
- std::cout << "\n " << " Reduce phase runtime : " << result.reduce_runtime << " seconds";
- std::cout << "\n\n " << "Map:";
- std::cout << "\n " << "Total Map keys : " << result.counters.map_keys_executed;
- std::cout << "\n " << "Map keys processed : " << result.counters.map_keys_completed;
- std::cout << "\n " << "Map key processing errors : " << result.counters.map_key_errors;
- std::cout << "\n " << "Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
- std::cout << "\n " << "Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n " << "Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n " << "Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << std::endl << "\nMapReduce statistics:";
+ std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
+ std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
+ std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
+ std::cout << "\n\n Map:";
+ std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
+ std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
+ std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
+ std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
+ std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
 
- std::cout << "\n\n " << "Reduce:";
- std::cout << "\n " << "Total Reduce keys : " << result.counters.reduce_keys_executed;
- std::cout << "\n " << "Reduce keys processed : " << result.counters.reduce_keys_completed;
- std::cout << "\n " << "Reduce key processing errors : " << result.counters.reduce_key_errors;
- std::cout << "\n " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
- std::cout << "\n " << "Number of Result Files : " << result.counters.num_result_files;
+ std::cout << "\n\n Reduce:";
+ std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
+ std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
+ std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
+ std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+ std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
     if (result.reduce_times.size() > 0)
     {
- std::cout << "\n " << "Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n " << "Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n " << "Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
     }
 
     return 0;

Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp (original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-08-07 16:19:44 EDT (Fri, 07 Aug 2009)
@@ -14,13 +14,14 @@
 #define WORD_COUNT_MEMORY_MAP_FILE
 #define USE_WORDCOUNT_COMBINER
 #define USE_IN_MEMORY_INTERMEDIATES
+//#define WRITE_OUTPUT_FILES
 
 #if defined(_DEBUG)
 # if 0
 # define RUN_SEQUENTIAL_MAP_REDUCE
 # endif
 #else
-# define BOOST_DISABLE_ASSERTS
+//# define BOOST_DISABLE_ASSERTS
 #endif
  
 #if !defined(_DEBUG) && !defined(BOOST_DISABLE_ASSERTS)
@@ -180,7 +181,9 @@
 #else
   , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
 #endif
+#if defined(USE_IN_MEMORY_INTERMEDIATES) && defined(WRITE_OUTPUT_FILES)
   , boost::mapreduce::intermediates::reduce_file_output<wordcount::map_task_type, wordcount::reduce_task>
+#endif
> job;
 
 
@@ -230,7 +233,7 @@
     _CrtSetDbgFlag(_CrtSetDbgFlag(_CRTDBG_REPORT_FLAG) | _CRTDBG_LEAK_CHECK_DF);
 #endif
 
- std::cout << "MapReduce Wordcount Application";
+ std::cout << "MapReduce test program";
     if (argc < 2)
     {
         std::cerr << "Usage: wordcount directory [num_map_tasks]\n";
@@ -246,64 +249,97 @@
     std::cout << "\n" << std::max(1,(int)boost::thread::hardware_concurrency()) << " CPU cores";
     std::cout << "\n" << typeid(wordcount::job).name() << "\n";
 
- try
- {
 #ifdef RUN_SEQUENTIAL_MAP_REDUCE
- std::cout << "\nRunning Sequential MapReduce...";
+ std::cout << "\nRunning Sequential MapReduce...";
 
- spec.map_tasks = 1;
- spec.reduce_tasks = 1;
+ spec.map_tasks = 1;
+ spec.reduce_tasks = 1;
 
- wordcount::job job(datasource, spec);
+ wordcount::job job(datasource, spec);
+ try
+ {
         job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
- std::cout << "\nFinished.";
+ std::cout << "\nSequential MapReduce Finished.";
 #else
- std::cout << "\nRunning CPU Parallel MapReduce...";
- spec.reduce_tasks = 1;
+ std::cout << "\nRunning CPU Parallel MapReduce...";
 
- if (argc > 2)
- spec.map_tasks = atoi(argv[2]);
+ if (argc > 2)
+ spec.map_tasks = atoi(argv[2]);
 
- // this method can be called, but since we want access to the result data,
- // we need to have a job object to interrogate
- //boost::mapreduce::run<wordcount::job>(spec, result);
+ if (argc > 3)
+ spec.reduce_tasks = atoi(argv[3]);
+ else
+ spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
 
- wordcount::job job(datasource, spec);
+ // this method can be called, but since we want access to the result data,
+ // we need to have a job object to interrogate
+ //boost::mapreduce::run<wordcount::job>(spec, result);
+
+ wordcount::job job(datasource, spec);
+ try
+ {
         job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
- std::cout << "\nCPU Parallel MapReduce Finished.";
+ std::cout << "\nCPU Parallel MapReduce Finished.\n";
 #endif
     }
     catch (std::exception &e)
     {
- std::cout << std::endl << "Error: " << e.what();
+ std::cout << std::endl << "Error running MapReduce: " << e.what();
     }
 
- typedef std::pair<wordcount::reduce_task::key_type, wordcount::reduce_task::value_type> keyvalue_t;
-
- std::cout << std::endl << "\n" << "MapReduce statistics:";
- std::cout << "\n " << "MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
- std::cout << "\n " << " Map phase runtime : " << result.map_runtime << " seconds";
- std::cout << "\n " << " Reduce phase runtime : " << result.reduce_runtime << " seconds";
- std::cout << "\n\n " << "Map:";
- std::cout << "\n " << "Total Map keys : " << result.counters.map_keys_executed;
- std::cout << "\n " << "Map keys processed : " << result.counters.map_keys_completed;
- std::cout << "\n " << "Map key processing errors : " << result.counters.map_key_errors;
- std::cout << "\n " << "Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
- std::cout << "\n " << "Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n " << "Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n " << "Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
-
- std::cout << "\n\n " << "Reduce:";
- std::cout << "\n " << "Total Reduce keys : " << result.counters.reduce_keys_executed;
- std::cout << "\n " << "Reduce keys processed : " << result.counters.reduce_keys_completed;
- std::cout << "\n " << "Reduce key processing errors : " << result.counters.reduce_key_errors;
- std::cout << "\n " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
- std::cout << "\n " << "Number of Result Files : " << result.counters.num_result_files;
+ std::cout << "\nMapReduce statistics:";
+ std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
+ std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
+ std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
+ std::cout << "\n\n Map:";
+ std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
+ std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
+ std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
+ std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
+ std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+
+ std::cout << "\n\n Reduce:";
+ std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
+ std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
+ std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
+ std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+ std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
     if (result.reduce_times.size() > 0)
     {
- std::cout << "\n " << "Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n " << "Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n " << "Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ }
+
+ wordcount::job::const_result_iterator it = job.begin_results();
+ wordcount::job::const_result_iterator ite = job.end_results();
+ if (it != ite)
+ {
+ typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
+ frequencies_t frequencies;
+ frequencies.push_back(*it);
+ frequencies_t::iterator it_smallest = frequencies.begin(); // a list iterator stays on its element across push_back; rbegin() (base == end()) would drift to each new back element
+ for (++it; it!=ite; ++it)
+ {
+ if (frequencies.size() < 10) // show top 10
+ {
+ frequencies.push_back(*it);
+ if (it->second < it_smallest->second)
+ it_smallest = --frequencies.end(); // the element just appended is the new smallest
+ }
+ else if (it->second > it_smallest->second)
+ {
+ *it_smallest = *it; // evict the current smallest, then find the new one
+ it_smallest = std::min_element(frequencies.begin(), frequencies.end(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
+ }
+ }
+
+ frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
+ std::cout << "\n\nMapReduce results:";
+ for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
+ std::cout << "\n" << freq->first << "\t" << freq->second;
     }
 
     return 0;


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk