Boost logo

Boost-Commit:

Subject: [Boost-commit] svn:boost r55327 - in sandbox: boost/mapreduce boost/mapreduce/intermediates libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-07-31 17:36:01


Author: chenderson
Date: 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
New Revision: 55327
URL: http://svn.boost.org/trac/boost/changeset/55327

Log:
Added memory-based intermediate handling policy
Text files modified:
   sandbox/boost/mapreduce/hash_partitioner.hpp | 2 +
   sandbox/boost/mapreduce/intermediates.hpp | 7 +----
   sandbox/boost/mapreduce/intermediates/local_disk.hpp | 46 ++++++++++++++++++++++++++++++++-------
   sandbox/boost/mapreduce/job.hpp | 22 ++++++-------------
   sandbox/libs/mapreduce/test/mrtest.cpp | 25 ++++++++++++---------
   5 files changed, 62 insertions(+), 40 deletions(-)

Modified: sandbox/boost/mapreduce/hash_partitioner.hpp
==============================================================================
--- sandbox/boost/mapreduce/hash_partitioner.hpp (original)
+++ sandbox/boost/mapreduce/hash_partitioner.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -12,6 +12,8 @@
 
 #ifndef BOOST_MAPREDUCE_HASH_PARTITONER_HPP
 #define BOOST_MAPREDUCE_HASH_PARTITONER_HPP
+
+#include <boost/functional/hash.hpp>
  
 namespace boost {
 

Modified: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates.hpp (original)
+++ sandbox/boost/mapreduce/intermediates.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -10,9 +10,6 @@
 // For more information, see http://www.boost.org/libs/mapreduce/
 //
  
-//!!!
-#ifdef USE_IN_MEMORY_INTERMEDIATES
-#include <boost/mapreduce/intermediates/in_memory.hpp>
-#endif // USE_IN_MEMORY_INTERMEDIATES
-
+#include <boost/mapreduce/hash_partitioner.hpp>
 #include <boost/mapreduce/intermediates/local_disk.hpp>
+#include <boost/mapreduce/intermediates/in_memory.hpp>

Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -13,10 +13,8 @@
 #ifndef BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
 #define BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
 
-#include <iomanip> // setw
-#include <fstream> // linux
-#include <boost/unordered_map.hpp>
-#include <boost/mapreduce/hash_partitioner.hpp>
+#include <iomanip> // setw
+#include <fstream> // linux
 
 namespace boost {
 
@@ -93,8 +91,35 @@
 
 namespace intermediates {
 
+template<typename MapTask, typename ReduceTask>
+class reduce_file_output
+{
+ public:
+ reduce_file_output(std::string const &output_filespec,
+ unsigned const partition,
+ unsigned const num_partitions)
+ {
+ std::ostringstream filename;
+ filename << output_filespec << partition+1 << "_of_" << num_partitions;
+ filename_ = filename.str();
+ output_file_.open(filename_.c_str());
+ }
+
+ void operator()(typename MapTask::intermediate_key_type const &key,
+ typename ReduceTask::value_type const &value)
+ {
+ output_file_ << key << "\t" << value << "\n";
+ }
+
+ private:
+ std::string filename_;
+ std::ofstream output_file_;
+};
+
+
 template<
     typename MapTask,
+ typename ReduceTask,
     typename SortFn=mapreduce::detail::file_sorter,
     typename MergeFn=mapreduce::detail::file_merger,
     typename PartitionFn=mapreduce::hash_partitioner>
@@ -102,11 +127,7 @@
 {
   private:
     typedef
-#ifdef _DEBUG
     std::map<
-#else
- boost::unordered_map<
-#endif
         size_t, // hash value of intermediate key (R)
         std::pair<
             std::string, // filename
@@ -115,6 +136,7 @@
 
   public:
     typedef MapTask map_task_type;
+ typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
 
     local_disk(unsigned const num_partitions)
       : num_partitions_(num_partitions)
@@ -141,7 +163,13 @@
                       typename map_task_type::intermediate_value_type const &value)
     {
         unsigned const partition = partitioner_(key, num_partitions_);
- intermediates_t::iterator it = intermediate_files_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
+
+ intermediates_t::iterator it =
+ intermediate_files_.insert(
+ make_pair(
+ partition,
+ intermediates_t::mapped_type())).first;
+
         if (it->second.first.empty())
         {
             it->second.first = platform::get_temporary_filename();

Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -44,7 +44,8 @@
          typename ReduceTask,
          typename Combiner=null_combiner,
          typename Datasource=datasource::directory_iterator<MapTask>,
- typename IntermediateStore=intermediates::local_disk<MapTask> >
+ typename IntermediateStore=intermediates::in_memory<MapTask, ReduceTask>,
+ typename StoreResult=typename IntermediateStore::store_result_type>
 class job : private boost::noncopyable
 {
   public:
@@ -99,19 +100,16 @@
       public:
         reduce_task_runner(
             std::string const &output_filespec,
- size_t const partition,
- size_t const num_partitions)
+ unsigned const partition,
+ unsigned const num_partitions)
+ : store_result_(output_filespec, partition, num_partitions)
         {
- std::ostringstream filename;
- filename << output_filespec << partition+1 << "_of_" << num_partitions;
- filename_ = filename.str();
- output_file_.open(filename_.c_str());
         }
 
         void emit(typename map_task_type::intermediate_key_type const &key,
                   typename reduce_task_type::value_type const &value)
         {
- output_file_ << key << "\t" << value << "\n";
+ store_result_(key, value);
         }
 
         template<typename It>
@@ -120,14 +118,8 @@
             reduce_task_type::reduce(*this, key, it, ite);
         }
 
- std::string const &filename(void) const
- {
- return filename_;
- }
-
       private:
- std::string filename_;
- std::ofstream output_file_;
+ StoreResult store_result_;
     };
 
     job(datasource_type &datasource, specification const &spec)

Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp (original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -9,6 +9,17 @@
 //
 // For more information, see http://www.boost.org/libs/mapreduce/
 //
+
+// configuration options
+#define WORD_COUNT_MEMORY_MAP_FILE
+#define USE_WORDCOUNT_COMBINER
+#define USE_IN_MEMORY_INTERMEDIATES
+
+#if defined(_DEBUG)
+# define RUN_SEQUENTIAL_MAP_REDUCE
+#else
+# define BOOST_DISABLE_ASSERTS
+#endif
  
 #if !defined(_DEBUG) && !defined(BOOST_DISABLE_ASSERTS)
 # pragma message("Warning: BOOST_DISABLE_ASSERTS not defined")
@@ -26,15 +37,6 @@
 #include <crtdbg.h>
 #endif
 
-// configuration options
-#define WORD_COUNT_MEMORY_MAP_FILE
-#define USE_WORDCOUNT_COMBINER
-//#define USE_IN_MEMORY_INTERMEDIATES
-
-#if defined(_DEBUG)
-#define RUN_SEQUENTIAL_MAP_REDUCE
-#endif
-
 namespace wordcount {
 
 typedef
@@ -172,9 +174,9 @@
 #else
   , boost::mapreduce::null_combiner
 #endif
-#ifdef USE_IN_MEMORY_INTERMEDIATES
+#ifndef USE_IN_MEMORY_INTERMEDIATES
   , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
- , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type>
+ , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
 #endif
> job;
 
@@ -244,6 +246,7 @@
         std::cout << "\nFinished.";
 #else
         std::cout << "\nRunning CPU Parallel MapReduce...";
+ spec.reduce_tasks = 1;
 
         if (argc > 2)
             spec.map_tasks = atoi(argv[2]);


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk