Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55133 - in sandbox/boost/mapreduce: . intermediates schedule_policy
From: cdm.henderson_at_[hidden]
Date: 2009-07-23 14:57:52


Author: chenderson
Date: 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
New Revision: 55133
URL: http://svn.boost.org/trac/boost/changeset/55133

Log:
Initial upload; based on v0.2 from the Boost Vault.
Added:
   sandbox/boost/mapreduce/
   sandbox/boost/mapreduce/datasource.hpp (contents, props changed)
   sandbox/boost/mapreduce/hash_partitioner.hpp (contents, props changed)
   sandbox/boost/mapreduce/intermediates/
   sandbox/boost/mapreduce/intermediates.hpp (contents, props changed)
   sandbox/boost/mapreduce/job.hpp (contents, props changed)
   sandbox/boost/mapreduce/mergesort.hpp (contents, props changed)
   sandbox/boost/mapreduce/null_combiner.hpp (contents, props changed)
   sandbox/boost/mapreduce/platform.hpp (contents, props changed)
   sandbox/boost/mapreduce/schedule_policy/
   sandbox/boost/mapreduce/schedule_policy.hpp (contents, props changed)
   sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp (contents, props changed)
   sandbox/boost/mapreduce/schedule_policy/sequential.hpp (contents, props changed)

Added: sandbox/boost/mapreduce/datasource.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/datasource.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,69 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_DATASOURCE_SCHEDULE_POLICY_HPP
+#define BOOST_DATASOURCE_SCHEDULE_POLICY_HPP
+
+namespace boost {
+
+namespace mapreduce {
+
+namespace datasource {
+
+// Datasource that walks the entries of one directory (without descending
+// into sub-directories). Each non-directory entry becomes a map task: its
+// pathname is the map key and a stream opened on it is the map value.
+template<typename MapTask>
+class directory_iterator : boost::noncopyable
+{
+  private:
+    typedef
+        boost::filesystem::basic_path<std::string, boost::filesystem::path_traits>
+    path_t;
+
+    typedef boost::filesystem::basic_directory_iterator<path_t> dir_iterator_t;
+
+  public:
+    // Writes the pathname of the next non-directory entry to 'key' and moves
+    // past that entry. Returns false, leaving 'key' untouched, when no
+    // entries remain (a default-constructed iterator is the end marker, so
+    // this is also the result before set_directory() has been called).
+    bool const setup_key(typename MapTask::key_type &key) const
+    {
+        dir_iterator_t const end;
+
+        // step over sub-directories
+        for (; it_dir_ != end; ++it_dir_)
+        {
+            if (!boost::filesystem::is_directory(*it_dir_))
+                break;
+        }
+
+        if (it_dir_ == end)
+            return false;
+
+        path_t const entry = *it_dir_;
+        ++it_dir_;
+        key = entry.external_file_string();
+        return true;
+    }
+
+    // Opens 'istream' on the file named by 'key'; returns whether the open
+    // succeeded.
+    bool const get_data(typename MapTask::key_type &key, typename MapTask::value_type &istream) const
+    {
+        istream.open(key.c_str());
+        bool const opened = istream.is_open();
+        return opened;
+    }
+
+    // Selects the directory to enumerate and rewinds to its first entry.
+    void set_directory(char const *directory)
+    {
+        directory_ = directory;
+        it_dir_    = dir_iterator_t(directory_);
+    }
+
+  private:
+    // mutable: setup_key() is const but has to advance the iteration
+    mutable dir_iterator_t it_dir_;
+    std::string            directory_;
+};
+
+} // namespace datasource
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_DATASOURCE_SCHEDULE_POLICY_HPP

Added: sandbox/boost/mapreduce/hash_partitioner.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/hash_partitioner.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,34 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_HASH_PARTITONER_HPP
+#define BOOST_MAPREDUCE_HASH_PARTITONER_HPP
+
+namespace boost {
+
+namespace mapreduce {
+
+// Partitioning policy: maps a key to a partition index using boost::hash.
+struct hash_partitioner
+{
+    // key         the key to place
+    // partitions  the number of partitions available
+    //
+    // Returns an index in the range [0, partitions). When 'partitions' is
+    // zero the result is 0, because a modulo by zero would be undefined
+    // behaviour.
+    template<typename T>
+    unsigned operator()(T const &key, unsigned partitions) const
+    {
+        if (partitions == 0)
+            return 0;
+
+        boost::hash<T> hasher;
+
+        // the remainder is always smaller than 'partitions', so narrowing the
+        // std::size_t hash result back to unsigned cannot lose information
+        return static_cast<unsigned>(hasher(key) % partitions);
+    }
+};
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_HASH_PARTITONER_HPP

Added: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/intermediates.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,14 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+//#include <boost/mapreduce/intermediates/in_memory.hpp>
+#include <boost/mapreduce/intermediates/local_disk.hpp>

Added: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/job.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,296 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_JOB_HPP
+#define BOOST_MAPREDUCE_JOB_HPP
+
+namespace boost {
+
+namespace mapreduce {
+
+template<typename T> size_t length(T const &str);
+
+// Drives one MapReduce job: pulls map keys from the datasource, runs map
+// tasks that write partitioned intermediate files, then runs one reduce task
+// per partition over the merged and sorted intermediate data. The order and
+// parallelism of the tasks is decided by a schedule policy passed to run().
+//
+// Template parameters:
+//   MapTask            map functor; supplies key_type, value_type,
+//                      intermediate_key_type and intermediate_value_type
+//   ReduceTask         reduce functor; supplies value_type
+//   Combiner           handed to IntermediateStore::combine() after each map task
+//   Datasource         supplies map keys (setup_key) and map values (get_data)
+//   IntermediateStore  stores the map output, split into partitions
+//
+// NOTE(review): new_partition_file() is reached from run_map_task() and is
+// not synchronised here. The cpu_parallel policy calls run_map_task() from
+// several threads without holding its mutex, so concurrent updates of
+// partition_files_ look unprotected -- confirm and add locking if so.
+template<typename MapTask,
+         typename ReduceTask,
+         typename Combiner=null_combiner,
+         typename Datasource=datasource::directory_iterator<MapTask>,
+         typename IntermediateStore=intermediates::local_disk<MapTask> >
+class job : private boost::noncopyable
+{
+  public:
+    typedef MapTask                 map_task_type;
+    typedef ReduceTask              reduce_task_type;
+    typedef Datasource              datasource_type;
+    typedef IntermediateStore       intermediate_store_type;
+    typedef Combiner                combiner_type;
+    typedef std::list<std::string>  filenames_t;
+
+  public:
+    // Runs a single map task. It is handed to the MapTask as its "runtime":
+    // the task calls emit_intermediate() for every key/value it produces.
+    class map_task_runner : boost::noncopyable
+    {
+      public:
+        map_task_runner(job &j)
+          : job_(j),
+            intermediate_store_(job_.number_of_partitions())
+        {
+        }
+
+        // Executes the map task for one key/value, combines its output and
+        // reports the resulting partition files to the owning job.
+        // 'value' parameter is not a reference to const to enable streams to be passed
+        map_task_runner &operator()(typename map_task_type::key_type const &key, typename map_task_type::value_type &value)
+        {
+            map_task_type(*this)(key, value);
+
+            // consolidating map intermediate results can save network time by
+            // aggregating the mapped values at the mapper
+            combiner_type combiner;
+            intermediate_store_.combine(combiner);
+
+            // notify the job of the final partition files
+            notify_runtime_partition_files();
+
+            return *this;
+        }
+
+        // Called by the map task to record one intermediate key/value pair;
+        // returns the result of the store's insert().
+        bool const emit_intermediate(typename map_task_type::intermediate_key_type const &key,
+                                     typename map_task_type::intermediate_value_type const &value)
+        {
+            return intermediate_store_.insert(key, value);
+        }
+
+      protected:
+        // Registers with the job every partition for which the store reports
+        // a file name.
+        void notify_runtime_partition_files(void) const
+        {
+            for (unsigned partition=0; partition<job_.number_of_partitions(); ++partition)
+            {
+                std::string filename;
+                if (intermediate_store_.get_partition_filename(partition, filename))
+                {
+                    job_.new_partition_file(partition, filename);
+                }
+            }
+        }
+
+      private:
+        job                     &job_;
+        intermediate_store_type  intermediate_store_;
+    };
+
+    // Runs the reduce tasks of one partition and owns that partition's
+    // result file, named "<output_filespec><partition+1>_of_<num_partitions>".
+    class reduce_task_runner : boost::noncopyable
+    {
+      public:
+        reduce_task_runner(std::string const &output_filespec, size_t const partition, size_t const num_partitions)
+        {
+            std::ostringstream filename;
+            filename << output_filespec << partition+1 << "_of_" << num_partitions;
+            filename_ = filename.str();
+            output_file_.open(filename_.c_str());
+        }
+
+        // Called by the reduce task to write one result, as a line of the
+        // form "key<TAB>value".
+        void emit(typename map_task_type::intermediate_key_type const &key,
+                  typename reduce_task_type::value_type const &value)
+        {
+            output_file_ << key << "\t" << value << "\n";
+        }
+
+        // Reduces all values [it, ite) collected for 'key'.
+        template<typename It>
+        void operator()(typename map_task_type::intermediate_key_type const &key, It it, It ite)
+        {
+            reduce_task_type reduce_task(*this);
+            reduce_task(key, it, ite);
+        }
+
+        std::string const &filename(void) const
+        {
+            return filename_;
+        }
+
+      private:
+        std::string   filename_;
+        std::ofstream output_file_;
+    };
+
+    // The job keeps a reference to 'datasource', which must outlive it.
+    // NOTE(review): the default output filespec uses a Windows path separator.
+    job(datasource_type &datasource)
+      : num_partitions_(0),
+        datasource_(datasource),
+        output_filespec_(".\\mapreduce_")
+    {
+    }
+
+    // Deletes every intermediate partition file that is still registered.
+    // Failures are reported on std::cerr rather than thrown.
+    ~job()
+    {
+        try
+        {
+            for (partition_files_t::iterator itp=partition_files_.begin(); itp!=partition_files_.end(); ++itp)
+                std::for_each(itp->second.begin(), itp->second.end(), boost::bind(detail::delete_file, _1));
+        }
+        catch (std::exception const &e)
+        {
+            std::cerr << "\nError: " << e.what() << "\n";
+        }
+        partition_files_.clear();
+    }
+
+    // Sets the prefix used to build the result file names.
+    void set_output_filespec(std::string const &output_filespec)
+    {
+        output_filespec_ = output_filespec;
+    }
+
+    // Fetches the next map key from the datasource. On success 'key' points
+    // to a heap-allocated map_task_type::key_type whose ownership passes to
+    // the caller; handing it to run_map_task() releases it. Returns false,
+    // leaving 'key' untouched, when the datasource has no more keys.
+    bool const get_next_map_key(void *&key)
+    {
+        std::auto_ptr<typename map_task_type::key_type> next_key(new typename map_task_type::key_type);
+        if (!datasource_.setup_key(*next_key))
+            return false;
+        key = next_key.release();
+        return true;
+    }
+
+    // Records that 'filename' holds intermediate data for 'partition'.
+    void new_partition_file(unsigned const partition, std::string const &filename)
+    {
+        partition_files_.insert(std::make_pair(partition, partition_files_t::mapped_type())).first->second.push_back(filename);
+    }
+
+    // Runs the job with a default-constructed schedule policy.
+    template<typename SchedulePolicy>
+    void run(specification const &spec, results &result)
+    {
+        SchedulePolicy schedule;
+        this->run(schedule, spec, result);
+    }
+
+    // Runs the job with the given schedule policy and stores the elapsed
+    // wall-clock time, in seconds, in result.job_runtime.
+    template<typename SchedulePolicy>
+    void run(SchedulePolicy &schedule, specification const &spec, results &result)
+    {
+        time_t const start_time = time(NULL);
+        schedule(*this, spec, result);
+        result.job_runtime = time(NULL) - start_time;
+    }
+
+    // Runs the map task for 'key', which must have been obtained from
+    // get_next_map_key(); the key object is destroyed before returning.
+    // Returns false, and counts a map task error, when the data for the key
+    // cannot be obtained or the key/value is default-valued.
+    bool const run_map_task(void *key, results &result)
+    {
+        time_t const start_time = time(NULL);
+
+        ++result.counters.map_tasks;
+
+        // the pointer was created from a key_type* in get_next_map_key(), so a
+        // static_cast is the matching conversion back from void*
+        std::auto_ptr<typename map_task_type::key_type> map_key_ptr(static_cast<typename map_task_type::key_type *>(key));
+        typename map_task_type::key_type &map_key = *map_key_ptr;
+
+        // get the data
+        typename map_task_type::value_type value;
+        if (!datasource_.get_data(map_key, value))
+        {
+            ++result.counters.map_tasks_error;
+            return false;
+        }
+
+        // Map Task
+        if (map_key == typename map_task_type::key_type()  ||  value == typename map_task_type::value_type())
+        {
+            BOOST_ASSERT(map_key != typename map_task_type::key_type());
+            BOOST_ASSERT(value != typename map_task_type::value_type());
+            ++result.counters.map_tasks_error;
+            return false;
+        }
+        map_task_runner runner(*this);
+        runner(map_key, value);
+
+        result.map_times.push_back(time(NULL)-start_time);
+
+        ++result.counters.map_tasks_completed;
+        return true;
+    }
+
+    unsigned const number_of_partitions(void) const
+    {
+        return num_partitions_;
+    }
+
+    void number_of_partitions(unsigned const partitions)
+    {
+        num_partitions_ = partitions;
+    }
+
+    // Finds the first partition, starting at 'partition', that still has
+    // intermediate files waiting to be reduced. On success 'partition' is
+    // set to that partition, its file names are moved into 'filenames' (so
+    // the partition is handed out only once) and true is returned. Returns
+    // false when no such partition remains.
+    //
+    // Partitions that never received a file have no entry in the map; they
+    // are skipped rather than ending the search, so that a gap does not hide
+    // the populated partitions behind it.
+    //
+    // the caller must synchronise calls to this function from multiple threads
+    bool const get_partition_filenames(unsigned &partition, filenames_t &filenames)
+    {
+        for (; partition<num_partitions_; ++partition)
+        {
+            partition_files_t::iterator itp = partition_files_.find(partition);
+            if (itp != partition_files_.end()  &&  !itp->second.empty())
+            {
+                std::swap(itp->second, filenames);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    // Reduces one partition: merges and sorts its intermediate files, groups
+    // consecutive records that share a key and runs the reduce task once per
+    // group. The merged file is deleted afterwards and the elapsed time is
+    // appended to result.reduce_times.
+    bool const run_reduce_task(unsigned const partition, filenames_t const &filenames, results &result)
+    {
+        time_t const start_time = time(NULL);
+        std::string const filename = intermediate_store_type::merge_and_sort(filenames);
+
+        reduce_task_runner runner(output_filespec_, partition, num_partitions_);
+
+        typename map_task_type::intermediate_key_type   key;
+        typename map_task_type::intermediate_key_type   last_key;
+        typename map_task_type::intermediate_value_type value;
+        std::list<typename map_task_type::intermediate_value_type> values;
+
+        std::ifstream infile(filename.c_str());
+        while (intermediate_store_type::read_record(infile, key, value))
+        {
+            // a record without a key cannot be reduced; skipping it also
+            // keeps its value out of the neighbouring key's group
+            if (length(key) == 0)
+                continue;
+
+            if (key != last_key)
+            {
+                // a new key begins: reduce the group collected so far
+                if (length(last_key) > 0)
+                {
+                    runner(last_key, values.begin(), values.end());
+                    values.clear();
+                }
+                std::swap(key, last_key);
+            }
+
+            values.push_back(value);
+        }
+
+        // reduce the final group
+        if (length(last_key) > 0)
+            runner(last_key, values.begin(), values.end());
+
+        infile.close();
+        boost::filesystem::remove(filename.c_str());
+
+        result.reduce_times.push_back(time(NULL)-start_time);
+
+        return true;
+    }
+
+  private:
+    typedef
+    boost::unordered_map<
+        unsigned,       // partition
+        filenames_t>    // file names
+    partition_files_t;
+
+    unsigned           num_partitions_;
+    datasource_type   &datasource_;
+    std::string        output_filespec_;
+    partition_files_t  partition_files_;
+};
+
+template<> // specialisation of length() for std::string keys
+inline size_t length(std::string const &str) // returns the number of characters in 'str'
+{
+ return str.length();
+}
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_JOB_HPP

Added: sandbox/boost/mapreduce/mergesort.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/mergesort.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,217 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_MERGESORT_HPP
+#define BOOST_MAPREDUCE_MERGESORT_HPP
+
+#include <deque>
+#include <list>
+#include <map>
+#include <sstream>
+#include <boost/shared_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/filesystem.hpp>
+
+#ifdef GNU_GCC
+#include <cstring> // ubuntu linux
+#include <fstream> // ubuntu linux
+#endif
+
+#ifdef BOOST_NO_STDC_NAMESPACE
+ namespace std { using ::strcmp; }
+#endif
+
+namespace boost {
+
+namespace mapreduce {
+
+namespace detail {
+
+template<typename T> // T: any type with a 'second' member that supports operator<
+bool const less_2nd(T const &first, T const &second) // true when first.second orders before second.second; nothing else is compared
+{
+ return first.second < second.second;
+}
+
+// Ordering predicate for text records in which the sort key is the part of
+// the record that starts 'offset' characters in. Records are ordered as
+// follows:
+//   1. records too short to contain a key (length <= offset) come first,
+//      ordered among themselves by their whole content, so the empty record
+//      is always the very first;
+//   2. all other records follow, ordered by their content from 'offset' on.
+// This is a strict weak ordering, as required of the comparator of an
+// ordered container or a standard algorithm.
+struct key_offset_compare
+{
+  public:
+    // The offset is copied, so the comparator stays valid after the
+    // argument has gone out of scope.
+    key_offset_compare(unsigned const &offset)
+      : offset_(offset)
+    {
+    }
+
+    key_offset_compare(key_offset_compare const &other)
+      : offset_(other.offset_)
+    {
+    }
+
+    key_offset_compare &operator=(key_offset_compare const &other)
+    {
+        offset_ = other.offset_;
+        return *this;
+    }
+
+    // Compares the record strings held in the 'second' members of two pairs.
+    template<typename T>
+    bool const operator()(std::pair<T, std::string> const &first, std::pair<T, std::string> const &second) const
+    {
+        return (*this)(first.second, second.second);
+    }
+
+    // Compares the records the two pointers refer to; both must be non-null.
+    bool const operator()(boost::shared_ptr<std::string> const &first, boost::shared_ptr<std::string> const &second) const
+    {
+        return (*this)(*first, *second);
+    }
+
+    // Returns true when 'first' orders before 'second'.
+    bool const operator()(std::string const &first, std::string const &second) const
+    {
+        bool const has_key1 = (first.length()  > offset_);
+        bool const has_key2 = (second.length() > offset_);
+
+        if (!has_key1  ||  !has_key2)
+        {
+            // a record without a key orders before any record with one
+            if (has_key1 != has_key2)
+                return !has_key1;
+
+            // neither record has a key: fall back to the whole content so
+            // that the result is still a consistent ordering
+            return first < second;
+        }
+
+        // compare the tails directly; unlike strcmp() this does not depend
+        // on a terminating NUL and does not stop at an embedded one
+        return first.compare(offset_, std::string::npos, second, offset_, std::string::npos) < 0;
+    }
+
+  private:
+    unsigned offset_;
+};
+
+
+// Merges the sorted text files named by [first, last) into 'outfilename'.
+// Each input file must already be sorted according to
+// key_offset_compare(offset); records are newline-terminated lines.
+//
+// Returns true on success. Returns false if the output file or any input
+// file cannot be opened, or if writing the output failed; the output file
+// may then be incomplete.
+//
+// In _DEBUG builds the inputs are processed in batches of at most 10 files
+// and the batches are written one after another, so the output is only
+// fully sorted when there is a single batch (see the !!! note below).
+template<typename It>
+bool const do_file_merge(It first, It last, char const *outfilename, unsigned const offset)
+{
+#ifdef _DEBUG
+    int const max_files=10;
+#endif
+
+    std::ofstream outfile(outfilename, std::ios_base::out | std::ios_base::binary);
+    if (!outfile.is_open())
+        return false;
+
+    detail::key_offset_compare const compare(offset);
+
+    while (first!=last)
+    {   //!!!subsequent times around the loop need to merge with outfilename from previous iteration
+        // each entry pairs an open input file with its current, not yet written, line
+        typedef std::list<std::pair<boost::shared_ptr<std::ifstream>, std::string> > file_lines_t;
+        file_lines_t file_lines;
+        for (; first!=last; ++first)
+        {
+#ifdef _DEBUG
+            if (file_lines.size() == max_files)
+                break;
+#endif
+
+            boost::shared_ptr<std::ifstream> file(new std::ifstream(first->c_str(), std::ios_base::in | std::ios_base::binary));
+
+            // give up on an unreadable input: leaving the loop without
+            // advancing 'first' would retry the same file forever
+            if (!file->is_open())
+                return false;
+
+            // an input that has no first line contributes nothing
+            std::string line;
+            if (std::getline(*file, line))
+                file_lines.push_back(std::make_pair(file, line));
+        }
+
+        while (!file_lines.empty())
+        {
+            // write the smallest of the current lines ...
+            typename file_lines_t::iterator it;
+            if (file_lines.size() == 1)
+                it = file_lines.begin();
+            else
+                it = std::min_element(file_lines.begin(), file_lines.end(), compare);
+            outfile << it->second << "\n";
+
+            // ... and replace it with the next line of the same file. The
+            // file is dropped only when the read itself fails, so a final
+            // line without a trailing newline is still merged and a read
+            // error cannot cause an endless loop.
+            if (!std::getline(*it->first, it->second))
+                file_lines.erase(it);
+        }
+    }
+
+    return !outfile.fail();
+}
+
+struct less_shared_ptr_string // orders shared_ptr<std::string> by the strings pointed to, not by pointer value
+{
+ bool operator()(boost::shared_ptr<std::string> const &a, boost::shared_ptr<std::string> const &b) const // both pointers are dereferenced, so neither may be null
+ {
+ return *a < *b;
+ }
+};
+
+inline bool const delete_file(std::string const &pathname) // removes 'pathname' and returns the result of boost::filesystem::remove
+{
+ return boost::filesystem::remove(pathname);
+}
+
+} // namespace detail
+
+
+
+// Sorts the lines of the text file 'in' into the file 'out' using an
+// external merge sort: the input is read in chunks of at most 'max_lines'
+// lines, each chunk is written in sorted order to a temporary file, and the
+// temporary files are then merged into 'out' and deleted.
+//
+// in         name of the file to sort
+// out        name of the file to create
+// offset     character offset at which the sort key starts in each line
+//            (see detail::key_offset_compare)
+// max_lines  maximum number of lines held in memory at once; must not be 0
+//
+// Lines that compare equivalent under the key ordering are held as one
+// entry with a count, and that entry's text is written out once per
+// occurrence, each on its own line.
+//
+// Returns the result of the final merge. Throws std::runtime_error if 'in'
+// cannot be opened, if 'max_lines' is zero, or if reading the input or
+// writing a temporary file fails.
+inline bool const merge_sort(char const     *in,
+                             char const     *out,
+                             unsigned const  offset,
+                             unsigned const  max_lines = 1000000)
+{
+    // a chunk size of zero could never consume any input
+    if (max_lines == 0)
+        BOOST_THROW_EXCEPTION(std::runtime_error("max_lines must be greater than zero."));
+
+    std::deque<std::string> temporary_files;
+    std::ifstream infile(in, std::ios_base::in | std::ios_base::binary);
+    if (!infile.is_open())
+    {
+        std::ostringstream err;
+        err << "Unable to open file " << in;
+        BOOST_THROW_EXCEPTION(std::runtime_error(err.str()));
+    }
+
+    detail::key_offset_compare const map_comparator(offset);
+
+    while (infile)
+    {
+        // line text -> number of occurrences, ordered by key
+        typedef std::map<boost::shared_ptr<std::string>, unsigned, detail::key_offset_compare> lines_t;
+        lines_t lines(map_comparator);
+
+        for (unsigned loop=0; loop<max_lines; ++loop)
+        {
+            // only a successful read yields a record; testing eof() before
+            // reading would add a phantom empty record at the end of input
+            boost::shared_ptr<std::string> line(new std::string);
+            if (!std::getline(infile, *line))
+                break;
+
+            ++lines.insert(std::make_pair(line,0U)).first->second;
+        }
+
+        if (infile.bad())
+            BOOST_THROW_EXCEPTION(std::runtime_error("An error occurred reading the input file."));
+
+        // nothing was read: the previous chunk ended exactly at end of input
+        if (lines.empty())
+            break;
+
+        std::string const temp_filename(platform::get_temporary_filename());
+        temporary_files.push_back(temp_filename);
+        std::ofstream file(temp_filename.c_str(), std::ios_base::out | std::ios_base::binary);
+        for (lines_t::const_iterator it=lines.begin(); it!=lines.end(); ++it)
+        {
+            if (file.fail()  ||  file.bad())
+                BOOST_THROW_EXCEPTION(std::runtime_error("An error occurred writing temporary a file."));
+
+            // one line per occurrence, so repeated records stay separate
+            for (unsigned loop=0; loop<it->second; ++loop)
+                file << *(it->first) << "\n";
+        }
+    }
+    infile.close();
+
+    bool const merged = detail::do_file_merge(temporary_files.begin(), temporary_files.end(), out, offset);
+    std::for_each(temporary_files.begin(), temporary_files.end(), boost::bind(detail::delete_file, _1));
+
+    return merged;
+}
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_MERGESORT_HPP

Added: sandbox/boost/mapreduce/null_combiner.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/null_combiner.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,39 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_NULL_COMBINER_HPP
+#define BOOST_MAPREDUCE_NULL_COMBINER_HPP
+
+namespace boost {
+
+namespace mapreduce {
+
+struct null_combiner // combiner whose member functions all have empty bodies; it is the default Combiner argument of the job template
+{
+ template<typename IntermediateValueType>
+ void start(IntermediateValueType const &) // argument ignored, does nothing
+ { }
+
+ template<typename IntermediateValueType, typename IntermediateStore>
+ void finish(IntermediateValueType const &, IntermediateStore &) // arguments ignored; the store is left untouched
+ { }
+
+ template<typename IntermediateValueType>
+ void operator()(IntermediateValueType const &) // argument ignored, does nothing
+ { }
+};
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_NULL_COMBINER_HPP

Added: sandbox/boost/mapreduce/platform.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/platform.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,116 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_PLATFORM_HPP
+#define BOOST_MAPREDUCE_PLATFORM_HPP
+
+#include <boost/config.hpp>
+#include <boost/system/system_error.hpp>
+
+#if defined(BOOST_WINDOWS)
+#include <windows.h>
+
+#if defined(BOOST_MSVC)
+#undef min
+#undef max
+#endif
+
+namespace boost {
+
+namespace mapreduce {
+
+namespace win32 {
+
+namespace detail {
+
+template<typename Char>
+struct os_temporary_file_api_traits; // primary template is declared only; it is specialised below for char and wchar_t
+
+template<>
+struct os_temporary_file_api_traits<char> // narrow-character ("A") variants of the Win32 calls
+{
+ static DWORD get_temp_path(DWORD length, char *buffer) // forwards to GetTempPathA and returns its result
+ {
+ return GetTempPathA(length, buffer);
+ }
+
+ static unsigned get_temp_filename(char const *path, char const *prefix, unsigned unique, char *filename) // forwards to GetTempFileNameA and returns its result
+ {
+ return GetTempFileNameA(path, prefix, unique, filename);
+ }
+};
+
+template<>
+struct os_temporary_file_api_traits<wchar_t> // wide-character ("W") variants of the Win32 calls
+{
+ static DWORD get_temp_path(DWORD length, wchar_t *buffer) // forwards to GetTempPathW and returns its result
+ {
+ return GetTempPathW(length, buffer);
+ }
+
+ static unsigned get_temp_filename(wchar_t const *path, wchar_t const *prefix, unsigned unique, wchar_t *filename) // forwards to GetTempFileNameW and returns its result
+ {
+ return GetTempFileNameW(path, prefix, unique, filename);
+ }
+};
+
+} // namespace detail
+
+// Asks the operating system for a temporary file name in the temporary
+// directory, stores it in 'pathname' and returns a reference to 'pathname'.
+// Works for both char and wchar_t strings.
+//
+// Throws boost::system::system_error, carrying the value of GetLastError(),
+// if either operating system call reports failure by returning zero.
+template<typename Char>
+std::basic_string<Char> &get_temporary_filename(std::basic_string<Char> &pathname)
+{
+    typedef detail::os_temporary_file_api_traits<Char> api;
+
+    Char path[_MAX_PATH+1];
+    if (!api::get_temp_path(sizeof(path)/sizeof(path[0]), path))
+        BOOST_THROW_EXCEPTION(boost::system::system_error(GetLastError(), boost::system::system_category));
+
+    // spelt as a character array so that the prefix has the right character
+    // type for either instantiation; a narrow string literal would not
+    // compile for wchar_t
+    Char const prefix[] = { 'b', 'o', 'o', 's', 't', 0 };
+
+    Char file[_MAX_PATH+1];
+    if (!api::get_temp_filename(path, prefix, 0, file))
+        BOOST_THROW_EXCEPTION(boost::system::system_error(GetLastError(), boost::system::system_category));
+
+    pathname = file;
+    return pathname;
+}
+
+#else
+
+namespace boost {
+
+namespace mapreduce {
+
+namespace linux_os {
+
+std::string &get_temporary_filename(std::string &pathname); // !!! not implemented: declared only, this header provides no definition for non-Windows builds
+
+#endif
+
+inline std::string const get_temporary_filename(void) // convenience overload: returns the temporary file name by value
+{
+ std::string filename;
+ return get_temporary_filename(filename); // the overload taking a string fills 'filename' and returns a reference to it
+}
+
+} // namespace win32/linux
+
+
+// 'platform' names whichever namespace was opened above. The test is the
+// same BOOST_WINDOWS test that selected that namespace, so the alias always
+// refers to a namespace that exists in this translation unit.
+#if defined(BOOST_WINDOWS)
+namespace platform=win32;
+#else
+namespace platform=linux_os;
+#endif
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_PLATFORM_HPP

Added: sandbox/boost/mapreduce/schedule_policy.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/schedule_policy.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,14 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#include <boost/mapreduce/schedule_policy/sequential.hpp>
+#include <boost/mapreduce/schedule_policy/cpu_parallel.hpp>

Added: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,175 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_CPU_PARALLEL_HPP
+#define BOOST_MAPREDUCE_CPU_PARALLEL_HPP
+
+#ifdef BOOST_MSVC
+#pragma warning(push)
+#pragma warning(disable: 4244) // warning C4244: 'argument' : conversion from 'X' to 'Y', possible loss of data
+#endif
+#include <boost/thread.hpp>
+#ifdef BOOST_MSVC
+#pragma warning(pop)
+#endif
+
+namespace boost {
+
+namespace mapreduce {
+
+namespace schedule_policy {
+
+namespace detail {
+
+// Thread body for the map phase: repeatedly takes the next map key from the
+// job and runs its map task, until the job has no more keys.
+//
+// job     the job being run; shared by all worker threads
+// result  statistics for this thread only
+// m       mutex that serialises calls to job.get_next_map_key()
+//
+// Exceptions derived from std::exception end this thread's loop and are
+// reported on std::cerr; they do not leave the function.
+template<typename Job>
+inline void run_next_map_task(Job &job, results &result, boost::mutex &m)
+{
+    try
+    {
+        for (;;)
+        {
+            void *key = 0;
+
+            {
+                // scoped so that the mutex is released even if fetching the
+                // key throws; otherwise every other worker would block forever
+                boost::mutex::scoped_lock lock(m);
+                if (!job.get_next_map_key(key))
+                    break;
+            }
+
+            job.run_map_task(key, result);
+        }
+    }
+    catch (std::exception const &e)
+    {
+        std::cerr << "\nError: " << e.what() << "\n";
+    }
+}
+
+// Thread body for the reduce phase: repeatedly claims the next partition
+// that has intermediate files and runs its reduce task, until none remain.
+//
+// job        the job being run; shared by all worker threads
+// partition  search position shared by all worker threads; only read or
+//            written while 'm' is held
+// result     statistics for this thread only
+// m          mutex that serialises calls to job.get_partition_filenames()
+//
+// Exceptions derived from std::exception end this thread's loop and are
+// reported on std::cerr; they do not leave the function.
+template<typename Job>
+inline void run_next_reduce_task(Job &job, unsigned &partition, results &result, boost::mutex &m)
+{
+    try
+    {
+        for (;;)
+        {
+            typename Job::filenames_t filenames;
+            unsigned claimed_partition = 0;
+
+            {
+                // scoped so that the mutex is released even on an exception
+                boost::mutex::scoped_lock lock(m);
+                if (!job.get_partition_filenames(partition, filenames))
+                    break;
+
+                // take a private copy while the lock is still held: another
+                // worker may advance the shared counter as soon as it is
+                // released, and the reduce task must run under the number
+                // of the partition whose files were claimed here
+                claimed_partition = partition;
+            }
+
+            job.run_reduce_task(claimed_partition, filenames, result);
+        }
+    }
+    catch (std::exception const &e)
+    {
+        std::cerr << "\nError: " << e.what() << "\n";
+    }
+}
+
+} // namespace detail
+
+
+// Schedule policy that runs the map phase and then the reduce phase on a
+// group of worker threads, joining all map threads before any reduce thread
+// is started.
+//
+// The number of map threads is the number of hardware threads, capped by
+// spec.map_tasks when that is non-zero. The number of reduce threads is the
+// number of hardware threads, capped by the number of partitions
+// (spec.reduce_tasks). Each thread records its statistics in its own
+// 'results' object; they are added into 'result' once all threads are done.
+template<typename Job>
+class cpu_parallel
+{
+  public:
+    void operator()(Job &job, specification const &spec, results &result)
+    {
+        // hardware_concurrency() returns 0 when the value is not known, so
+        // always allow at least one thread. The template argument is spelt
+        // out because the two operands have different types.
+        unsigned const num_cpus = std::max<unsigned>(1U, boost::thread::hardware_concurrency());
+        job.number_of_partitions(spec.reduce_tasks);
+
+        typedef std::vector<boost::shared_ptr<results> > all_results_t;
+        all_results_t all_results;
+        boost::mutex  m;
+
+        // run the Map Tasks
+        time_t start_time = time(NULL);
+        boost::thread_group map_threads;
+
+        unsigned const map_tasks = (spec.map_tasks==0)? num_cpus : std::min<unsigned>(num_cpus, spec.map_tasks);
+
+        for (unsigned loop=0; loop<map_tasks; ++loop)
+        {
+            boost::shared_ptr<results> this_result(new results);
+            all_results.push_back(this_result);
+
+            boost::thread *thread =
+                new boost::thread(
+                    detail::run_next_map_task<Job>,
+                    boost::ref(job),
+                    boost::ref(*this_result),
+                    boost::ref(m));
+            map_threads.add_thread(thread);
+        }
+        map_threads.join_all();
+        result.map_runtime = time(NULL) - start_time;
+
+        // run the Reduce Tasks
+        start_time = time(NULL);
+        boost::thread_group reduce_threads;
+
+        unsigned const reduce_tasks =
+            std::min<unsigned>(num_cpus, job.number_of_partitions());
+
+        // search position shared by all reduce threads, guarded by 'm'
+        unsigned partition = 0;
+        for (unsigned loop=0; loop<reduce_tasks; ++loop)
+        {
+            boost::shared_ptr<results> this_result(new results);
+            all_results.push_back(this_result);
+
+            boost::thread *thread =
+                new boost::thread(
+                    detail::run_next_reduce_task<Job>,
+                    boost::ref(job),
+                    boost::ref(partition),
+                    boost::ref(*this_result),
+                    boost::ref(m));
+            reduce_threads.add_thread(thread);
+        }
+        reduce_threads.join_all();
+        result.reduce_runtime = time(NULL) - start_time;
+
+        // we're done with the map/reduce job, collate the statistics before returning
+        for (all_results_t::const_iterator it=all_results.begin();
+             it!=all_results.end();
+             ++it)
+        {
+            result.counters.map_tasks           += (*it)->counters.map_tasks;
+            result.counters.map_tasks_error     += (*it)->counters.map_tasks_error;
+            result.counters.map_tasks_completed += (*it)->counters.map_tasks_completed;
+
+            std::copy(
+                (*it)->map_times.begin(),
+                (*it)->map_times.end(),
+                std::back_inserter(result.map_times));
+            std::copy(
+                (*it)->reduce_times.begin(),
+                (*it)->reduce_times.end(),
+                std::back_inserter(result.reduce_times));
+        }
+        result.counters.actual_map_tasks    = map_tasks;
+        result.counters.actual_reduce_tasks = reduce_tasks;
+        result.counters.num_result_files    = job.number_of_partitions();
+    }
+};
+
+} // namespace schedule_policy
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_CPU_PARALLEL_HPP

Added: sandbox/boost/mapreduce/schedule_policy/sequential.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/schedule_policy/sequential.hpp 2009-07-23 14:57:51 EDT (Thu, 23 Jul 2009)
@@ -0,0 +1,59 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_SEQUENTIAL_SCHEDULE_HPP
+#define BOOST_MAPREDUCE_SEQUENTIAL_SCHEDULE_HPP
+
+namespace boost {
+
+namespace mapreduce {
+
+namespace schedule_policy {
+
+// Schedule policy that runs every map task and then every reduce task one
+// after another on the calling thread.
+template<typename Job>
+class sequential
+{
+  public:
+    void operator()(Job &job, specification const &spec, results &result)
+    {
+        job.number_of_partitions(spec.reduce_tasks);
+
+        // Map Tasks
+        // A failed map task is counted in result.counters.map_tasks_error by
+        // the job; it does not stop the remaining keys from being mapped,
+        // which matches the behaviour of the cpu_parallel policy.
+        time_t start_time = time(NULL);
+        void *key = 0;
+        while (job.get_next_map_key(key))
+            job.run_map_task(key, result);
+        result.map_runtime = time(NULL) - start_time;
+
+        // Reduce Tasks
+        // get_partition_filenames() may advance 'partition' past partitions
+        // that have nothing to reduce
+        start_time = time(NULL);
+        for (unsigned partition=0; partition<job.number_of_partitions(); ++partition)
+        {
+            typename Job::filenames_t filenames;
+            if (job.get_partition_filenames(partition, filenames))
+                job.run_reduce_task(partition, filenames, result);
+        }
+        result.reduce_runtime = time(NULL) - start_time;
+
+        result.counters.actual_map_tasks    = 1;
+        result.counters.actual_reduce_tasks = 1;
+        result.counters.num_result_files    = job.number_of_partitions();
+    }
+};
+
+} // namespace schedule_policy
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_SEQUENTIAL_SCHEDULE_HPP


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk