Boost-Commit:
Subject: [Boost-commit] svn:boost r55298 - in sandbox/boost/mapreduce: . intermediates schedule_policy
From: cdm.henderson_at_[hidden]
Date: 2009-07-30 16:21:19
Author: chenderson
Date: 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
New Revision: 55298
URL: http://svn.boost.org/trac/boost/changeset/55298
Log:
Tidied up 'intermediates' interface
Text files modified:
sandbox/boost/mapreduce/intermediates.hpp | 6 +
sandbox/boost/mapreduce/intermediates/local_disk.hpp | 108 +++++++++++++++++++++++++++++-------
sandbox/boost/mapreduce/job.hpp | 116 ++++++++-------------------------------
sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp | 31 +++-------
sandbox/boost/mapreduce/schedule_policy/sequential.hpp | 20 ++++--
5 files changed, 138 insertions(+), 143 deletions(-)
Modified: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates.hpp (original)
+++ sandbox/boost/mapreduce/intermediates.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -10,5 +10,9 @@
// For more information, see http://www.boost.org/libs/mapreduce/
//
-//#include <boost/mapreduce/intermediates/in_memory.hpp>
+//!!!
+#ifdef USE_IN_MEMORY_INTERMEDIATES
+#include <boost/mapreduce/intermediates/in_memory.hpp>
+#endif // USE_IN_MEMORY_INTERMEDIATES
+
#include <boost/mapreduce/intermediates/local_disk.hpp>
Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -102,7 +102,11 @@
{
private:
typedef
+#ifdef _DEBUG
+ std::map<
+#else
boost::unordered_map<
+#endif
size_t, // hash value of intermediate key (R)
std::pair<
std::string, // filename
@@ -111,7 +115,6 @@
public:
typedef MapTask map_task_type;
- typedef typename intermediates_t::const_iterator const_iterator;
local_disk(unsigned const num_partitions)
: num_partitions_(num_partitions)
@@ -125,12 +128,12 @@
this->close_files();
// delete the temporary files
- for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
boost::filesystem::remove(it->second.first);
}
- catch (std::exception const &/*e*/)
+ catch (std::exception const &e)
{
-// std::cerr << "\nError: " << e.what() << "\n";
+ std::cerr << "\nError: " << e.what() << "\n";
}
}
@@ -138,7 +141,7 @@
typename map_task_type::intermediate_value_type const &value)
{
unsigned const partition = partitioner_(key, num_partitions_);
- intermediates_t::iterator it = intermediates_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
+ intermediates_t::iterator it = intermediate_files_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
if (it->second.first.empty())
{
it->second.first = platform::get_temporary_filename();
@@ -156,20 +159,11 @@
return !(it->second.second->bad() || it->second.second->fail());
}
- bool const get_partition_filename(size_t const partition, std::string &filename) const
- {
- intermediates_t::const_iterator it=completed_intermediates_.find(partition);
- if (it == completed_intermediates_.end())
- return false;
- filename = it->second.first;
- return true;
- }
-
template<typename FnObj>
void combine(FnObj &fn_obj)
{
this->close_files();
- for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
{
std::string infilename = it->second.first;
std::string outfilename = platform::get_temporary_filename();
@@ -208,19 +202,88 @@
boost::filesystem::remove(infilename);
}
- this->close_files();
- assert(completed_intermediates_.size() == 0);
- std::swap(completed_intermediates_, intermediates_);
+ this->close_files();
}
void combine(mapreduce::null_combiner &/*fn_obj*/)
{
this->close_files();
- assert(completed_intermediates_.size() == 0);
- std::swap(completed_intermediates_, intermediates_);
}
+ void merge_from(local_disk &other) // Transfers other's per-partition intermediate files into this store; other's entries for partitions 0..num_partitions_-1 are erased afterwards.
+ {
+ BOOST_ASSERT(num_partitions_ == other.num_partitions_);
+
+ for (unsigned partition=0; partition<num_partitions_; ++partition)
+ {
+ intermediates_t::iterator ito = other.intermediate_files_.find(partition);
+ BOOST_ASSERT(ito != other.intermediate_files_.end()); // NOTE(review): entries appear to be created lazily by insert(), so a map task that emitted nothing for this partition would trip this; with asserts disabled (NDEBUG) ito == end() is then dereferenced below -- consider `continue` instead
+
+ intermediates_t::iterator it = intermediate_files_.find(partition);
+ if (it == intermediate_files_.end()) // no file here yet for this partition: adopt other's file as-is
+ {
+ intermediate_files_.insert(
+ make_pair(
+ partition,
+ std::make_pair(
+ ito->second.first,
+ boost::shared_ptr<std::ofstream>(new std::ofstream)))); // unopened stream placeholder; close_files() only closes streams that are open
+ }
+ else // both stores hold a file for this partition: combine them into a single file
+ {
+ std::list<std::string> filenames;
+ filenames.push_back(it->second.first);
+ filenames.push_back(ito->second.first);
+ it->second.first = merge_and_sort(filenames); // NOTE(review): neither store keeps the two input names after this, so no destructor will delete them -- confirm merge_and_sort() removes its inputs
+ it->second.second->close();
+ }
+ other.intermediate_files_.erase(partition); // drop other's entry so its destructor does not delete the file now referenced here
+ }
+ }
+
+ template<typename Callback>
+ void reduce(unsigned const partition, Callback &callback, results &result) // Reads the partition's intermediate file record by record, invokes callback(key, first, last) once per run of consecutive equal keys, then deletes the file. Assumes the file is key-sorted (presumably ensured by combine()/merge_and_sort()) -- TODO confirm.
+ {
+ typename map_task_type::intermediate_key_type key;
+ typename map_task_type::intermediate_key_type last_key;
+ typename map_task_type::intermediate_value_type value;
+ std::list<typename map_task_type::intermediate_value_type> values;
+
+ std::list<std::string> filenames; // NOTE(review): unused
+ intermediates_t::const_iterator it = intermediate_files_.find(partition);
+ BOOST_ASSERT(it != intermediate_files_.end()); // NOTE(review): sequential::run calls run_reduce_task() -- and hence this -- for every partition index; a partition no map task wrote to may have no entry, tripping this assert (or dereferencing end() below when asserts are disabled)
+
+ std::string const &filename = it->second.first;
+ std::ifstream infile(filename.c_str());
+ while (read_record(infile, key, value))
+ {
+ if (key != last_key && length(key) > 0) // a new non-empty key starts a new group
+ {
+ if (length(last_key) > 0) // flush the previous group, if there is one
+ {
+ ++result.counters.reduce_keys_executed;
+ callback(last_key, values.begin(), values.end());
+ values.clear();
+ }
+ if (length(key) > 0) // always true here: already tested by the enclosing condition
+ std::swap(key, last_key); // last_key now holds the key of the group being collected
+ }
+
+ values.push_back(value);
+ }
+
+ if (length(last_key) > 0) // flush the final group
+ {
+ ++result.counters.reduce_keys_executed;
+ callback(last_key, values.begin(), values.end());
+ }
+
+ infile.close();
+ boost::filesystem::remove(filename.c_str()); // NOTE(review): the map entry still holds this name, so the destructor will call remove() on it again
+ }
+
+ protected:
static bool const read_record(std::ifstream &infile,
typename map_task_type::intermediate_key_type &key,
typename map_task_type::intermediate_value_type &value)
@@ -269,15 +332,14 @@
private:
void close_files(void)
{
- for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ for (intermediates_t::iterator it=intermediate_files_.begin(); it!=intermediate_files_.end(); ++it)
if (it->second.second && it->second.second->is_open())
it->second.second->close();
}
private:
unsigned const num_partitions_;
- intermediates_t intermediates_;
- intermediates_t completed_intermediates_;
+ intermediates_t intermediate_files_;
PartitionFn partitioner_;
};
Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -75,9 +75,6 @@
combiner_type combiner;
intermediate_store_.combine(combiner);
- // notify the job of the final partition files
- notify_runtime_partition_files();
-
return *this;
}
@@ -87,17 +84,9 @@
return intermediate_store_.insert(key, value);
}
- protected:
- void notify_runtime_partition_files(void) const
+ intermediate_store_type &intermediate_store(void)
{
- for (unsigned partition=0; partition<job_.number_of_partitions(); ++partition)
- {
- std::string filename;
- if (intermediate_store_.get_partition_filename(partition, filename))
- {
- job_.new_partition_file(partition, filename);
- }
- }
+ return intermediate_store_;
}
private:
@@ -143,24 +132,11 @@
job(datasource_type &datasource, specification const &spec)
: datasource_(datasource),
- specification_(spec)
+ specification_(spec),
+ intermediate_store_(specification_.reduce_tasks)
{
}
- ~job()
- {
- try
- {
- for (partition_files_t::iterator itp=partition_files_.begin(); itp!=partition_files_.end(); ++itp)
- std::for_each(itp->second.begin(), itp->second.end(), boost::bind(detail::delete_file, _1));
- }
- catch (std::exception const &e)
- {
- std::cerr << "\nError: " << e.what() << "\n";
- }
- partition_files_.clear();
- }
-
bool const get_next_map_key(void *&key)
{
std::auto_ptr<typename map_task_type::key_type> next_key(new typename map_task_type::key_type);
@@ -170,11 +146,6 @@
return true;
}
- void new_partition_file(unsigned const partition, std::string const &filename)
- {
- partition_files_.insert(std::make_pair(partition, partition_files_t::mapped_type())).first->second.push_back(filename);
- }
-
template<typename SchedulePolicy>
void run(results &result)
{
@@ -190,14 +161,19 @@
result.job_runtime = time(NULL) - start_time;
}
- bool const run_map_task(void *key, results &result)
+ template<typename Sync>
+ bool const run_map_task(void *key, results &result, Sync &sync)
{
bool success = true;
time_t const start_time = time(NULL);
try
{
- std::auto_ptr<typename map_task_type::key_type> map_key_ptr(reinterpret_cast<typename map_task_type::key_type *>(key));
+ std::auto_ptr<typename map_task_type::key_type>
+ map_key_ptr(
+ reinterpret_cast<
+ typename map_task_type::key_type *>(key));
+
typename map_task_type::key_type &map_key = *map_key_ptr;
// get the data
@@ -208,7 +184,8 @@
++result.counters.map_keys_executed;
// Map Task
- if (map_key == typename map_task_type::key_type() || value == typename map_task_type::value_type())
+ if (map_key == typename map_task_type::key_type()
+ || value == typename map_task_type::value_type())
{
BOOST_ASSERT(map_key != typename map_task_type::key_type());
BOOST_ASSERT(value != typename map_task_type::value_type());
@@ -219,6 +196,11 @@
map_task_runner runner(*this);
runner(map_key, value);
+ // merge the map task intermediate results into the job
+ sync.lock();
+ intermediate_store_.merge_from(runner.intermediate_store());
+ sync.unlock();
+
++result.counters.map_keys_completed;
}
catch (std::exception &)
@@ -241,67 +223,23 @@
return specification_.map_tasks;
}
- // the caller must synchronise calls to this function from multiple threads
- bool const get_partition_filenames(unsigned &partition, filenames_t &filenames)
- {
- partition_files_t::iterator itp = partition_files_.find(partition);
- if (itp == partition_files_.end())
- return false;
- else if (itp->second.size() == 0)
- return get_partition_filenames(++partition, filenames);
-
- std::swap(itp->second, filenames);
- return true;
- }
-
- bool const run_reduce_task(unsigned const partition, filenames_t const &filenames, results &result)
+ bool const run_reduce_task(unsigned const partition, results &result)
{
time_t const start_time = time(NULL);
try
{
- std::string const filename = intermediate_store_type::merge_and_sort(filenames);
-
- typename map_task_type::intermediate_key_type key;
- typename map_task_type::intermediate_key_type last_key;
- typename map_task_type::intermediate_value_type value;
- std::list<typename map_task_type::intermediate_value_type> values;
-
reduce_task_runner runner(
specification_.output_filespec,
partition,
number_of_partitions());
- std::ifstream infile(filename.c_str());
- while (intermediate_store_type::read_record(infile, key, value))
- {
- if (key != last_key && length(key) > 0)
- {
- if (length(last_key) > 0)
- {
- ++result.counters.reduce_keys_executed;
- runner(last_key, values.begin(), values.end());
- values.clear();
- }
- if (length(key) > 0)
- std::swap(key, last_key);
- }
-
- values.push_back(value);
- }
-
- if (length(last_key) > 0)
- {
- ++result.counters.reduce_keys_executed;
- runner(last_key, values.begin(), values.end());
- }
-
- infile.close();
- boost::filesystem::remove(filename.c_str());
+ intermediate_store_.reduce(partition, runner, result);
}
- catch (std::exception &)
+ catch (std::exception &e)
{
- ++result.counters.reduce_key_errors;
+ std::cerr << "\nError: " << e.what() << "\n";
+ ++result.counters.reduce_key_errors;
}
result.reduce_times.push_back(time(NULL)-start_time);
@@ -310,15 +248,9 @@
}
private:
- typedef
- boost::unordered_map<
- unsigned, // partition
- filenames_t> // file names
- partition_files_t;
-
datasource_type &datasource_;
- partition_files_t partition_files_;
specification const &specification_;
+ intermediate_store_type intermediate_store_;
};
template<>
Modified: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -24,7 +24,7 @@
namespace detail {
template<typename Job>
-inline void run_next_map_task(Job &job, results &result, boost::mutex &m)
+inline void run_next_map_task(Job &job, results &result, boost::mutex &m1, boost::mutex &m2)
{
try
{
@@ -33,12 +33,12 @@
{
void *key = 0;
- m.lock();
+ m1.lock();
run = job.get_next_map_key(key);
- m.unlock();
+ m1.unlock();
if (run)
- job.run_map_task(key, result);
+ job.run_map_task(key, result, m2);
}
}
catch (std::exception &e)
@@ -48,22 +48,11 @@
}
template<typename Job>
-inline void run_next_reduce_task(Job &job, unsigned &partition, results &result, boost::mutex &m)
+inline void run_next_reduce_task(Job &job, unsigned &partition, results &result)
{
try
{
- bool run = true;
- while (run)
- {
- typename Job::filenames_t filenames;
-
- m.lock();
- run = job.get_partition_filenames(partition, filenames);
- m.unlock();
-
- if (run)
- job.run_reduce_task(partition, filenames, result);
- }
+ job.run_reduce_task(partition, result);
}
catch (std::exception &e)
{
@@ -84,7 +73,7 @@
typedef std::vector<boost::shared_ptr<results> > all_results_t;
all_results_t all_results;
- boost::mutex m;
+ boost::mutex m1, m2;
// run the Map Tasks
time_t start_time = time(NULL);
@@ -102,7 +91,8 @@
detail::run_next_map_task<Job>,
boost::ref(job),
boost::ref(*this_result),
- boost::ref(m));
+ boost::ref(m1),
+ boost::ref(m2));
map_threads.add_thread(thread);
}
map_threads.join_all();
@@ -126,8 +116,7 @@
detail::run_next_reduce_task<Job>,
boost::ref(job),
boost::ref(partition),
- boost::ref(*this_result),
- boost::ref(m));
+ boost::ref(*this_result));
reduce_threads.add_thread(thread);
}
reduce_threads.join_all();
Modified: sandbox/boost/mapreduce/schedule_policy/sequential.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/sequential.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/sequential.hpp 2009-07-30 15:07:21 EDT (Thu, 30 Jul 2009)
@@ -19,6 +19,17 @@
namespace schedule_policy {
+namespace detail {
+
+struct null_lock // no-op lock: supplies the lock()/unlock() calls run_map_task() makes on its Sync argument when the schedule is single-threaded
+{
+ void lock(void) { } // intentionally empty
+ void unlock(void) { } // intentionally empty
+};
+
+} // namespace detail
+
+
template<typename Job>
class sequential
{
@@ -28,18 +39,15 @@
// Map Tasks
time_t start_time = time(NULL);
void *key = 0;
- while (job.get_next_map_key(key) && job.run_map_task(key, result))
+ detail::null_lock nolock;
+ while (job.get_next_map_key(key) && job.run_map_task(key, result, nolock))
;
result.map_runtime = time(NULL) - start_time;
// Reduce Tasks
start_time = time(NULL);
for (unsigned partition=0; partition<job.number_of_partitions(); ++partition)
- {
- typename Job::filenames_t filenames;
- if (job.get_partition_filenames(partition, filenames))
- job.run_reduce_task(partition, filenames, result);
- }
+ job.run_reduce_task(partition, result);
result.reduce_runtime = time(NULL) - start_time;
result.counters.actual_map_tasks = 1;
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk