Boost logo

Boost-Commit:

Subject: [Boost-commit] svn:boost r55343 - in sandbox: boost/mapreduce boost/mapreduce/intermediates libs/mapreduce/examples/wordcount libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-01 10:15:07


Author: chenderson
Date: 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
New Revision: 55343
URL: http://svn.boost.org/trac/boost/changeset/55343

Log:
Refined the type definitions for key/value pair types in Map and Reduce Tasks
Text files modified:
   sandbox/boost/mapreduce/intermediates/in_memory.hpp | 15 ++++++++-------
   sandbox/boost/mapreduce/intermediates/local_disk.hpp | 25 +++++++++++++------------
   sandbox/boost/mapreduce/job.hpp | 21 ++++++++-------------
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp | 14 ++++++--------
   sandbox/libs/mapreduce/test/mrtest.cpp | 29 +++++++++++++++++------------
   5 files changed, 52 insertions(+), 52 deletions(-)

Modified: sandbox/boost/mapreduce/intermediates/in_memory.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/in_memory.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/in_memory.hpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -29,8 +29,8 @@
     {
     }
 
- void operator()(typename MapTask::intermediate_key_type const &/*key*/,
- typename ReduceTask::value_type const &/*value*/)
+ void operator()(typename ReduceTask::key_type const &/*key*/,
+ typename ReduceTask::value_type const &/*value*/)
     {
     }
 };
@@ -46,12 +46,13 @@
     typedef
     std::vector<
         std::map<
- typename MapTask::intermediate_key_type,
- std::list<typename MapTask::intermediate_value_type> > >
+ typename ReduceTask::key_type,
+ std::list<typename ReduceTask::value_type> > >
     intermediates_t;
 
   public:
- typedef MapTask map_task_type;
+ typedef MapTask map_task_type;
+ typedef ReduceTask reduce_task_type;
     typedef reduce_null_output<MapTask, ReduceTask> store_result_type;
 
     in_memory(unsigned const num_partitions)
@@ -99,8 +100,8 @@
     }
 
 
- bool const insert(typename map_task_type::intermediate_key_type const &key,
- typename map_task_type::intermediate_value_type const &value)
+ bool const insert(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
     {
         typename intermediates_t::value_type &map = intermediates_[partitioner_(key, num_partitions_)];
 

Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -108,8 +108,8 @@
         output_file_.open(filename_.c_str());
     }
 
- void operator()(typename MapTask::intermediate_key_type const &key,
- typename ReduceTask::value_type const &value)
+ void operator()(typename ReduceTask::key_type const &key,
+ typename ReduceTask::value_type const &value)
     {
         output_file_ << key << "\t" << value << "\n";
     }
@@ -138,7 +138,8 @@
     intermediates_t;
 
   public:
- typedef MapTask map_task_type;
+ typedef MapTask map_task_type;
+ typedef ReduceTask reduce_task_type;
     typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
 
     local_disk(unsigned const num_partitions)
@@ -162,8 +163,8 @@
         }
     }
 
- bool const insert(typename map_task_type::intermediate_key_type const &key,
- typename map_task_type::intermediate_value_type const &value)
+ bool const insert(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
     {
         unsigned const partition = partitioner_(key, num_partitions_);
 
@@ -208,7 +209,7 @@
             std::ifstream infile(infilename.c_str());
             while (!infile.eof())
             {
- typename map_task_type::intermediate_value_type value;
+ typename reduce_task_type::value_type value;
                 if (read_record(infile, key, value))
                 {
                     if (key != last_key && key.length() > 0)
@@ -276,10 +277,10 @@
     template<typename Callback>
     void reduce(unsigned const partition, Callback &callback, results &result)
     {
- typename map_task_type::intermediate_key_type key;
- typename map_task_type::intermediate_key_type last_key;
- typename map_task_type::intermediate_value_type value;
- std::list<typename map_task_type::intermediate_value_type> values;
+ typename reduce_task_type::key_type key;
+ typename reduce_task_type::key_type last_key;
+ typename reduce_task_type::value_type value;
+ std::list<typename reduce_task_type::value_type> values;
 
         std::list<std::string> filenames;
         intermediates_t::const_iterator it = intermediate_files_.find(partition);
@@ -316,8 +317,8 @@
 
   protected:
     static bool const read_record(std::ifstream &infile,
- typename map_task_type::intermediate_key_type &key,
- typename map_task_type::intermediate_value_type &value)
+ typename reduce_task_type::key_type &key,
+ typename reduce_task_type::value_type &value)
     {
 #if defined(__SGI_STL_PORT)
         size_t keylen;

Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -19,24 +19,19 @@
 
 template<typename T> size_t length(T const &str);
 
-template<
- typename MapKey,
- typename MapValue,
- typename IntermediateKey,
- typename IntermediateValue>
+template<typename MapKey, typename MapValue>
 class map_task
 {
   public:
     typedef MapKey key_type;
     typedef MapValue value_type;
- typedef IntermediateKey intermediate_key_type;
- typedef IntermediateValue intermediate_value_type;
 };
 
-template<typename ReduceValue>
+template<typename ReduceKey, typename ReduceValue>
 class reduce_task
 {
   public:
+ typedef ReduceKey key_type;
     typedef ReduceValue value_type;
 };
 
@@ -78,8 +73,8 @@
             return *this;
         }
 
- bool const emit_intermediate(typename map_task_type::intermediate_key_type const &key,
- typename map_task_type::intermediate_value_type const &value)
+ bool const emit_intermediate(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
         {
             return intermediate_store_.insert(key, value);
         }
@@ -105,14 +100,14 @@
         {
         }
 
- void emit(typename map_task_type::intermediate_key_type const &key,
- typename reduce_task_type::value_type const &value)
+ void emit(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
         {
             store_result_(key, value);
         }
 
         template<typename It>
- void operator()(typename map_task_type::intermediate_key_type const &key, It it, It ite)
+ void operator()(typename reduce_task_type::key_type const &key, It it, It ite)
         {
             reduce_task_type::reduce(*this, key, it, ite);
         }

Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp (original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -29,10 +29,8 @@
 namespace wordcount {
 
 struct map_task : public boost::mapreduce::map_task<
- std::string, // MapKey
- std::pair<char const *, char const *>, // MapValue
- std::string, // IntermediateKey
- unsigned> // IntermediateValue
+ std::string, // MapKey
+ std::pair<char const *, char const *> > // MapValue
 {
     template<typename Runtime>
     static void map(Runtime &runtime, std::string const &/*key*/, value_type &value)
@@ -79,7 +77,7 @@
     }
 };
 
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
 {
     template<typename Runtime, typename It>
     static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
@@ -98,19 +96,19 @@
         intermediate_store.combine(instance);
     }
 
- void start(map_task::intermediate_key_type const &)
+ void start(reduce_task::key_type const &)
     {
         total_ = 0;
     }
 
     template<typename IntermediateStore>
- void finish(map_task::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+ void finish(reduce_task::key_type const &key, IntermediateStore &intermediate_store)
     {
         if (total_ > 0)
             intermediate_store.insert(key, total_);
     }
 
- void operator()(map_task::intermediate_value_type const &value)
+ void operator()(reduce_task::value_type const &value)
     {
         total_ += value;
     }

Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp (original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -50,18 +50,14 @@
 map_value_type;
 
 template<typename T>
-struct map_task : public boost::mapreduce::map_task<
- std::string, // MapKey
- map_value_type, // MapValue
- std::string, // IntermediateKey
- unsigned> // IntermediateValue
+struct map_task
+ : public boost::mapreduce::map_task<std::string, map_value_type>
 {
     template<typename Runtime>
     static void map(Runtime &runtime, std::string const &key, T &value);
 };
-typedef map_task<map_value_type> map_task_type;
 
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
 {
     template<typename Runtime, typename It>
     static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
@@ -167,6 +163,8 @@
 
 class combiner;
 
+typedef map_task<map_value_type> map_task_type;
+
 typedef
 boost::mapreduce::job<
     wordcount::map_task_type
@@ -197,19 +195,19 @@
         intermediate_store.combine(instance);
     }
 
- void start(job::map_task_type::intermediate_key_type const &)
+ void start(job::reduce_task_type::key_type const &)
     {
         total_ = 0;
     }
 
     template<typename IntermediateStore>
- void finish(job::map_task_type::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+ void finish(job::reduce_task_type::key_type const &key, IntermediateStore &intermediate_store)
     {
         if (total_ > 0)
             intermediate_store.insert(key, total_);
     }
 
- void operator()(job::map_task_type::intermediate_value_type const &value)
+ void operator()(job::reduce_task_type::value_type const &value)
     {
         total_ += value;
     }
@@ -256,7 +254,7 @@
         spec.map_tasks = 1;
         spec.reduce_tasks = 1;
 
- wordcount::job job(datasource, spec);
+ wordcount::job job(datasource, spec);
         job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
         std::cout << "\nFinished.";
 #else
@@ -266,7 +264,12 @@
         if (argc > 2)
             spec.map_tasks = atoi(argv[2]);
 
- boost::mapreduce::run<wordcount::job>(spec, result);
+ // this method can be called, but since we want access to the result data,
+ // we need to have a job object to interrogate
+ //boost::mapreduce::run<wordcount::job>(spec, result);
+
+ wordcount::job job(datasource, spec);
+ job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
         std::cout << "\nCPU Parallel MapReduce Finished.";
 #endif
     }
@@ -275,6 +278,8 @@
         std::cout << std::endl << "Error: " << e.what();
     }
 
+ typedef std::pair<wordcount::reduce_task::key_type, wordcount::reduce_task::value_type> keyvalue_t;
+
     std::cout << std::endl << "\n" << "MapReduce statistics:";
     std::cout << "\n " << "MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
     std::cout << "\n " << " Map phase runtime : " << result.map_runtime << " seconds";


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk