Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55175 - in sandbox: boost boost/mapreduce boost/mapreduce/schedule_policy libs/mapreduce libs/mapreduce/doc libs/mapreduce/examples libs/mapreduce/examples/wordcount
From: cdm.henderson_at_[hidden]
Date: 2009-07-30 14:42:26


Author: chenderson
Date: 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
New Revision: 55175
URL: http://svn.boost.org/trac/boost/changeset/55175

Log:
Added parametrised file_handler on the datasource.
Added memory-mapped file support as an alternative to std::ifstream
Added examples directory with wordcount example
Removed test directory
Code clean-up
Added:
   sandbox/libs/mapreduce/examples/
   sandbox/libs/mapreduce/examples/wordcount/
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp (contents, props changed)
   sandbox/libs/mapreduce/examples/wordcount/wordcount.vcproj (contents, props changed)
Text files modified:
   sandbox/boost/mapreduce.hpp | 57 +++++-
   sandbox/boost/mapreduce/datasource.hpp | 180 +++++++++++++++++++--
   sandbox/boost/mapreduce/job.hpp | 174 ++++++++++++--------
   sandbox/boost/mapreduce/mergesort.hpp | 34 +++
   sandbox/boost/mapreduce/platform.hpp | 2
   sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp | 23 +-
   sandbox/boost/mapreduce/schedule_policy/sequential.hpp | 4
   sandbox/libs/mapreduce/doc/tutorial.html | 65 +++----
   sandbox/libs/mapreduce/doc/wordcount.html | 334 ++++++++++++++++++++++-----------------
   sandbox/libs/mapreduce/mapreduce.sln | 9
   sandbox/libs/mapreduce/mapreduce.vcproj | 4
   11 files changed, 588 insertions(+), 298 deletions(-)

Modified: sandbox/boost/mapreduce.hpp
==============================================================================
--- sandbox/boost/mapreduce.hpp (original)
+++ sandbox/boost/mapreduce.hpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -19,20 +19,29 @@
 # endif
 #endif
 
+#include <string>
 #include <vector>
+#include <boost/config.hpp>
+#include <boost/noncopyable.hpp>
+#include <boost/cstdint.hpp>
 
 namespace boost {
 
 namespace mapreduce {
 
-struct specification
+struct specification : boost::noncopyable
 {
- unsigned map_tasks;
- unsigned reduce_tasks;
+ unsigned map_tasks; // ideal number of map tasks to use
+ unsigned reduce_tasks; // ideal number of reduce tasks to use
+ boost::uintmax_t max_file_segment_size; // ideal maximum number of bytes in each input file segment
+ std::string output_filespec; // filespec of the output files - can contain a directory path if required
+ std::string input_directory; // directory path to scan for input files
 
     specification()
- : map_tasks(0), // ideal number of map tasks to use
- reduce_tasks(10) // ideal number of reduce tasks to use
+ : map_tasks(0),
+ reduce_tasks(10),
+ max_file_segment_size(1048576L), // default 1MB (1048576 bytes)
+ output_filespec("mapreduce_")
     {
     }
 };
@@ -43,22 +52,40 @@
     {
         unsigned actual_map_tasks; // number of map tasks actually used
         unsigned actual_reduce_tasks; // number of reduce tasks actually used
- unsigned map_tasks;
- unsigned map_tasks_error;
- unsigned map_tasks_completed;
+
+ // counters for map key processing
+ unsigned map_keys_executed;
+ unsigned map_key_errors;
+ unsigned map_keys_completed;
+
+ // counters for reduce key processing
+ unsigned reduce_keys_executed;
+ unsigned reduce_key_errors;
+ unsigned reduce_keys_completed;
+
         unsigned num_result_files; // number of result files created
 
         tag_counters()
           : actual_map_tasks(0),
             actual_reduce_tasks(0),
- map_tasks(0),
- map_tasks_error(0),
- map_tasks_completed(0),
+ map_keys_executed(0),
+ map_key_errors(0),
+ map_keys_completed(0),
+ reduce_keys_executed(0),
+ reduce_key_errors(0),
+ reduce_keys_completed(0),
             num_result_files(0)
         {
         }
     } counters;
 
+ results()
+ : job_runtime(0),
+ map_runtime(0),
+ reduce_runtime(0)
+ {
+ }
+
     time_t job_runtime;
     time_t map_runtime;
     time_t reduce_runtime;
@@ -66,6 +93,14 @@
     std::vector<time_t> reduce_times;
 };
 
+template<typename Job>
+void run(boost::mapreduce::specification &spec, boost::mapreduce::results &result)
+{
+ Job::datasource_type datasource(spec);
+ Job job(datasource, spec);
+ job.run<boost::mapreduce::schedule_policy::cpu_parallel<Job> >(result);
+}
+
 } // namespace mapreduce
 
 } // namespace boost

Modified: sandbox/boost/mapreduce/datasource.hpp
==============================================================================
--- sandbox/boost/mapreduce/datasource.hpp (original)
+++ sandbox/boost/mapreduce/datasource.hpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -13,42 +13,184 @@
 #ifndef BOOST_DATASOURCE_SCHEDULE_POLICY_HPP
 #define BOOST_DATASOURCE_SCHEDULE_POLICY_HPP
 
+#include <boost/iostreams/device/mapped_file.hpp>
+
 namespace boost {
 
 namespace mapreduce {
 
 namespace datasource {
 
-template<typename MapTask>
-class directory_iterator : boost::noncopyable
+template<typename Key, typename Value>
+class file_handler
 {
   public:
- bool const setup_key(typename MapTask::key_type &key) const
+ file_handler(boost::mapreduce::specification const &spec)
+ : specification_(spec), data_(new data)
+ {
+ }
+
+ bool const get_data(Key const &key, Value &value) const;
+ bool const setup_key(Key &/*key*/) const { return false; }
+
+ private:
+ boost::mapreduce::specification const &specification_;
+
+ struct data;
+ boost::shared_ptr<data> data_;
+};
+
+template<>
+struct file_handler<
+ std::string,
+ std::ifstream>::data
+{
+};
+
+template<>
+bool const
+file_handler<
+ std::string,
+ std::ifstream>::get_data(
+ std::string const &key,
+ std::ifstream &value) const
+{
+ value.open(key.c_str());
+ return value.is_open();
+}
+
+
+template<>
+struct file_handler<
+ std::string,
+ std::pair<
+ char const *,
+ char const *> >::data
+{
+ struct detail
+ {
+ boost::iostreams::mapped_file mmf; // memory mapped file
+ boost::uintmax_t size; // size of the file
+ boost::uintmax_t offset; // offset to map next time
+ };
+
+ typedef
+ std::map<std::string, boost::shared_ptr<detail> >
+ maps_t;
+
+ maps_t maps;
+ boost::mutex mutex;
+ std::string current_file;
+};
+
+template<>
+bool const
+file_handler<
+ std::string,
+ std::pair<
+ char const *,
+ char const *> >::get_data(
+ std::string const &key,
+ std::pair<char const *, char const *> &value) const
+{
+ // we need to hold the lock for the duration of this function
+ boost::mutex::scoped_lock l(data_->mutex);
+ data::maps_t::iterator it;
+ if (data_->current_file.empty())
     {
- while (it_dir_ != boost::filesystem::basic_directory_iterator<path_t>()
- && boost::filesystem::is_directory(*it_dir_))
+ data_->current_file = key;
+ it = data_->maps.insert(std::make_pair(key, boost::shared_ptr<data::detail>(new data::detail))).first;
+ it->second->mmf.open(key, BOOST_IOS::in);
+ if (!it->second->mmf.is_open())
         {
- ++it_dir_;
+ std::cout << "\nFailed to map file into memory: " << key;
+ return false;
         }
 
- if (it_dir_ == boost::filesystem::basic_directory_iterator<path_t>())
- return false;
+ it->second->size = boost::filesystem::file_size(key);
+ it->second->offset = std::min(specification_.max_file_segment_size, it->second->size);
+ value.first = it->second->mmf.const_data();
+ value.second = value.first + it->second->offset;
+ }
+ else
+ {
+ BOOST_ASSERT(key == data_->current_file);
+ it = data_->maps.find(key);
+ BOOST_ASSERT(it != data_->maps.end());
+
+ value.first = it->second->mmf.const_data() + it->second->offset;
+ it->second->offset = std::min(it->second->offset+specification_.max_file_segment_size, it->second->size);
+ value.second = it->second->mmf.const_data() + it->second->offset;
+ }
 
- path_t path = *it_dir_++;
- key = path.external_file_string();
- return true;
+ if (it->second->offset == it->second->size)
+ data_->current_file.clear();
+
+ ///!!! parameterise finding a word boundary
+ char ch = std::toupper(*value.second);
+ while ((ch == '\'' || (ch >= 'A' && ch <= 'Z')) && it->second->offset != it->second->size)
+ {
+ ++value.second;
+ ++it->second->offset;
+ ch = std::toupper(*value.second);
     }
 
- bool const get_data(typename MapTask::key_type &key, typename MapTask::value_type &istream) const
+//std::cout<<"\nget_data(): next offset will be " << it->second->offset;
+ return true;
+}
+
+template<>
+bool const
+file_handler<
+ std::string,
+ std::pair<
+ char const *,
+ char const *> >::setup_key(std::string &key) const
+{
+ boost::mutex::scoped_lock l(data_->mutex);
+ if (data_->current_file.empty())
+ return false;
+ key = data_->current_file;
+ return true;
+}
+
+template<
+ typename MapTask,
+ typename FileHandler = file_handler<
+ typename MapTask::key_type,
+ typename MapTask::value_type> >
+class directory_iterator : boost::noncopyable
+{
+ public:
+ directory_iterator(boost::mapreduce::specification const &spec)
+ : specification_(spec),
+ file_handler_(spec)
+ {
+ it_dir_ = boost::filesystem::basic_directory_iterator<path_t>(specification_.input_directory);
+ }
+
+ bool const setup_key(typename MapTask::key_type &key) const
     {
- istream.open(key.c_str());
- return istream.is_open();
+ if (!file_handler_.setup_key(key))
+ {
+ while (it_dir_ != boost::filesystem::basic_directory_iterator<path_t>()
+ && boost::filesystem::is_directory(*it_dir_))
+ {
+ ++it_dir_;
+ }
+
+ if (it_dir_ == boost::filesystem::basic_directory_iterator<path_t>())
+ return false;
+
+ path_t path = *it_dir_++;
+ key = path.external_file_string();
+ }
+ return true;
     }
 
- void set_directory(char const *directory)
+ bool const get_data(typename MapTask::key_type &key, typename MapTask::value_type &value) const
     {
- directory_ = directory;
- it_dir_ = boost::filesystem::basic_directory_iterator<path_t>(directory_);
+ return file_handler_.get_data(key, value);
     }
 
   private:
@@ -57,7 +199,9 @@
     path_t;
 
     mutable boost::filesystem::basic_directory_iterator<path_t> it_dir_;
- std::string directory_;
+ std::string directory_;
+ boost::mapreduce::specification const &specification_;
+ FileHandler file_handler_;
 };
 
 } // namespace datasource

Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -19,6 +19,27 @@
 
 template<typename T> size_t length(T const &str);
 
+template<
+ typename MapKey,
+ typename MapValue,
+ typename IntermediateKey,
+ typename IntermediateValue>
+class map_task
+{
+ public:
+ typedef MapKey key_type;
+ typedef MapValue value_type;
+ typedef IntermediateKey intermediate_key_type;
+ typedef IntermediateValue intermediate_value_type;
+};
+
+template<typename ReduceValue>
+class reduce_task
+{
+ public:
+ typedef ReduceValue value_type;
+};
+
 template<typename MapTask,
          typename ReduceTask,
          typename Combiner=null_combiner,
@@ -47,7 +68,7 @@
         // 'value' parameter is not a reference to const to enable streams to be passed
         map_task_runner &operator()(typename map_task_type::key_type const &key, typename map_task_type::value_type &value)
         {
- map_task_type(*this)(key, value);
+ map_task_type::map(*this, key, value);
 
             // consolidating map intermediate results can save network time by
             // aggregating the mapped valued at mapper
@@ -87,7 +108,10 @@
     class reduce_task_runner : boost::noncopyable
     {
       public:
- reduce_task_runner(std::string const &output_filespec, size_t const partition, size_t const num_partitions)
+ reduce_task_runner(
+ std::string const &output_filespec,
+ size_t const partition,
+ size_t const num_partitions)
         {
             std::ostringstream filename;
             filename << output_filespec << partition+1 << "_of_" << num_partitions;
@@ -104,8 +128,7 @@
         template<typename It>
         void operator()(typename map_task_type::intermediate_key_type const &key, It it, It ite)
         {
- reduce_task_type reduce_task(*this);
- reduce_task(key, it, ite);
+ reduce_task_type::reduce(*this, key, it, ite);
         }
 
         std::string const &filename(void) const
@@ -118,10 +141,9 @@
         std::ofstream output_file_;
     };
 
- job(datasource_type &datasource)
- : num_partitions_(0),
- datasource_(datasource),
- output_filespec_(".\\mapreduce_")
+ job(datasource_type &datasource, specification const &spec)
+ : datasource_(datasource),
+ specification_(spec)
      {
      }
 
@@ -139,11 +161,6 @@
         partition_files_.clear();
     }
 
- void set_output_filespec(std::string const &output_filespec)
- {
- output_filespec_ = output_filespec;
- }
-
     bool const get_next_map_key(void *&key)
     {
         std::auto_ptr<typename map_task_type::key_type> next_key(new typename map_task_type::key_type);
@@ -159,62 +176,69 @@
     }
 
     template<typename SchedulePolicy>
- void run(specification const &spec, results &result)
+ void run(results &result)
     {
         SchedulePolicy schedule;
- this->run(schedule, spec, result);
+ this->run(schedule, result);
     }
 
     template<typename SchedulePolicy>
- void run(SchedulePolicy &schedule, specification const &spec, results &result)
+ void run(SchedulePolicy &schedule, results &result)
     {
         time_t const start_time = time(NULL);
- schedule(*this, spec, result);
+ schedule(*this, result);
         result.job_runtime = time(NULL) - start_time;
     }
 
     bool const run_map_task(void *key, results &result)
     {
+ bool success = false;
         time_t const start_time = time(NULL);
 
- ++result.counters.map_tasks;
+ try
+ {
+ std::auto_ptr<typename map_task_type::key_type> map_key_ptr(reinterpret_cast<typename map_task_type::key_type *>(key));
+ typename map_task_type::key_type &map_key = *map_key_ptr;
 
- std::auto_ptr<typename map_task_type::key_type> map_key_ptr(reinterpret_cast<typename map_task_type::key_type *>(key));
- typename map_task_type::key_type &map_key = *map_key_ptr;
+ // get the data
+ typename map_task_type::value_type value;
+ if (!datasource_.get_data(map_key, value))
+ return false;
 
- // get the data
- typename map_task_type::value_type value;
- if (!datasource_.get_data(map_key, value))
- {
- ++result.counters.map_tasks_error;
- return false;
- }
+ ++result.counters.map_keys_executed;
+
+ // Map Task
+ if (map_key == typename map_task_type::key_type() || value == typename map_task_type::value_type())
+ {
+ BOOST_ASSERT(map_key != typename map_task_type::key_type());
+ BOOST_ASSERT(value != typename map_task_type::value_type());
+ ++result.counters.map_key_errors;
+ return false;
+ }
+
+ map_task_runner runner(*this);
+ runner(map_key, value);
 
- // Map Task
- if (map_key == typename map_task_type::key_type() || value == typename map_task_type::value_type())
+ ++result.counters.map_keys_completed;
+ }
+ catch (std::exception &)
         {
- BOOST_ASSERT(map_key != typename map_task_type::key_type());
- BOOST_ASSERT(value != typename map_task_type::value_type());
- ++result.counters.map_tasks_error;
- return false;
+ ++result.counters.map_key_errors;
+ success = false;
         }
- map_task_runner runner(*this);
- runner(map_key, value);
-
         result.map_times.push_back(time(NULL)-start_time);
 
- ++result.counters.map_tasks_completed;
- return true;
+ return success;
     }
 
     unsigned const number_of_partitions(void) const
     {
- return num_partitions_;
+ return specification_.reduce_tasks;
     }
 
- void number_of_partitions(unsigned const partitions)
+ unsigned const number_of_map_tasks(void) const
     {
- num_partitions_ = partitions;
+ return specification_.map_tasks;
     }
 
     // the caller must synchronise calls to this function from multiple threads
@@ -233,38 +257,53 @@
     bool const run_reduce_task(unsigned const partition, filenames_t const &filenames, results &result)
     {
         time_t const start_time = time(NULL);
- std::string const filename = intermediate_store_type::merge_and_sort(filenames);
 
- reduce_task_runner runner(output_filespec_, partition, num_partitions_);
+ try
+ {
+ std::string const filename = intermediate_store_type::merge_and_sort(filenames);
 
- typename map_task_type::intermediate_key_type key;
- typename map_task_type::intermediate_key_type last_key;
- typename map_task_type::intermediate_value_type value;
- std::list<typename map_task_type::intermediate_value_type> values;
+ typename map_task_type::intermediate_key_type key;
+ typename map_task_type::intermediate_key_type last_key;
+ typename map_task_type::intermediate_value_type value;
+ std::list<typename map_task_type::intermediate_value_type> values;
+
+ reduce_task_runner runner(
+ specification_.output_filespec,
+ partition,
+ number_of_partitions());
 
- std::ifstream infile(filename.c_str());
- while (intermediate_store_type::read_record(infile, key, value))
- {
- if (key != last_key && length(key) > 0)
+ std::ifstream infile(filename.c_str());
+ while (intermediate_store_type::read_record(infile, key, value))
             {
- if (length(last_key) > 0)
+ if (key != last_key && length(key) > 0)
                 {
- runner(last_key, values.begin(), values.end());
- values.clear();
+ if (length(last_key) > 0)
+ {
+ ++result.counters.reduce_keys_executed;
+ runner(last_key, values.begin(), values.end());
+ values.clear();
+ }
+ if (length(key) > 0)
+ std::swap(key, last_key);
                 }
- if (length(key) > 0)
- std::swap(key, last_key);
- }
-
- values.push_back(value);
- }
 
- if (length(last_key) > 0)
- runner(last_key, values.begin(), values.end());
+ values.push_back(value);
+ }
 
- infile.close();
- boost::filesystem::remove(filename.c_str());
+ if (length(last_key) > 0)
+ {
+ ++result.counters.reduce_keys_executed;
+ runner(last_key, values.begin(), values.end());
+ }
 
+ infile.close();
+ boost::filesystem::remove(filename.c_str());
+ }
+ catch (std::exception &)
+ {
+ ++result.counters.reduce_key_errors;
+ }
+
         result.reduce_times.push_back(time(NULL)-start_time);
 
         return true;
@@ -277,10 +316,9 @@
         filenames_t> // file names
     partition_files_t;
 
- unsigned num_partitions_;
- datasource_type &datasource_;
- std::string output_filespec_;
- partition_files_t partition_files_;
+ datasource_type &datasource_;
+ partition_files_t partition_files_;
+ specification const &specification_;
 };
 
 template<>

Modified: sandbox/boost/mapreduce/mergesort.hpp
==============================================================================
--- sandbox/boost/mapreduce/mergesort.hpp (original)
+++ sandbox/boost/mapreduce/mergesort.hpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -154,6 +154,33 @@
     return boost::filesystem::remove(pathname);
 }
 
+template<typename Filenames>
+class temporary_file_manager : boost::noncopyable
+{
+ public:
+ temporary_file_manager(Filenames &filenames)
+ : filenames_(filenames)
+ {
+ }
+
+ ~temporary_file_manager()
+ {
+ try
+ {
+ // bind to the pass-through delete_file function because boost::filesystem::remove
+ // takes a boost::filesystem::path parameter and not a std::string parameter - the
+ // compiler will understandably not bind using an implicit conversion
+ std::for_each(filenames_.begin(), filenames_.end(), boost::bind(delete_file, _1));
+ }
+ catch (std::exception &)
+ {
+ }
+ }
+
+ private:
+ Filenames &filenames_;
+};
+
 } // namespace detail
 
 
@@ -163,7 +190,10 @@
                              unsigned const offset,
                              unsigned const max_lines = 1000000)
 {
- std::deque<std::string> temporary_files;
+ std::deque<std::string> temporary_files;
+ detail::temporary_file_manager<
+ std::deque<std::string> > tfm(temporary_files);
+
     std::ifstream infile(in, std::ios_base::in | std::ios_base::binary);
     if (!infile.is_open())
     {
@@ -203,9 +233,7 @@
         }
     }
     infile.close();
-
     detail::do_file_merge(temporary_files.begin(), temporary_files.end(), out, offset);
- std::for_each(temporary_files.begin(), temporary_files.end(), boost::bind(detail::delete_file, _1));
 
         return true;
 }

Modified: sandbox/boost/mapreduce/platform.hpp
==============================================================================
--- sandbox/boost/mapreduce/platform.hpp (original)
+++ sandbox/boost/mapreduce/platform.hpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -73,7 +73,7 @@
         BOOST_THROW_EXCEPTION(boost::system::system_error(GetLastError(), boost::system::system_category));
 
     Char file[_MAX_PATH+1];
- if (!detail::os_temporary_file_api_traits<Char>::get_temp_filename(path, "boost", 0, file))
+ if (!detail::os_temporary_file_api_traits<Char>::get_temp_filename(path, "mr_", 0, file))
         BOOST_THROW_EXCEPTION(boost::system::system_error(GetLastError(), boost::system::system_category));
 
     pathname = file;

Modified: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -13,14 +13,7 @@
 #ifndef BOOST_MAPREDUCE_CPU_PARALLEL_HPP
 #define BOOST_MAPREDUCE_CPU_PARALLEL_HPP
 
-#ifdef BOOST_MSVC
-#pragma warning(push)
-#pragma warning(disable: 4244) // warning C4244: 'argument' : conversion from 'X' to 'Y', possible loss of data
-#endif
 #include <boost/thread.hpp>
-#ifdef BOOST_MSVC
-#pragma warning(pop)
-#endif
 
 namespace boost {
 
@@ -85,10 +78,9 @@
 class cpu_parallel
 {
   public:
- void operator()(Job &job, specification const &spec, results &result)
+ void operator()(Job &job, results &result)
     {
- unsigned const num_cpus = std::max(1,boost::thread::hardware_concurrency());
- job.number_of_partitions(spec.reduce_tasks);
+ unsigned const num_cpus = std::max(1U,boost::thread::hardware_concurrency());
 
         typedef std::vector<boost::shared_ptr<results> > all_results_t;
         all_results_t all_results;
@@ -98,7 +90,7 @@
         time_t start_time = time(NULL);
         boost::thread_group map_threads;
 
- unsigned const map_tasks = (spec.map_tasks==0)? num_cpus : std::min(num_cpus, spec.map_tasks);
+ unsigned const map_tasks = std::max(num_cpus,std::min(num_cpus, job.number_of_map_tasks()));
 
         for (unsigned loop=0; loop<map_tasks; ++loop)
         {
@@ -146,9 +138,12 @@
              it!=all_results.end();
              ++it)
         {
- result.counters.map_tasks += (*it)->counters.map_tasks;
- result.counters.map_tasks_error += (*it)->counters.map_tasks_error;
- result.counters.map_tasks_completed += (*it)->counters.map_tasks_completed;
+ result.counters.map_keys_executed += (*it)->counters.map_keys_executed;
+ result.counters.map_key_errors += (*it)->counters.map_key_errors;
+ result.counters.map_keys_completed += (*it)->counters.map_keys_completed;
+ result.counters.reduce_keys_executed += (*it)->counters.reduce_keys_executed;
+ result.counters.reduce_key_errors += (*it)->counters.reduce_key_errors;
+ result.counters.reduce_keys_completed += (*it)->counters.reduce_keys_completed;
 
             std::copy(
                 (*it)->map_times.begin(),

Modified: sandbox/boost/mapreduce/schedule_policy/sequential.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/sequential.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/sequential.hpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -23,10 +23,8 @@
 class sequential
 {
   public:
- void operator()(Job &job, specification const &spec, results &result)
+ void operator()(Job &job, results &result)
     {
- job.number_of_partitions(spec.reduce_tasks);
-
         // Map Tasks
         time_t start_time = time(NULL);
         void *key = 0;

Modified: sandbox/libs/mapreduce/doc/tutorial.html
==============================================================================
--- sandbox/libs/mapreduce/doc/tutorial.html (original)
+++ sandbox/libs/mapreduce/doc/tutorial.html 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -79,55 +79,52 @@
                   <code>key_type</code> and <code> value_type</code></li>
                 <li>Provide type definitions for Intermediate Key (<code>k2</code>) and Intermediate Value (<code>v2</code>);
                   <code>intermediate_key_type</code> and <code> intermediate_value_type</code></li>
- <li>Define a constructor taking a <code>job::map_task_runner</code> object by reference</li>
- <li>Store a reference to the <code>job::map_task_runner</code> object passed to the constructor,
- to be used to emit intermediate results</li>
- <li>Define a function-call operator <code>void operator()(key_type const &key, value_type
- const &value);</code> Note that the <code>const</code> qualifiers on these parameters
- are optional, but recommended where possible.</li>
+ <li>Implement a static mapper function <code>map()</code></li>
               </ul>
 <pre>
-class map_task
+struct map_task : public boost::mapreduce::map_task&lt;
+ std::string, // MapKey
+ std::pair&lt;char const *, char const *&gt;, // MapValue
+ std::string, // IntermediateKey
+ unsigned&gt; // IntermediateValue
 {
- public:
- typedef std::string key_type;
- typedef std::ifstream value_type;
- typedef std::string intermediate_key_type;
- typedef unsigned intermediate_value_type;
-
- map_task(job::map_task_runner &runner);
- void operator()(key_type const &key, value_type const &value);
-
- private:
- job::map_task_runner &runner_;
+ template&lt;typename Runtime&gt;
+ static void map(Runtime &runtime, std::string const &key, value_type const &value);
 };
 </pre>
+<p>The MapTask functor derives from <code>boost::mapreduce::map_task</code>, which defines the
+types required by the Boost.MapReduce library.</p>
+
+<p>The <code>map</code> function is implemented as a template function with the first function parameter
+being a template type <code>Runtime</code>. This parameter is passed by the library to be used as a callback
+to <em>emit</em> intermediate key/value pairs. The other two parameters <code>key</code> and <code>value</code>
+are of types defined in the <code>map_task</code> template parameter list. Note that the <code>const</code>
+qualifiers on these parameters are optional, but recommended where possible.</p>
+
               <h2>ReduceTask</h2>
               <p>Requirements of a ReduceTask function object are</p>
               <ul>
                 <li>Provide type definitions for Reduce Value (<code>v2</code>);
                   <code> value_type</code></li>
- <li>Define a constructor taking a <code>job::reduce_task_runner</code> object by reference</li>
- <li>Store a reference to the <code>job::reduce_task_runner</code> object passed to the constructor,
- to be used to emit results</li>
- <li>Define a function-call operator <code>void operator()(typename map_task::intermediate_key_type
- const &key, It it, It ite);</code> where It is an iterator type.</li>
+ <li>Implement a static reducer function <code>reduce()</code></li>
               </ul>
 <pre>
-class reduce_task
+struct reduce_task : public boost::mapreduce::reduce_task&lt;unsigned&gt;
 {
- public:
- typedef unsigned value_type;
-
- reduce_task(job::reduce_task_runner &runner);
-
- template&lt;typename It&gt;
- void operator()(typename map_task::intermediate_key_type const &key, It it, It ite);
-
- private:
- job::reduce_task_runner &runner_;
+ template&lt;typename Runtime, typename It&gt;
+ static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
+ {
+ runtime.emit(key, std::accumulate(it, ite, 0));
+ }
 };
 </pre>
+<p>The ReduceTask functor derives from <code>boost::mapreduce::reduce_task</code>, which defines the
+types required by the Boost.MapReduce library.</p>
+<p>The <code>reduce</code> function is implemented as a template function with the first function parameter
+being a template type <code>Runtime</code>, as with the <code>map</code> function above. The second template
+parameter defines an <em>iterator type</em> for function parameters 3 and 4 to bound the <em>list</em> of
+intermediate values.</p>
+
 <p>See the <a href='./wordcount.html'>Word Count example</a> for a detailed breakdown of a simple implementation.</p>
             </div>
           </div>

Modified: sandbox/libs/mapreduce/doc/wordcount.html
==============================================================================
--- sandbox/libs/mapreduce/doc/wordcount.html (original)
+++ sandbox/libs/mapreduce/doc/wordcount.html 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -50,131 +50,130 @@
 map (filename; string, file stream; ifstream) --&gt; list(word; string, count; unsigned int)
 reduce (word; string, list(count; unsigned int)) --&gt; list(count; unsigned int)</pre>
 
-
-<h2>Type Definitions</h2>
-<p>
- For convenience, brevity and maintainability, define a <code>job</code> type for the MapReduce job.
- This local <code>job</code> type will be an defined in terms of the library's <code>mapreduce::job</code>
- class with template parameters specific to the Word Count application.
-</p>
-
-<pre>
-typedef
-mapreduce::job&lt;
- wordcount::map_task,
- wordcount::reduce_task&gt;
-job;
-</pre>
-<p>The class <code>mapreduce::job</code> actually has 5 template parameters. The first two must be supplied, the last
-three have default values. The definition above is therefore equivalent to</p>
-<pre>
-typedef
-mapreduce::job&lt;
- class wordcount::map_task,
- class wordcount::reduce_task,
- struct mapreduce::null_combiner,
- class mapreduce::datasource::directory_iterator&lt;class wordcount::map_task&gt;,
- class mapreduce::intermediates::local_disk&lt;
- class wordcount::map_task,
- struct mapreduce::detail::file_sorter,
- struct mapreduce::detail::file_merger&gt;
- &gt
-job;
-</pre>
-
 <h2>MapTask</h2>
 <p>
   The MapTask will be implemented by a class <code>wordcount::map_task</code>. There are four required
- data types to be defined in the functor for the <code>key</code>/<code>value</code> types of the input and
- output of the map task.
+ data types to be defined by the MapTask. These are defined by the base class via template parameters. The types
+ are the <code>key</code>/<code>value</code> types of the input and output of the map task.
+</p>
+<p>
+ The <code>map</code> function takes three parameters: the <code>runtime</code>, which is passed by the MapReduce
+ library to be used as a callback to <em>emit</em> intermediate key/value pairs, and the
+ <code>key</code> and <code>value</code> for the map task to process. Normally these parameters would be expected
+ to be passed as a reference-to-const, but this is not a requirement. For example, if the <code>value</code> type
+ were defined as a file stream (<code>std::ifstream</code>), it could not be passed as a reference-to-const, because
+ reading from the stream modifies the state of the object.
 </p>
-<pre>
-typedef std::string key_type;
-typedef std::ifstream value_type;
-typedef std::string intermediate_key_type;
-typedef unsigned intermediate_value_type;
-</pre>
 <p>
- Now the function-call operator, which takes two parameters; the <code>key</code> and <code>value</code> for the
- map task to process. Normally these parameters would be expected to be passed as a reference-to-const, but in
- the Word Count example, the <code>value</code> parameter is defined as an <code>std::ifstream</code> object. If
- this was passed as reference-to-const, then the function would not be able to read from the file as the read
- operation modifies the state of the object. As a result, the <code>value</code> parameter is passed as a plain
- reference.
-</p>
-<p>
- The function simply loops until the end-of-file is reached on the supplied <code>std::ifstream</code> object.
- In each iteration a <em>word</em> is read into a <code>string</code> object, converted to lowercase text and
- non-alphanumeric characters are stripped from the beginning and end. The <em>word</em> is then stored as an
- intermediate <code>key</code> with a <code>value</code> of <code>1</code>, by calling the
- <code>emit_intermediate()</code> function of the <code>job::map_task_runner</code> object which was passed to
- the constructor of the <code>map_task</code> object.
+ The function simply loops through the character array (defined by the begin/end pointers in the <code>value</code>
+ parameter), defining a <em>word</em> to be a contiguous run of alphabetic characters and apostrophes that starts
+ with a letter. Each <em>word</em> is then stored as an intermediate <code>key</code> with a <code>value</code> of
+ <code>1</code> by calling the <code>emit_intermediate()</code> function of the <code>runtime</code> parameter object.
 </p>
 <pre>
-// not a reference to const to enable streams to be passed
-void operator()(key_type const &/*key*/, value_type &value)
+struct map_task : public boost::mapreduce::map_task&lt;
+ std::string, // MapKey
+ std::pair&lt;char const *, char const *&gt;, // MapValue
+ std::string, // IntermediateKey
+ unsigned&gt; // IntermediateValue
 {
- while (!value.eof())
+ template&lt;typename Runtime&gt;
+ static void map(Runtime &runtime, std::string const &/*key*/, value_type &value)
     {
- std::string word;
- value &gt;&gt; word;
- std::transform(word.begin(), word.end(), word.begin(),
- std::bind1st(
- std::mem_fun(&std::ctype&lt;char&gt;::tolower),
- &std::use_facet&lt;std::ctype&lt;char&gt; &gt;(std::locale::classic())));
-
- size_t length = word.length();
- size_t const original_length = length;
- std::string::const_iterator it;
- for (it=word.begin();
- it!=word.end() && !std::isalnum(*it, std::locale::classic());
- ++it)
- {
- --length;
- }
-
- for (std::string::const_reverse_iterator rit=word.rbegin();
- length&gt;0 && !std::isalnum(*rit, std::locale::classic());
- ++rit)
+ bool in_word = false;
+ char const *ptr = value.first;
+ char const *end = value.second;
+ char const *word = ptr;
+ for (; ptr != end; ++ptr)
         {
- --length;
+ char const ch = std::toupper(*ptr);
+ if (in_word)
+ {
+ if ((ch < 'A' || ch > 'Z') && ch != '\'')
+ {
+ runtime.emit_intermediate(std::string(word,ptr-word), 1);
+ in_word = false;
+ }
+ }
+ else
+ {
+ if (ch >= 'A' && ch <= 'Z')
+ {
+ word = ptr;
+ in_word = true;
+ }
+ }
         }
-
- if (length &gt; 0)
+ if (in_word)
         {
- if (length == original_length)
- runner_.emit_intermediate(word, 1);
- else
- runner_.emit_intermediate(std::string(&*it,length), 1);
+ BOOST_ASSERT(ptr-word > 0);
+ runtime.emit_intermediate(std::string(word,ptr-word), 1);
         }
     }
-}
+};
 </pre>
 
 <h2>ReduceTask</h2>
 <p>
- The ReduceTask will be implemented by a function-object <code>wordcount::reduce_task</code>. There
- is one required data type to be defined in the functor for the <code>value</code> type output of
- the reduce task.
+ The ReduceTask will be implemented by a class <code>wordcount::reduce_task</code>. This
+ class is derived from the library's <code>reduce_task</code> class, which takes a single template
+ parameter to define the <code>value</code> type output of the reduce task.
+</p>
+<p>
+ The <code>reduce</code> function takes four parameters: the <code>runtime</code> object is the library's
+ callback as described above. The second parameter is the <code>key</code> of the reduce task and the
+ third and fourth parameters are a pair of iterators dictating the range of <code>value</code> objects
+ for the reduce task. In this Word Count example, the <code>key</code> is a text string containing the
+ <em>word</em>, and the iterators bound a list of frequencies for the word. The ReduceTask simply sums
+ the frequencies by calling <code>std::accumulate</code> and stores the final result by calling the
+ <code>emit()</code> function of the <code>runtime</code> parameter object.
 </p>
 <pre>
-typedef unsigned value_type;
+struct reduce_task : public boost::mapreduce::reduce_task&lt;unsigned&gt;
+{
+ template&lt;typename Runtime, typename It&gt;
+ static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
+ {
+ runtime.emit(key, std::accumulate(it, ite, 0));
+ }
+};
 </pre>
+
+<h2>Type Definitions</h2>
 <p>
- The function-call operator takes three parameters; the <code>key</code> of the reduce task and a pair
- of iterators dictating the range of <code>value</code> objects for the reduce task. In this Word Count
- example, the <code>key</code> is a text string containing the <em>word</em>, and the iterators contain
- a list of frequencies for the word. The ReduceTask simply sums the frequencies by calling
- <code>std::accumulate</code> and stores the final result by calling the <code>emit()</code> function of
- the <code>job::reduce_task_runner</code> object which was passed to the constructor of the
- <code>reduce_task</code> object.
+ For convenience, brevity and maintainability, define a <code>job</code> type for the MapReduce job.
+ This local <code>job</code> type will be defined in terms of the library's <code>mapreduce::job</code>
+ class with template parameters specific to the Word Count application.
 </p>
+
 <pre>
-template&lt;typename It&gt;
-void operator()(typename map_task::intermediate_key_type const &key, It it, It const ite)
-{
- runner_.emit(key, std::accumulate(it, ite, reduce_task::value_type()));
-}
+typedef
+mapreduce::job&lt;
+ wordcount::map_task,
+ wordcount::reduce_task&gt;
+job;
+</pre>
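+<p>The class <code>mapreduce::job</code> actually has 5 template parameters. The first two must be supplied; the last
+three have default values. The definition above is therefore equivalent to the following (type names as reported by
+<code>typeid</code> when built against the STLport standard library, hence the <code>stlp_std</code> namespace):</p>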
+<pre>
+typedef
+class boost::mapreduce::job&lt;
+ struct wordcount::map_task,
+ struct wordcount::reduce_task,
+ struct boost::mapreduce::null_combiner,
+ class boost::mapreduce::datasource::directory_iterator&lt;
+ struct wordcount::map_task,
+ class boost::mapreduce::datasource::file_handler&lt;
+ class stlp_std::basic_string&lt;char,class stlp_std::char_traits&lt;char&gt;,
+ class stlp_std::allocator&lt;char&gt; &gt;,
+ struct stlp_std::pair&lt;char const *,char const *&gt; &gt; &gt;,
+ class boost::mapreduce::intermediates::local_disk&lt;
+ struct wordcount::map_task,
+ struct boost::mapreduce::detail::file_sorter,
+ struct boost::mapreduce::detail::file_merger,
+ struct boost::mapreduce::hash_partitioner&gt; &gt;
+job;
 </pre>
 
 <h2>Program</h2>
@@ -187,7 +186,7 @@
   A <code>datasource</code> object is created to iterate through a directory of files and
   pass each file into a map task. A <code>mapreduce::specification</code> object is then
   created. This is used to specify system parameters such as the number of map tasks to run.
- <em>Note that this is a hint to the MapReduce runtime, and may differ from th actual
+ <em>Note that this is a hint to the MapReduce runtime, and may differ from the actual
   number of maps that are used.</em> The final supporting object that is created is an
   instance of <code>mapreduce::results</code>. This structure will be populated by the
   runtime to provide metrics and timings of the MapReduce job execution.
@@ -211,19 +210,51 @@
   See <a href='./schedule_policies.html'>Schedule Policies</a> for more information.
 </p>
 
+<p>
+ To hide the complexity of MapReduce, the library provides a very simple interface for
+ running a job with default values. In this example, we want to run the Job without any
+ specific configuration, so we can use the function
+</p>
+<pre>
+template&lt;typename Job&gt;
+void run(boost::mapreduce::specification &spec, boost::mapreduce::results &result);
+</pre>
+<p>
+ This function will default-construct the required objects using the type definitions provided -
+ or defaulted - in the <code>job</code> definition. The user-defined program is therefore very
+ simple, with all the generic complexity being handled by the library:
+</p>
+
 <pre>
 int main(int argc, char **argv)
 {
- wordcount::job::datasource_type datasource;
- datasource.set_directory(argv[1]);
+ std::cout &lt;&lt; "MapReduce Wordcount Application";
+ if (argc &lt; 2)
+ {
+ std::cerr &lt;&lt; "Usage: wordcount directory [num_map_tasks]\n";
+ return 1;
+ }
 
- mapreduce::specification spec;
- spec.map_tasks = atoi(argv[2]);
+ boost::mapreduce::specification spec;
+ spec.input_directory = argv[1];
 
- mapreduce::results result;
- wordcount::job mr2(datasource);
+ std::cout &lt;&lt; "\n" &lt;&lt; std::max(1,(int)boost::thread::hardware_concurrency()) &lt;&lt; " CPU cores";
+ std::cout &lt;&lt; "\n" &lt;&lt; typeid(wordcount::job).name() &lt;&lt; "\n";
 
- mr2.run&lt;mapreduce::schedule_policy::cpu_parallel&lt;wordcount::job&gt; &gt;(spec, result);
+ boost::mapreduce::results result;
+ try
+ {
+ if (argc &gt; 2)
+ spec.map_tasks = atoi(argv[2]);
+
+ std::cout &lt;&lt; "\nRunning CPU Parallel MapReduce...";
+ boost::mapreduce::run&lt;wordcount::job&gt;(spec, result);
+ std::cout &lt;&lt; "\nCPU Parallel MapReduce Finished.";
+ }
+ catch (std::exception &e)
+ {
+ std::cout &lt;&lt; std::endl &lt;&lt; "Error: " &lt;&lt; e.what();
+ }
 
 ...
 </pre>
@@ -255,42 +286,47 @@
 <h2>Output</h2>
 <p>
   The wordcount program was run on a sample dataset consisting of six plain text files with
- a total of 90.8 MB (95,284,354 bytes). The smallest file is 163 KB (167,529 bytes) and the largest
+ a total of 96 MB (100,628,434 bytes). The smallest file is 163 KB (167,529 bytes) and the largest
   is 88.1 MB (92,392,601 bytes).
 </p>
 <pre>
 MapReduce Wordcount Application
 2 CPU cores
-class mapreduce::job&lt;class wordcount::map_task,class wordcount::reduce_task,stru
-ct mapreduce::null_combiner,class mapreduce::datasource::directory_iterator&lt;clas
-s wordcount::map_task&gt;,class mapreduce::intermediates::local_disk&lt;class wordcoun
-t::map_task,struct mapreduce::detail::file_sorter,struct mapreduce::detail::file
-_merger&gt; &gt;
+class boost::mapreduce::job&lt;struct wordcount::map_task,struct wordcount::reduce_
+task,struct boost::mapreduce::null_combiner,class boost::mapreduce::datasource::
+directory_iterator&lt;struct wordcount::map_task,class boost::mapreduce::datasource
+::file_handler&lt;class stlp_std::basic_string&lt;char,class stlp_std::char_traits&lt;cha
+r&gt;,class stlp_std::allocator&lt;char&gt; &gt;,struct stlp_std::pair&lt;char const *,char con
+st *&gt; &gt; &gt;,class boost::mapreduce::intermediates::local_disk&lt;struct wordcount::ma
+p_task,struct boost::mapreduce::detail::file_sorter,struct boost::mapreduce::det
+ail::file_merger,struct boost::mapreduce::hash_partitioner&gt; &gt;
 
 Running CPU Parallel MapReduce...
 CPU Parallel MapReduce Finished.
 
 MapReduce statistics:
- MapReduce job runtime : 141 seconds, of which...
- Map phase runtime : 44 seconds
- Reduce phase runtime : 97 seconds
+ MapReduce job runtime : 220 seconds, of which...
+ Map phase runtime : 82 seconds
+ Reduce phase runtime : 138 seconds
 
   Map:
- Total Map keys : 6
- Map keys processed : 6
+ Total Map keys : 100
+ Map keys processed : 100
     Map key processing errors : 0
     Number of Map Tasks run (in parallel) : 2
     Fastest Map key processed in : 0 seconds
- Slowest Map key processed in : 43 seconds
- Average time to process Map keys : 7 seconds
+ Slowest Map key processed in : 3 seconds
+ Average time to process Map keys : 1 seconds
 
   Reduce:
+ Total Reduce keys : 153784
+ Reduce keys processed : 0
+ Reduce key processing errors : 0
     Number of Reduce Tasks run (in parallel): 2
     Number of Result Files : 10
- Fastest Reduce key processed in : 12 seconds
- Slowest Reduce key processed in : 36 seconds
- Average time to process Reduce keys : 30 seconds
-</pre>
+ Fastest Reduce key processed in : 16 seconds
+ Slowest Reduce key processed in : 44 seconds
+ Average time to process Reduce keys : 2 seconds</pre>
 
 <h2>Adding a Combiner</h2>
 <p>
@@ -338,39 +374,49 @@
 <pre>
 MapReduce Wordcount Application
 2 CPU cores
-class mapreduce::job&lt;class wordcount::map_task,class wordcount::reduce_task,clas
-s wordcount::combiner,class mapreduce::datasource::directory_iterator&lt;class word
-count::map_task&gt;,class mapreduce::intermediates::local_disk&lt;class wordcount::map
-_task,struct mapreduce::detail::file_sorter,struct mapreduce::detail::file_merge
-r&gt; &gt;
+class boost::mapreduce::job&lt;struct wordcount::map_task,struct wordcount::reduce_
+task,class wordcount::combiner,class boost::mapreduce::datasource::directory_ite
+rator&lt;struct wordcount::map_task,class boost::mapreduce::datasource::file_handle
+r&lt;class stlp_std::basic_string&lt;char,class stlp_std::char_traits&lt;char&gt;,class stlp
+_std::allocator&lt;char&gt; &gt;,struct stlp_std::pair&lt;char const *,char const *&gt; &gt; &gt;,cla
+ss boost::mapreduce::intermediates::local_disk&lt;struct wordcount::map_task,struct
+ boost::mapreduce::detail::file_sorter,struct boost::mapreduce::detail::file_mer
+ger,struct boost::mapreduce::hash_partitioner&gt; &gt;
 
 Running CPU Parallel MapReduce...
 CPU Parallel MapReduce Finished.
 
 MapReduce statistics:
- MapReduce job runtime : 116 seconds, of which...
- Map phase runtime : 114 seconds
- Reduce phase runtime : 2 seconds
+ MapReduce job runtime : 136 seconds, of which...
+ Map phase runtime : 125 seconds
+ Reduce phase runtime : 11 seconds
 
   Map:
- Total Map keys : 6
- Map keys processed : 6
+ Total Map keys : 100
+ Map keys processed : 100
     Map key processing errors : 0
     Number of Map Tasks run (in parallel) : 2
- Fastest Map key processed in : 1 seconds
- Slowest Map key processed in : 112 seconds
- Average time to process Map keys : 19 seconds
+ Fastest Map key processed in : 0 seconds
+ Slowest Map key processed in : 4 seconds
+ Average time to process Map keys : 2 seconds
 
   Reduce:
+ Total Reduce keys : 153784
+ Reduce keys processed : 0
+ Reduce key processing errors : 0
     Number of Reduce Tasks run (in parallel): 2
     Number of Result Files : 10
- Fastest Reduce key processed in : 0 seconds
- Slowest Reduce key processed in : 1 seconds
- Average time to process Reduce keys : 0 seconds
-</pre>
+ Fastest Reduce key processed in : 2 seconds
+ Slowest Reduce key processed in : 3 seconds
+ Average time to process Reduce keys : 0 seconds</pre>
 
 <h2>Source Code</h2>
-<p>The full source code for the Word Count example can be found <code>libs/mapreduce/test/wordcount/wordcount.cpp</code>.</p>
+<p>The full source code for the Word Count example can be found in
+<code>
+ <a href='https://svn.boost.org/svn/boost/sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp'>
+ libs/mapreduce/examples/wordcount/wordcount.cpp
+ </a>
+</code></p>
 
             </div>
           </div>

Added: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- (empty file)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -0,0 +1,173 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#if !defined(_DEBUG) && !defined(BOOST_DISABLE_ASSERTS)
+# pragma message("Warning: BOOST_DISABLE_ASSERTS not defined")
+#endif
+
+#include <boost/config.hpp>
+#if defined(BOOST_MSVC)
+# pragma warning(disable: 4244 4512 4267)
+#endif
+
+#include <boost/mapreduce.hpp>
+#include <numeric> // accumulate
+
+#if defined(BOOST_MSVC) && defined(_DEBUG)
+#include <crtdbg.h>
+#endif
+
+namespace wordcount {
+
+// Map task: scans a [first, second) character range and emits (word, 1) for
+// each word found. A word starts at a character whose std::toupper value is
+// in 'A'..'Z' and continues while characters are letters or apostrophes; the
+// word is emitted with the case it has in the input.
+struct map_task : public boost::mapreduce::map_task<
+    std::string,                            // MapKey
+    std::pair<char const *, char const *>,  // MapValue
+    std::string,                            // IntermediateKey
+    unsigned>                               // IntermediateValue
+{
+    // runtime - library callback used to emit intermediate key/value pairs
+    // key     - the map key (unused)
+    // value   - begin/end pointers of the text to scan
+    template<typename Runtime>
+    static void map(Runtime &runtime, std::string const &/*key*/, value_type &value)
+    {
+        bool in_word = false;
+        char const *ptr = value.first;
+        char const *end = value.second;
+        char const *word = ptr;
+        for (; ptr != end; ++ptr)
+        {
+            // std::toupper requires a value representable as unsigned char;
+            // passing a negative char (e.g. a byte of non-ASCII text where
+            // char is signed) is undefined behaviour.
+            int const ch = std::toupper(static_cast<unsigned char>(*ptr));
+            if (in_word)
+            {
+                if ((ch < 'A' || ch > 'Z') && ch != '\'')
+                {
+                    runtime.emit_intermediate(std::string(word, ptr - word), 1);
+                    in_word = false;
+                }
+            }
+            else if (ch >= 'A' && ch <= 'Z')
+            {
+                word = ptr;
+                in_word = true;
+            }
+        }
+
+        // Emit a word that runs up to the end of the range.
+        if (in_word)
+        {
+            BOOST_ASSERT(ptr - word > 0);
+            runtime.emit_intermediate(std::string(word, ptr - word), 1);
+        }
+    }
+};
+
+// Reduce task: sums the counts for a word and emits (word, total).
+struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+{
+    template<typename Runtime, typename It>
+    static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
+    {
+        // Accumulate in the counts' own unsigned type. A plain 0 as the
+        // initial value makes std::accumulate sum into an int (and emit an
+        // int), which misbehaves once a total exceeds INT_MAX.
+        runtime.emit(key, std::accumulate(it, ite, map_task::intermediate_value_type()));
+    }
+};
+
+// Combiner: sums the values passed to operator() between start() and
+// finish() for one intermediate key, then inserts that single total into
+// the intermediate store.
+class combiner
+{
+  public:
+    combiner() : total_(0)
+    {
+    }
+
+    void start(map_task::intermediate_key_type const &)
+    {
+        total_ = 0;
+    }
+
+    template<typename IntermediateStore>
+    void finish(map_task::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+    {
+        // Nothing is inserted for a key whose values summed to zero.
+        if (total_ > 0)
+            intermediate_store.insert(key, total_);
+    }
+
+    void operator()(map_task::intermediate_value_type const &value)
+    {
+        total_ += value;
+    }
+
+  private:
+    // Running sum for the current key; initialised in the constructor so the
+    // object never holds an indeterminate value before the first start().
+    unsigned total_;
+};
+
+typedef
+boost::mapreduce::job<
+    wordcount::map_task
+  , wordcount::reduce_task
+  , wordcount::combiner
+> job;
+
+} // namespace wordcount
+
+
+
+// Entry point: wordcount directory [num_map_tasks]
+// Runs the word-count job over the input directory given in argv[1] and
+// prints the job statistics. Returns 1 on a usage error or if the job threw
+// an exception, 0 otherwise.
+int main(int argc, char **argv)
+{
+    std::cout << "MapReduce Wordcount Application";
+    if (argc < 2)
+    {
+        std::cerr << "Usage: wordcount directory [num_map_tasks]\n";
+        return 1;
+    }
+
+    boost::mapreduce::specification spec;
+    spec.input_directory = argv[1];
+
+    std::cout << "\n" << std::max(1, static_cast<int>(boost::thread::hardware_concurrency())) << " CPU cores";
+    std::cout << "\n" << typeid(wordcount::job).name() << "\n";
+
+    // Set when the job throws: statistics are still printed below, but the
+    // failure is reported through the exit status.
+    bool failed = false;
+    boost::mapreduce::results result;
+    try
+    {
+        if (argc > 2)
+            spec.map_tasks = atoi(argv[2]);
+
+        std::cout << "\nRunning CPU Parallel MapReduce...";
+        boost::mapreduce::run<wordcount::job>(spec, result);
+        std::cout << "\nCPU Parallel MapReduce Finished.";
+    }
+    catch (std::exception const &e)
+    {
+        std::cout << std::endl << "Error: " << e.what();
+        failed = true;
+    }
+
+    std::cout << std::endl << "\n" << "MapReduce statistics:";
+    std::cout << "\n " << "MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
+    std::cout << "\n " << " Map phase runtime : " << result.map_runtime << " seconds";
+    std::cout << "\n " << " Reduce phase runtime : " << result.reduce_runtime << " seconds";
+    std::cout << "\n\n " << "Map:";
+    std::cout << "\n " << "Total Map keys : " << result.counters.map_keys_executed;
+    std::cout << "\n " << "Map keys processed : " << result.counters.map_keys_completed;
+    std::cout << "\n " << "Map key processing errors : " << result.counters.map_key_errors;
+    std::cout << "\n " << "Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
+    // Guard against an empty timing list (e.g. the job failed early):
+    // dereferencing min/max_element of an empty range and dividing by a zero
+    // size are both undefined.
+    if (result.map_times.size() > 0)
+    {
+        std::cout << "\n " << "Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+        std::cout << "\n " << "Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+        std::cout << "\n " << "Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+    }
+
+    std::cout << "\n\n " << "Reduce:";
+    std::cout << "\n " << "Total Reduce keys : " << result.counters.reduce_keys_executed;
+    std::cout << "\n " << "Reduce keys processed : " << result.counters.reduce_keys_completed;
+    std::cout << "\n " << "Reduce key processing errors : " << result.counters.reduce_key_errors;
+    std::cout << "\n " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+    std::cout << "\n " << "Number of Result Files : " << result.counters.num_result_files;
+    if (result.reduce_times.size() > 0)
+    {
+        std::cout << "\n " << "Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        std::cout << "\n " << "Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        // Average over the reduce timings themselves (previously divided by
+        // the number of *map* timings by mistake).
+        std::cout << "\n " << "Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.reduce_times.size() << " seconds";
+    }
+
+    return failed ? 1 : 0;
+}

Added: sandbox/libs/mapreduce/examples/wordcount/wordcount.vcproj
==============================================================================
--- (empty file)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.vcproj 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -0,0 +1,206 @@
+<?xml version="1.0" encoding="Windows-1252"?>
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="8.00"
+ Name="wordcount"
+ ProjectGUID="{AB0444E8-E927-470A-BF0B-A60E67F91B06}"
+ RootNamespace="wordcount"
+ Keyword="Win32Proj"
+ >
+ <Platforms>
+ <Platform
+ Name="Win32"
+ />
+ </Platforms>
+ <ToolFiles>
+ </ToolFiles>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)\compiler"
+ ConfigurationType="1"
+ CharacterSet="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="0"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions="WIN32_LEAN_AND_MEAN"
+ MinimalRebuild="true"
+ BasicRuntimeChecks="3"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="0"
+ WarningLevel="4"
+ WarnAsError="true"
+ Detect64BitPortabilityProblems="true"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories=""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeForWindows98="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCWebDeploymentTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)\compiler"
+ ConfigurationType="1"
+ CharacterSet="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ InlineFunctionExpansion="2"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions="WIN32_LEAN_AND_MEAN;BOOST_LIB_DIAGNOSTIC"
+ RuntimeLibrary="2"
+ UsePrecompiledHeader="0"
+ WarningLevel="4"
+ WarnAsError="true"
+ Detect64BitPortabilityProblems="true"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ LinkIncremental="1"
+ AdditionalLibraryDirectories=""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeReferences="2"
+ EnableCOMDATFolding="2"
+ OptimizeForWindows98="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCWebDeploymentTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ </Configurations>
+ <References>
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
+ >
+ <File
+ RelativePath=".\wordcount.cpp"
+ >
+ </File>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
+ >
+ </Filter>
+ <Filter
+ Name="Resource Files"
+ Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav"
+ UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"
+ >
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>

Modified: sandbox/libs/mapreduce/mapreduce.sln
==============================================================================
--- sandbox/libs/mapreduce/mapreduce.sln (original)
+++ sandbox/libs/mapreduce/mapreduce.sln 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -1,9 +1,10 @@
-
 Microsoft Visual Studio Solution File, Format Version 9.00
 # Visual Studio 2005
 Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "mapreduce", "mapreduce.vcproj", "{F1A9A9FC-ACE9-4F93-8162-B888697FD81B}"
 EndProject
-Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "wordcount", "test\wordcount\wordcount.vcproj", "{AB0444E8-E927-470A-BF0B-A60E67F91B06}"
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "wordcount", "examples\wordcount\wordcount.vcproj", "{AB0444E8-E927-470A-BF0B-A60E67F91B06}"
+EndProject
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "mrtest", "test\mrtest.vcproj", "{3BC934D3-0EF5-4F82-B902-C1EC4527574D}"
 EndProject
 Global
         GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -19,6 +20,10 @@
                 {AB0444E8-E927-470A-BF0B-A60E67F91B06}.Debug|Win32.Build.0 = Debug|Win32
                 {AB0444E8-E927-470A-BF0B-A60E67F91B06}.Release|Win32.ActiveCfg = Release|Win32
                 {AB0444E8-E927-470A-BF0B-A60E67F91B06}.Release|Win32.Build.0 = Release|Win32
+ {3BC934D3-0EF5-4F82-B902-C1EC4527574D}.Debug|Win32.ActiveCfg = Debug|Win32
+ {3BC934D3-0EF5-4F82-B902-C1EC4527574D}.Debug|Win32.Build.0 = Debug|Win32
+ {3BC934D3-0EF5-4F82-B902-C1EC4527574D}.Release|Win32.ActiveCfg = Release|Win32
+ {3BC934D3-0EF5-4F82-B902-C1EC4527574D}.Release|Win32.Build.0 = Release|Win32
         EndGlobalSection
         GlobalSection(SolutionProperties) = preSolution
                 HideSolutionNode = FALSE

Modified: sandbox/libs/mapreduce/mapreduce.vcproj
==============================================================================
--- sandbox/libs/mapreduce/mapreduce.vcproj (original)
+++ sandbox/libs/mapreduce/mapreduce.vcproj 2009-07-26 11:48:09 EDT (Sun, 26 Jul 2009)
@@ -223,6 +223,10 @@
                         UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"
>
                 </Filter>
+ <File
+ RelativePath=".\test\mrtest.vcproj"
+ >
+ </File>
         </Files>
         <Globals>
         </Globals>


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk