Boost-Commit:
Subject: [Boost-commit] svn:boost r55327 - in sandbox: boost/mapreduce boost/mapreduce/intermediates libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-07-31 17:36:01
Author: chenderson
Date: 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
New Revision: 55327
URL: http://svn.boost.org/trac/boost/changeset/55327
Log:
Added memory-based intermediate handling policy
Text files modified:
sandbox/boost/mapreduce/hash_partitioner.hpp | 2 +
sandbox/boost/mapreduce/intermediates.hpp | 7 +----
sandbox/boost/mapreduce/intermediates/local_disk.hpp | 46 ++++++++++++++++++++++++++++++++-------
sandbox/boost/mapreduce/job.hpp | 22 ++++++-------------
sandbox/libs/mapreduce/test/mrtest.cpp | 25 ++++++++++++---------
5 files changed, 62 insertions(+), 40 deletions(-)
Modified: sandbox/boost/mapreduce/hash_partitioner.hpp
==============================================================================
--- sandbox/boost/mapreduce/hash_partitioner.hpp (original)
+++ sandbox/boost/mapreduce/hash_partitioner.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -12,6 +12,8 @@
#ifndef BOOST_MAPREDUCE_HASH_PARTITONER_HPP
#define BOOST_MAPREDUCE_HASH_PARTITONER_HPP
+
+#include <boost/functional/hash.hpp>
namespace boost {
Modified: sandbox/boost/mapreduce/intermediates.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates.hpp (original)
+++ sandbox/boost/mapreduce/intermediates.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -10,9 +10,6 @@
// For more information, see http://www.boost.org/libs/mapreduce/
//
-//!!!
-#ifdef USE_IN_MEMORY_INTERMEDIATES
-#include <boost/mapreduce/intermediates/in_memory.hpp>
-#endif // USE_IN_MEMORY_INTERMEDIATES
-
+#include <boost/mapreduce/hash_partitioner.hpp>
#include <boost/mapreduce/intermediates/local_disk.hpp>
+#include <boost/mapreduce/intermediates/in_memory.hpp>
Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -13,10 +13,8 @@
#ifndef BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
#define BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
-#include <iomanip> // setw
-#include <fstream> // linux
-#include <boost/unordered_map.hpp>
-#include <boost/mapreduce/hash_partitioner.hpp>
+#include <iomanip> // setw
+#include <fstream> // linux
namespace boost {
@@ -93,8 +91,35 @@
namespace intermediates {
+template<typename MapTask, typename ReduceTask>   // result-store policy (exposed below as local_disk::store_result_type): writes one partition's reduce output to a text file
+class reduce_file_output
+{
+ public:
+ reduce_file_output(std::string const &output_filespec,   // file-name prefix for the output file
+ unsigned const partition,   // zero-based partition index; the file name uses partition+1
+ unsigned const num_partitions)
+ {
+ std::ostringstream filename;   // NOTE(review): requires <sstream>, which is not among the includes shown for this header in the diff - confirm it arrives transitively
+ filename << output_filespec << partition+1 << "_of_" << num_partitions;   // e.g. partition 0 of 4 -> "<output_filespec>1_of_4"
+ filename_ = filename.str();
+ output_file_.open(filename_.c_str());   // NOTE(review): open failure is not checked; subsequent writes would then fail silently
+ }
+
+ void operator()(typename MapTask::intermediate_key_type const &key,
+ typename ReduceTask::value_type const &value)
+ {
+ output_file_ << key << "\t" << value << "\n";   // one "key<TAB>value" line per emitted pair
+ }
+
+ private:
+ std::string filename_;   // kept after construction but never exposed (job.hpp drops its filename() accessor in this change)
+ std::ofstream output_file_;   // closed by std::ofstream's destructor when this object is destroyed
+};
+
+
template<
typename MapTask,
+ typename ReduceTask,
typename SortFn=mapreduce::detail::file_sorter,
typename MergeFn=mapreduce::detail::file_merger,
typename PartitionFn=mapreduce::hash_partitioner>
@@ -102,11 +127,7 @@
{
private:
typedef
-#ifdef _DEBUG
std::map<
-#else
- boost::unordered_map<
-#endif
size_t, // hash value of intermediate key (R)
std::pair<
std::string, // filename
@@ -115,6 +136,7 @@
public:
typedef MapTask map_task_type;
+ typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
local_disk(unsigned const num_partitions)
: num_partitions_(num_partitions)
@@ -141,7 +163,13 @@
typename map_task_type::intermediate_value_type const &value)
{
unsigned const partition = partitioner_(key, num_partitions_);
- intermediates_t::iterator it = intermediate_files_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
+
+ intermediates_t::iterator it =
+ intermediate_files_.insert(
+ make_pair(
+ partition,
+ intermediates_t::mapped_type())).first;
+
if (it->second.first.empty())
{
it->second.first = platform::get_temporary_filename();
Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -44,7 +44,8 @@
typename ReduceTask,
typename Combiner=null_combiner,
typename Datasource=datasource::directory_iterator<MapTask>,
- typename IntermediateStore=intermediates::local_disk<MapTask> >
+ typename IntermediateStore=intermediates::in_memory<MapTask, ReduceTask>,
+ typename StoreResult=typename IntermediateStore::store_result_type>
class job : private boost::noncopyable
{
public:
@@ -99,19 +100,16 @@
public:
reduce_task_runner(
std::string const &output_filespec,
- size_t const partition,
- size_t const num_partitions)
+ unsigned const partition,
+ unsigned const num_partitions)
+ : store_result_(output_filespec, partition, num_partitions)
{
- std::ostringstream filename;
- filename << output_filespec << partition+1 << "_of_" << num_partitions;
- filename_ = filename.str();
- output_file_.open(filename_.c_str());
}
void emit(typename map_task_type::intermediate_key_type const &key,
typename reduce_task_type::value_type const &value)
{
- output_file_ << key << "\t" << value << "\n";
+ store_result_(key, value);
}
template<typename It>
@@ -120,14 +118,8 @@
reduce_task_type::reduce(*this, key, it, ite);
}
- std::string const &filename(void) const
- {
- return filename_;
- }
-
private:
- std::string filename_;
- std::ofstream output_file_;
+ StoreResult store_result_;
};
job(datasource_type &datasource, specification const &spec)
Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp (original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-07-31 17:35:59 EDT (Fri, 31 Jul 2009)
@@ -9,6 +9,17 @@
//
// For more information, see http://www.boost.org/libs/mapreduce/
//
+
+// configuration options
+#define WORD_COUNT_MEMORY_MAP_FILE
+#define USE_WORDCOUNT_COMBINER
+#define USE_IN_MEMORY_INTERMEDIATES
+
+#if defined(_DEBUG)
+# define RUN_SEQUENTIAL_MAP_REDUCE
+#else
+# define BOOST_DISABLE_ASSERTS
+#endif
#if !defined(_DEBUG) && !defined(BOOST_DISABLE_ASSERTS)
# pragma message("Warning: BOOST_DISABLE_ASSERTS not defined")
@@ -26,15 +37,6 @@
#include <crtdbg.h>
#endif
-// configuration options
-#define WORD_COUNT_MEMORY_MAP_FILE
-#define USE_WORDCOUNT_COMBINER
-//#define USE_IN_MEMORY_INTERMEDIATES
-
-#if defined(_DEBUG)
-#define RUN_SEQUENTIAL_MAP_REDUCE
-#endif
-
namespace wordcount {
typedef
@@ -172,9 +174,9 @@
#else
, boost::mapreduce::null_combiner
#endif
-#ifdef USE_IN_MEMORY_INTERMEDIATES
+#ifndef USE_IN_MEMORY_INTERMEDIATES
, boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
- , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type>
+ , boost::mapreduce::intermediates::local_disk<wordcount::map_task_type, wordcount::reduce_task>
#endif
> job;
@@ -244,6 +246,7 @@
std::cout << "\nFinished.";
#else
std::cout << "\nRunning CPU Parallel MapReduce...";
+ spec.reduce_tasks = 1;
if (argc > 2)
spec.map_tasks = atoi(argv[2]);
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk