Boost logo

Boost-Commit:

Subject: [Boost-commit] svn:boost r55298 - in sandbox/boost/mapreduce: . intermediates schedule_policy
From: cdm.henderson_at_[hidden]
Date: 2009-07-30 16:21:19


Author: chenderson
Date: 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
New Revision: 55298
URL: http://svn.boost.org/trac/boost/changeset/55298

Log:
Tidied up 'intermediates' interface
Text files modified:
   sandbox/boost/mapreduce/intermediates.hpp | 6 +
   sandbox/boost/mapreduce/intermediates/local_disk.hpp | 108 +++++++++++++++++++++++++++++-------
   sandbox/boost/mapreduce/job.hpp | 116 ++++++++-------------------------------
   sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp | 31 +++-------
   sandbox/boost/mapreduce/schedule_policy/sequential.hpp | 20 ++++--
   5 files changed, 138 insertions(+), 143 deletions(-)

Modified: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates.hpp (original)
+++ sandbox/boost/mapreduce/intermediates.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -10,5 +10,9 @@
 // For more information, see http://www.boost.org/libs/mapreduce/
 //
  
-//#include <boost/mapreduce/intermediates/in_memory.hpp>
+//!!!
+#ifdef USE_IN_MEMORY_INTERMEDIATES
+#include <boost/mapreduce/intermediates/in_memory.hpp>
+#endif // USE_IN_MEMORY_INTERMEDIATES
+
 #include <boost/mapreduce/intermediates/local_disk.hpp>

Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -102,7 +102,11 @@
 {
   private:
     typedef
+#ifdef _DEBUG
+ std::map<
+#else
     boost::unordered_map<
+#endif
         size_t, // hash value of intermediate key (R)
         std::pair<
             std::string, // filename
@@ -111,7 +115,6 @@
 
   public:
     typedef MapTask map_task_type;
- typedef typename intermediates_t::const_iterator const_iterator;
 
     local_disk(unsigned const num_partitions)
       : num_partitions_(num_partitions)
@@ -125,12 +128,12 @@
             this->close_files();
 
             // delete the temporary files
- for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
                 boost::filesystem::remove(it->second.first);
         }
- catch (std::exception const &/*e*/)
+ catch (std::exception const &e)
         {
-// std::cerr << "\nError: " << e.what() << "\n";
+ std::cerr << "\nError: " << e.what() << "\n";
         }
     }
 
@@ -138,7 +141,7 @@
                       typename map_task_type::intermediate_value_type const &value)
     {
         unsigned const partition = partitioner_(key, num_partitions_);
- intermediates_t::iterator it = intermediates_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
+ intermediates_t::iterator it = intermediate_files_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
         if (it->second.first.empty())
         {
             it->second.first = platform::get_temporary_filename();
@@ -156,20 +159,11 @@
         return !(it->second.second->bad() || it->second.second->fail());
     }
 
- bool const get_partition_filename(size_t const partition, std::string &filename) const
- {
- intermediates_t::const_iterator it=completed_intermediates_.find(partition);
- if (it == completed_intermediates_.end())
- return false;
- filename = it->second.first;
- return true;
- }
-
     template<typename FnObj>
     void combine(FnObj &fn_obj)
     {
         this->close_files();
- for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
         {
             std::string infilename = it->second.first;
             std::string outfilename = platform::get_temporary_filename();
@@ -208,19 +202,88 @@
 
             boost::filesystem::remove(infilename);
         }
- this->close_files();
 
- assert(completed_intermediates_.size() == 0);
- std::swap(completed_intermediates_, intermediates_);
+ this->close_files();
     }
 
     void combine(mapreduce::null_combiner &/*fn_obj*/)
     {
         this->close_files();
- assert(completed_intermediates_.size() == 0);
- std::swap(completed_intermediates_, intermediates_);
     }
 
+ void merge_from(local_disk &other)
+ {
+ BOOST_ASSERT(num_partitions_ == other.num_partitions_);
+
+ for (unsigned partition=0; partition<num_partitions_; ++partition)
+ {
+ intermediates_t::iterator ito = other.intermediate_files_.find(partition);
+ BOOST_ASSERT(ito != other.intermediate_files_.end());
+
+ intermediates_t::iterator it = intermediate_files_.find(partition);
+ if (it == intermediate_files_.end())
+ {
+ intermediate_files_.insert(
+ make_pair(
+ partition,
+ std::make_pair(
+ ito->second.first,
+ boost::shared_ptr<std::ofstream>(new std::ofstream))));
+ }
+ else
+ {
+ std::list<std::string> filenames;
+ filenames.push_back(it->second.first);
+ filenames.push_back(ito->second.first);
+ it->second.first = merge_and_sort(filenames);
+ it->second.second->close();
+ }
+ other.intermediate_files_.erase(partition);
+ }
+ }
+
+ template<typename Callback>
+ void reduce(unsigned const partition, Callback &callback, results &result)
+ {
+ typename map_task_type::intermediate_key_type key;
+ typename map_task_type::intermediate_key_type last_key;
+ typename map_task_type::intermediate_value_type value;
+ std::list<typename map_task_type::intermediate_value_type> values;
+
+ std::list<std::string> filenames;
+ intermediates_t::const_iterator it = intermediate_files_.find(partition);
+ BOOST_ASSERT(it != intermediate_files_.end());
+
+ std::string const &filename = it->second.first;
+ std::ifstream infile(filename.c_str());
+ while (read_record(infile, key, value))
+ {
+ if (key != last_key && length(key) > 0)
+ {
+ if (length(last_key) > 0)
+ {
+ ++result.counters.reduce_keys_executed;
+ callback(last_key, values.begin(), values.end());
+ values.clear();
+ }
+ if (length(key) > 0)
+ std::swap(key, last_key);
+ }
+
+ values.push_back(value);
+ }
+
+ if (length(last_key) > 0)
+ {
+ ++result.counters.reduce_keys_executed;
+ callback(last_key, values.begin(), values.end());
+ }
+
+ infile.close();
+ boost::filesystem::remove(filename.c_str());
+ }
+
+ protected:
     static bool const read_record(std::ifstream &infile,
                                   typename map_task_type::intermediate_key_type &key,
                                   typename map_task_type::intermediate_value_type &value)
@@ -269,15 +332,14 @@
   private:
     void close_files(void)
     {
- for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
             if (it->second.second && it->second.second->is_open())
                 it->second.second->close();
     }
 
   private:
     unsigned const num_partitions_;
- intermediates_t intermediates_;
- intermediates_t completed_intermediates_;
+ intermediates_t intermediate_files_;
     PartitionFn partitioner_;
 };
 

Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -75,9 +75,6 @@
             combiner_type combiner;
             intermediate_store_.combine(combiner);
 
- // notify the job of the final partition files
- notify_runtime_partition_files();
-
             return *this;
         }
 
@@ -87,17 +84,9 @@
             return intermediate_store_.insert(key, value);
         }
 
- protected:
- void notify_runtime_partition_files(void) const
+ intermediate_store_type &intermediate_store(void)
         {
- for (unsigned partition=0; partition<job_.number_of_partitions(); ++partition)
- {
- std::string filename;
- if (intermediate_store_.get_partition_filename(partition, filename))
- {
- job_.new_partition_file(partition, filename);
- }
- }
+ return intermediate_store_;
         }
 
       private:
@@ -143,24 +132,11 @@
 
     job(datasource_type &datasource, specification const &spec)
       : datasource_(datasource),
- specification_(spec)
+ specification_(spec),
+ intermediate_store_(specification_.reduce_tasks)
      {
      }
 
- ~job()
- {
- try
- {
- for (partition_files_t::iterator itp=partition_files_.begin(); itp!=partition_files_.end(); ++itp)
- std::for_each(itp->second.begin(), itp->second.end(), boost::bind(detail::delete_file, _1));
- }
- catch (std::exception const &e)
- {
- std::cerr << "\nError: " << e.what() << "\n";
- }
- partition_files_.clear();
- }
-
     bool const get_next_map_key(void *&key)
     {
         std::auto_ptr<typename map_task_type::key_type> next_key(new typename map_task_type::key_type);
@@ -170,11 +146,6 @@
         return true;
     }
 
- void new_partition_file(unsigned const partition, std::string const &filename)
- {
- partition_files_.insert(std::make_pair(partition, partition_files_t::mapped_type())).first->second.push_back(filename);
- }
-
     template<typename SchedulePolicy>
     void run(results &result)
     {
@@ -190,14 +161,19 @@
         result.job_runtime = time(NULL) - start_time;
     }
 
- bool const run_map_task(void *key, results &result)
+ template<typename Sync>
+ bool const run_map_task(void *key, results &result, Sync &sync)
     {
         bool success = true;
         time_t const start_time = time(NULL);
 
         try
         {
- std::auto_ptr<typename map_task_type::key_type> map_key_ptr(reinterpret_cast<typename map_task_type::key_type *>(key));
+ std::auto_ptr<typename map_task_type::key_type>
+ map_key_ptr(
+ reinterpret_cast<
+ typename map_task_type::key_type *>(key));
+
             typename map_task_type::key_type &map_key = *map_key_ptr;
 
             // get the data
@@ -208,7 +184,8 @@
             ++result.counters.map_keys_executed;
 
             // Map Task
- if (map_key == typename map_task_type::key_type() || value == typename map_task_type::value_type())
+ if (map_key == typename map_task_type::key_type()
+ || value == typename map_task_type::value_type())
             {
                 BOOST_ASSERT(map_key != typename map_task_type::key_type());
                 BOOST_ASSERT(value != typename map_task_type::value_type());
@@ -219,6 +196,11 @@
             map_task_runner runner(*this);
             runner(map_key, value);
 
+ // merge the map task intermediate results into the job
+ sync.lock();
+ intermediate_store_.merge_from(runner.intermediate_store());
+ sync.unlock();
+
             ++result.counters.map_keys_completed;
         }
         catch (std::exception &)
@@ -241,67 +223,23 @@
         return specification_.map_tasks;
     }
 
- // the caller must synchronise calls to this function from multiple threads
- bool const get_partition_filenames(unsigned &partition, filenames_t &filenames)
- {
- partition_files_t::iterator itp = partition_files_.find(partition);
- if (itp == partition_files_.end())
- return false;
- else if (itp->second.size() == 0)
- return get_partition_filenames(++partition, filenames);
-
- std::swap(itp->second, filenames);
- return true;
- }
-
- bool const run_reduce_task(unsigned const partition, filenames_t const &filenames, results &result)
+ bool const run_reduce_task(unsigned const partition, results &result)
     {
         time_t const start_time = time(NULL);
 
         try
         {
- std::string const filename = intermediate_store_type::merge_and_sort(filenames);
-
- typename map_task_type::intermediate_key_type key;
- typename map_task_type::intermediate_key_type last_key;
- typename map_task_type::intermediate_value_type value;
- std::list<typename map_task_type::intermediate_value_type> values;
-
             reduce_task_runner runner(
                 specification_.output_filespec,
                 partition,
                 number_of_partitions());
 
- std::ifstream infile(filename.c_str());
- while (intermediate_store_type::read_record(infile, key, value))
- {
- if (key != last_key && length(key) > 0)
- {
- if (length(last_key) > 0)
- {
- ++result.counters.reduce_keys_executed;
- runner(last_key, values.begin(), values.end());
- values.clear();
- }
- if (length(key) > 0)
- std::swap(key, last_key);
- }
-
- values.push_back(value);
- }
-
- if (length(last_key) > 0)
- {
- ++result.counters.reduce_keys_executed;
- runner(last_key, values.begin(), values.end());
- }
-
- infile.close();
- boost::filesystem::remove(filename.c_str());
+ intermediate_store_.reduce(partition, runner, result);
         }
- catch (std::exception &)
+ catch (std::exception &e)
         {
- ++result.counters.reduce_key_errors;
+ std::cerr << "\nError: " << e.what() << "\n";
+ ++result.counters.reduce_key_errors;
         }
         
         result.reduce_times.push_back(time(NULL)-start_time);
@@ -310,15 +248,9 @@
     }
 
   private:
- typedef
- boost::unordered_map<
- unsigned, // partition
- filenames_t> // file names
- partition_files_t;
-
     datasource_type &datasource_;
- partition_files_t partition_files_;
     specification const &specification_;
+ intermediate_store_type intermediate_store_;
 };
 
 template<>

Modified: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -24,7 +24,7 @@
 namespace detail {
 
 template<typename Job>
-inline void run_next_map_task(Job &job, results &result, boost::mutex &m)
+inline void run_next_map_task(Job &job, results &result, boost::mutex &m1, boost::mutex &m2)
 {
     try
     {
@@ -33,12 +33,12 @@
         {
             void *key = 0;
 
- m.lock();
+ m1.lock();
             run = job.get_next_map_key(key);
- m.unlock();
+ m1.unlock();
 
             if (run)
- job.run_map_task(key, result);
+ job.run_map_task(key, result, m2);
         }
     }
     catch (std::exception &e)
@@ -48,22 +48,11 @@
 }
 
 template<typename Job>
-inline void run_next_reduce_task(Job &job, unsigned &partition, results &result, boost::mutex &m)
+inline void run_next_reduce_task(Job &job, unsigned &partition, results &result)
 {
     try
     {
- bool run = true;
- while (run)
- {
- typename Job::filenames_t filenames;
-
- m.lock();
- run = job.get_partition_filenames(partition, filenames);
- m.unlock();
-
- if (run)
- job.run_reduce_task(partition, filenames, result);
- }
+ job.run_reduce_task(partition, result);
     }
     catch (std::exception &e)
     {
@@ -84,7 +73,7 @@
 
         typedef std::vector<boost::shared_ptr<results> > all_results_t;
         all_results_t all_results;
- boost::mutex m;
+ boost::mutex m1, m2;
 
         // run the Map Tasks
         time_t start_time = time(NULL);
@@ -102,7 +91,8 @@
                     detail::run_next_map_task<Job>,
                     boost::ref(job),
                     boost::ref(*this_result),
- boost::ref(m));
+ boost::ref(m1),
+ boost::ref(m2));
             map_threads.add_thread(thread);
         }
         map_threads.join_all();
@@ -126,8 +116,7 @@
                     detail::run_next_reduce_task<Job>,
                     boost::ref(job),
                     boost::ref(partition),
- boost::ref(*this_result),
- boost::ref(m));
+ boost::ref(*this_result));
             reduce_threads.add_thread(thread);
         }
         reduce_threads.join_all();

Modified: sandbox/boost/mapreduce/schedule_policy/sequential.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/sequential.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/sequential.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -19,6 +19,17 @@
 
 namespace schedule_policy {
 
+namespace detail {
+
+struct null_lock
+{
+ void lock(void) { }
+ void unlock(void) { }
+};
+
+} // namespace detail
+
+
 template<typename Job>
 class sequential
 {
@@ -28,18 +39,15 @@
         // Map Tasks
         time_t start_time = time(NULL);
         void *key = 0;
- while (job.get_next_map_key(key) && job.run_map_task(key, result))
+ detail::null_lock nolock;
+ while (job.get_next_map_key(key) && job.run_map_task(key, result, nolock))
             ;
         result.map_runtime = time(NULL) - start_time;
 
         // Reduce Tasks
         start_time = time(NULL);
         for (unsigned partition=0; partition<job.number_of_partitions(); ++partition)
- {
- typename Job::filenames_t filenames;
- if (job.get_partition_filenames(partition, filenames))
- job.run_reduce_task(partition, filenames, result);
- }
+ job.run_reduce_task(partition, result);
         result.reduce_runtime = time(NULL) - start_time;
 
         result.counters.actual_map_tasks = 1;


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk