|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r55343 - in sandbox: boost/mapreduce boost/mapreduce/intermediates libs/mapreduce/examples/wordcount libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-01 10:15:07
Author: chenderson
Date: 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
New Revision: 55343
URL: http://svn.boost.org/trac/boost/changeset/55343
Log:
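Refined the key/value type definitions for Map and Reduce Tasks: map_task no longer declares intermediate key/value types, and reduce_task now declares key_type and value_type, which are used for the intermediate data.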
Text files modified:
sandbox/boost/mapreduce/intermediates/in_memory.hpp | 15 ++++++++-------
sandbox/boost/mapreduce/intermediates/local_disk.hpp | 25 +++++++++++++------------
sandbox/boost/mapreduce/job.hpp | 21 ++++++++-------------
sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp | 14 ++++++--------
sandbox/libs/mapreduce/test/mrtest.cpp | 29 +++++++++++++++++------------
5 files changed, 52 insertions(+), 52 deletions(-)
Modified: sandbox/boost/mapreduce/intermediates/in_memory.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/in_memory.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/in_memory.hpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -29,8 +29,8 @@
{
}
- void operator()(typename MapTask::intermediate_key_type const &/*key*/,
- typename ReduceTask::value_type const &/*value*/)
+ void operator()(typename ReduceTask::key_type const &/*key*/,
+ typename ReduceTask::value_type const &/*value*/)
{
}
};
@@ -46,12 +46,13 @@
typedef
std::vector<
std::map<
- typename MapTask::intermediate_key_type,
- std::list<typename MapTask::intermediate_value_type> > >
+ typename ReduceTask::key_type,
+ std::list<typename ReduceTask::value_type> > >
intermediates_t;
public:
- typedef MapTask map_task_type;
+ typedef MapTask map_task_type;
+ typedef ReduceTask reduce_task_type;
typedef reduce_null_output<MapTask, ReduceTask> store_result_type;
in_memory(unsigned const num_partitions)
@@ -99,8 +100,8 @@
}
- bool const insert(typename map_task_type::intermediate_key_type const &key,
- typename map_task_type::intermediate_value_type const &value)
+ bool const insert(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
{
typename intermediates_t::value_type &map = intermediates_[partitioner_(key, num_partitions_)];
Modified: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- sandbox/boost/mapreduce/intermediates/local_disk.hpp (original)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -108,8 +108,8 @@
output_file_.open(filename_.c_str());
}
- void operator()(typename MapTask::intermediate_key_type const &key,
- typename ReduceTask::value_type const &value)
+ void operator()(typename ReduceTask::key_type const &key,
+ typename ReduceTask::value_type const &value)
{
output_file_ << key << "\t" << value << "\n";
}
@@ -138,7 +138,8 @@
intermediates_t;
public:
- typedef MapTask map_task_type;
+ typedef MapTask map_task_type;
+ typedef ReduceTask reduce_task_type;
typedef reduce_file_output<MapTask, ReduceTask> store_result_type;
local_disk(unsigned const num_partitions)
@@ -162,8 +163,8 @@
}
}
- bool const insert(typename map_task_type::intermediate_key_type const &key,
- typename map_task_type::intermediate_value_type const &value)
+ bool const insert(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
{
unsigned const partition = partitioner_(key, num_partitions_);
@@ -208,7 +209,7 @@
std::ifstream infile(infilename.c_str());
while (!infile.eof())
{
- typename map_task_type::intermediate_value_type value;
+ typename reduce_task_type::value_type value;
if (read_record(infile, key, value))
{
if (key != last_key && key.length() > 0)
@@ -276,10 +277,10 @@
template<typename Callback>
void reduce(unsigned const partition, Callback &callback, results &result)
{
- typename map_task_type::intermediate_key_type key;
- typename map_task_type::intermediate_key_type last_key;
- typename map_task_type::intermediate_value_type value;
- std::list<typename map_task_type::intermediate_value_type> values;
+ typename reduce_task_type::key_type key;
+ typename reduce_task_type::key_type last_key;
+ typename reduce_task_type::value_type value;
+ std::list<typename reduce_task_type::value_type> values;
std::list<std::string> filenames;
intermediates_t::const_iterator it = intermediate_files_.find(partition);
@@ -316,8 +317,8 @@
protected:
static bool const read_record(std::ifstream &infile,
- typename map_task_type::intermediate_key_type &key,
- typename map_task_type::intermediate_value_type &value)
+ typename reduce_task_type::key_type &key,
+ typename reduce_task_type::value_type &value)
{
#if defined(__SGI_STL_PORT)
size_t keylen;
Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -19,24 +19,19 @@
template<typename T> size_t length(T const &str);
-template<
- typename MapKey,
- typename MapValue,
- typename IntermediateKey,
- typename IntermediateValue>
+template<typename MapKey, typename MapValue>
class map_task
{
public:
typedef MapKey key_type;
typedef MapValue value_type;
- typedef IntermediateKey intermediate_key_type;
- typedef IntermediateValue intermediate_value_type;
};
-template<typename ReduceValue>
+template<typename ReduceKey, typename ReduceValue>
class reduce_task
{
public:
+ typedef ReduceKey key_type;
typedef ReduceValue value_type;
};
@@ -78,8 +73,8 @@
return *this;
}
- bool const emit_intermediate(typename map_task_type::intermediate_key_type const &key,
- typename map_task_type::intermediate_value_type const &value)
+ bool const emit_intermediate(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
{
return intermediate_store_.insert(key, value);
}
@@ -105,14 +100,14 @@
{
}
- void emit(typename map_task_type::intermediate_key_type const &key,
- typename reduce_task_type::value_type const &value)
+ void emit(typename reduce_task_type::key_type const &key,
+ typename reduce_task_type::value_type const &value)
{
store_result_(key, value);
}
template<typename It>
- void operator()(typename map_task_type::intermediate_key_type const &key, It it, It ite)
+ void operator()(typename reduce_task_type::key_type const &key, It it, It ite)
{
reduce_task_type::reduce(*this, key, it, ite);
}
Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp (original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -29,10 +29,8 @@
namespace wordcount {
struct map_task : public boost::mapreduce::map_task<
- std::string, // MapKey
- std::pair<char const *, char const *>, // MapValue
- std::string, // IntermediateKey
- unsigned> // IntermediateValue
+ std::string, // MapKey
+ std::pair<char const *, char const *> > // MapValue
{
template<typename Runtime>
static void map(Runtime &runtime, std::string const &/*key*/, value_type &value)
@@ -79,7 +77,7 @@
}
};
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
{
template<typename Runtime, typename It>
static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
@@ -98,19 +96,19 @@
intermediate_store.combine(instance);
}
- void start(map_task::intermediate_key_type const &)
+ void start(reduce_task::key_type const &)
{
total_ = 0;
}
template<typename IntermediateStore>
- void finish(map_task::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+ void finish(reduce_task::key_type const &key, IntermediateStore &intermediate_store)
{
if (total_ > 0)
intermediate_store.insert(key, total_);
}
- void operator()(map_task::intermediate_value_type const &value)
+ void operator()(reduce_task::value_type const &value)
{
total_ += value;
}
Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp (original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-08-01 10:15:05 EDT (Sat, 01 Aug 2009)
@@ -50,18 +50,14 @@
map_value_type;
template<typename T>
-struct map_task : public boost::mapreduce::map_task<
- std::string, // MapKey
- map_value_type, // MapValue
- std::string, // IntermediateKey
- unsigned> // IntermediateValue
+struct map_task
+ : public boost::mapreduce::map_task<std::string, map_value_type>
{
template<typename Runtime>
static void map(Runtime &runtime, std::string const &key, T &value);
};
-typedef map_task<map_value_type> map_task_type;
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
{
template<typename Runtime, typename It>
static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
@@ -167,6 +163,8 @@
class combiner;
+typedef map_task<map_value_type> map_task_type;
+
typedef
boost::mapreduce::job<
wordcount::map_task_type
@@ -197,19 +195,19 @@
intermediate_store.combine(instance);
}
- void start(job::map_task_type::intermediate_key_type const &)
+ void start(job::reduce_task_type::key_type const &)
{
total_ = 0;
}
template<typename IntermediateStore>
- void finish(job::map_task_type::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+ void finish(job::reduce_task_type::key_type const &key, IntermediateStore &intermediate_store)
{
if (total_ > 0)
intermediate_store.insert(key, total_);
}
- void operator()(job::map_task_type::intermediate_value_type const &value)
+ void operator()(job::reduce_task_type::value_type const &value)
{
total_ += value;
}
@@ -256,7 +254,7 @@
spec.map_tasks = 1;
spec.reduce_tasks = 1;
- wordcount::job job(datasource, spec);
+ wordcount::job job(datasource, spec);
job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
std::cout << "\nFinished.";
#else
@@ -266,7 +264,12 @@
if (argc > 2)
spec.map_tasks = atoi(argv[2]);
- boost::mapreduce::run<wordcount::job>(spec, result);
+ // this method can be called, but since we want access to the result data,
+ // we need to have a job object to interrogate
+ //boost::mapreduce::run<wordcount::job>(spec, result);
+
+ wordcount::job job(datasource, spec);
+ job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
std::cout << "\nCPU Parallel MapReduce Finished.";
#endif
}
@@ -275,6 +278,8 @@
std::cout << std::endl << "Error: " << e.what();
}
+ typedef std::pair<wordcount::reduce_task::key_type, wordcount::reduce_task::value_type> keyvalue_t;
+
std::cout << std::endl << "\n" << "MapReduce statistics:";
std::cout << "\n " << "MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
std::cout << "\n " << " Map phase runtime : " << result.map_runtime << " seconds";
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk