Subject: [Boost-commit] svn:boost r55461 - sandbox/libs/mapreduce/doc
From: cdm.henderson_at_[hidden]
Date: 2009-08-08 06:11:29
Author: chenderson
Date: 2009-08-08 06:11:28 EDT (Sat, 08 Aug 2009)
New Revision: 55461
URL: http://svn.boost.org/trac/boost/changeset/55461
Log:
Updated documentation
Text files modified:
sandbox/libs/mapreduce/doc/future.html | 5
sandbox/libs/mapreduce/doc/tutorial.html | 14 -
sandbox/libs/mapreduce/doc/wordcount.html | 250 +++++++++++++++++++++++++--------------
3 files changed, 163 insertions(+), 106 deletions(-)
Modified: sandbox/libs/mapreduce/doc/future.html
==============================================================================
--- sandbox/libs/mapreduce/doc/future.html (original)
+++ sandbox/libs/mapreduce/doc/future.html 2009-08-08 06:11:28 EDT (Sat, 08 Aug 2009)
@@ -56,11 +56,6 @@
and eliminating the two would improve performance considerably).</p>
</li>
<li>
- <p>The only intermediate handler currently provided by the library is the <code>intermediates::local_disk<></code>
- policy class. An early implementation of the library used in-memory storage for intermediates, and it
- may be useful to redevelop this as a fully-fledged intermediate policy class.</p>
- </li>
- <li>
<p>An extension to the <code>intermediates::local_disk<></code> policy class could be to compress
the intermediate files, using the Boost.Iostreams zip/bzip2 compression libraries. This is a
long-term item that will be very useful when the library is extended to support cross-machine
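For illustration only, the kind of stream stacking that item refers to could look like the following sketch; nothing here is part of the library, and the helper function and file name are hypothetical.

// Sketch: bzip2-compressing data written to an intermediate file with
// Boost.Iostreams (requires linking against boost_iostreams and libbz2).
#include <ios>
#include <string>
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/bzip2.hpp>
#include <boost/iostreams/device/file.hpp>

void write_compressed_intermediate(std::string const &filename, std::string const &data)
{
    namespace io = boost::iostreams;
    io::filtering_ostream out;
    out.push(io::bzip2_compressor());                              // compress on the fly
    out.push(io::file_sink(filename, std::ios::out | std::ios::binary));
    out << data;                                                   // flushed and closed on destruction
}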
Modified: sandbox/libs/mapreduce/doc/tutorial.html
==============================================================================
--- sandbox/libs/mapreduce/doc/tutorial.html (original)
+++ sandbox/libs/mapreduce/doc/tutorial.html 2009-08-08 06:11:28 EDT (Sat, 08 Aug 2009)
@@ -77,16 +77,12 @@
<ul>
<li>Provide type definitions for Map Key (<code>k1</code>) and Map Value (<code>v1</code>);
<code>key_type</code> and <code> value_type</code></li>
- <li>Provide type definitions for Intermediate Key (<code>k2</code>) and Intermediate Value (<code>v2</code>);
- <code>intermediate_key_type</code> and <code> intermediate_value_type</code></li>
<li>Implement a static mapper function <code>map()</code></li>
</ul>
<pre>
struct map_task : public boost::mapreduce::map_task<
- std::string, // MapKey
- std::pair<char const *, char const *>, // MapValue
- std::string, // IntermediateKey
- unsigned> // IntermediateValue
+ std::string, // MapKey
+ std::pair<char const *, char const *> > // MapValue
{
template<typename Runtime>
static void map(Runtime &runtime, std::string const &key, value_type const &value);
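For illustration, a minimal map() body satisfying this interface might look like the sketch below. It assumes the runtime object exposes an emit_intermediate(key, value) member, which is an assumption made here rather than something shown in this change.

// Sketch of a possible map() implementation for the tutorial's map_task.
// The intermediate key/value types now come from the reduce_task rather
// than from map_task's own template parameters.
template<typename Runtime>
static void map(Runtime &runtime, std::string const &key, value_type const &value)
{
    // value delimits the input data as a pair of character pointers;
    // emit one intermediate pair keyed by the input key
    runtime.emit_intermediate(key, static_cast<unsigned>(value.second - value.first));
}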
@@ -104,12 +100,12 @@
<h2>ReduceTask</h2>
<p>Requirements of a ReduceTask function object are</p>
<ul>
- <li>Provide type definitions for Reduce Value (<code>v2</code>);
- <code> value_type</code></li>
+ <li>Provide type definitions for Reduce Key (<code>k2</code>) and Reduce Value (<code>v2</code>);
+ <code>key_type</code> and <code> value_type</code></li>
<li>Implement a static reducer function <code>reduce()</code></li>
</ul>
<pre>
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
{
template<typename Runtime, typename It>
static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
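Similarly, a reduce() body for this task might simply total the intermediate values for each key, as in the sketch below; the runtime.emit(key, value) call is an assumption about the runtime interface, not something shown in this change.

// Sketch of a possible reduce() implementation for reduce_task<std::string, unsigned>.
template<typename Runtime, typename It>
static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
{
    value_type total = 0;          // value_type is unsigned for this reduce_task
    for (; it != ite; ++it)
        total += *it;              // sum the intermediate values for this key
    runtime.emit(key, total);      // assumed runtime call for emitting a result
}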
Modified: sandbox/libs/mapreduce/doc/wordcount.html
==============================================================================
--- sandbox/libs/mapreduce/doc/wordcount.html (original)
+++ sandbox/libs/mapreduce/doc/wordcount.html 2009-08-08 06:11:28 EDT (Sat, 08 Aug 2009)
@@ -52,9 +52,9 @@
<h2>MapTask</h2>
<p>
- The MapTask will be implemented by a function-object <code>wordcount::map_task</code>. There are four required
- data types to be defined by the MapTask. These are defined by the base class via template parameters. The types
- are the <code>key</code>/<code>value</code> types of the input and output of the map task.
+ The MapTask will be implemented by a function-object <code>wordcount::map_task</code>. There are two required
+ data types to be defined by the MapTask, supplied to the base class via template parameters: the
+ <code>key</code>/<code>value</code> types of the input to the map task.
</p>
<p>
The <code>map</code> function takes three parameters: the <code>runtime</code>, which is passed by the MapReduce
@@ -72,10 +72,8 @@
</p>
<pre>
struct map_task : public boost::mapreduce::map_task<
- std::string, // MapKey
- std::pair<char const *, char const *>, // MapValue
- std::string, // IntermediateKey
- unsigned> // IntermediateValue
+ std::string, // MapKey
+ std::pair<char const *, char const *> > // MapValue
{
template<typename Runtime>
static void map(Runtime &runtime, std::string const &/*key*/, value_type &value)
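To make the walk-through concrete, a map() body along these lines would scan the mapped character range, split it into words and emit each word with a count of one. The sketch below is illustrative only; the shipped example may differ in detail, and emit_intermediate() is again an assumed runtime call.

// Illustrative map() body; needs <algorithm>, <cctype> and <string>.
template<typename Runtime>
static void map(Runtime &runtime, std::string const &/*key*/, value_type &value)
{
    for (char const *p = value.first; p != value.second; )
    {
        // skip anything that is not a letter
        while (p != value.second && !std::isalpha(static_cast<unsigned char>(*p)))
            ++p;

        // consume one word
        char const *word = p;
        while (p != value.second && std::isalpha(static_cast<unsigned char>(*p)))
            ++p;

        if (word != p)
        {
            std::string token(word, p);
            std::transform(token.begin(), token.end(), token.begin(), ::tolower);
            runtime.emit_intermediate(token, 1U);   // assumed runtime call
        }
    }
}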
@@ -116,8 +114,8 @@
<h2>ReduceTask</h2>
<p>
The ReduceTask will be implemented by a function-object <code>wordcount::reduce_task</code>. This
- functor is derived from the library's <code>reduce_task</code> class which takes a single template
- parameter to define the <code>value</code> type output of the reduce task.
+ functor is derived from the library's <code>reduce_task</code> class, which takes two template
+ parameters to define the <code>key</code> and <code>value</code> output types of the reduce task.
</p>
<p>
The <code>reduce</code> function takes four parameters; the <code>runtime</code> object is the library's
@@ -130,7 +128,7 @@
constructor of the <code>reduce_task</code> object.
</p>
<pre>
-struct reduce_task : public boost::mapreduce::reduce_task<unsigned>
+struct reduce_task : public boost::mapreduce::reduce_task<std::string, unsigned>
{
template<typename Runtime, typename It>
static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
@@ -142,7 +140,7 @@
<h2>Type Definitions</h2>
<p>
- For convenience, brevity and maintainability, define a <code>job</code> type for the MapReduce job.
+ For convenience, brevity and maintainability, we define a <code>job</code> type for the MapReduce job.
This local <code>job</code> type will be defined in terms of the library's <code>mapreduce::job</code>
class with template parameters specific to the Word Count application.
</p>
@@ -154,26 +152,26 @@
wordcount::reduce_task>
job;
</pre>
-<p>The class <code>mapreduce::job</code> actually has 5 template parameters. The first two must be supplied, the last
+<p>The class <code>mapreduce::job</code> actually has six template parameters. The first two must be supplied; the last
four have default values. The definition above is actually equivalent to</p>
<pre>
-typedef
class boost::mapreduce::job<
struct wordcount::map_task,
struct wordcount::reduce_task,
- class wordcount::combiner,
+ struct boost::mapreduce::null_combiner,
class boost::mapreduce::datasource::directory_iterator<
struct wordcount::map_task,
class boost::mapreduce::datasource::file_handler<
- class stlp_std::basic_string<char,class stlp_std::char_traits<char>,
- class stlp_std::allocator<char> >,
- struct stlp_std::pair<char const *,char const *> > >,
- class boost::mapreduce::intermediates::local_disk<
+ class std::basic_string<
+ char,class std::char_traits<char>,
+ class std::allocator<char> >,
+ struct std::pair<char const *,char const *> > >,
+ class boost::mapreduce::intermediates::in_memory<
struct wordcount::map_task,
- struct boost::mapreduce::detail::file_sorter,
- struct boost::mapreduce::detail::file_merger,
- struct boost::mapreduce::hash_partitioner> >
-job;
+ struct wordcount::reduce_task,
+ struct boost::mapreduce::hash_partitioner>,
+ class boost::mapreduce::intermediates::reduce_null_output<
+ struct wordcount::map_task,struct wordcount::reduce_task> >
</pre>
<h2>Program</h2>
@@ -242,52 +240,90 @@
std::cout << "\n" << typeid(wordcount::job).name() << "\n";
boost::mapreduce::results result;
+ wordcount::job::datasource_type datasource(spec);
try
{
if (argc > 2)
spec.map_tasks = atoi(argv[2]);
- std::cout << "\nRunning CPU Parallel MapReduce...";
- boost::mapreduce::run<wordcount::job>(spec, result);
- std::cout << "\nCPU Parallel MapReduce Finished.";
- }
- catch (std::exception &e)
- {
- std::cout << std::endl << "Error: " << e.what();
- }
-
-...
+ if (argc > 3)
+ spec.reduce_tasks = atoi(argv[3]);
+ else
+ spec.reduce_tasks = std::max(1U,boost::thread::hardware_concurrency());
+
+ std::cout << "\nRunning Parallel WordCount MapReduce...";
+ wordcount::job job(datasource, spec);
+ job.run<boost::mapreduce::schedule_policy::cpu_parallel<wordcount::job> >(result);
+ std::cout << "\nMapReduce Finished.";
</pre>
<p>
- At the end of the MapReduce job execution, the results can be written to the screen.
+ At the end of the MapReduce job execution, the statistics can be written to the screen.
</p>
<pre>
-std::cout << std::endl << "\n" << "MapReduce statistics:";
-std::cout << "\n " << "MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
-std::cout << "\n " << " Map phase runtime : " << result.map_runtime << " seconds";
-std::cout << "\n " << " Reduce phase runtime : " << result.reduce_runtime << " seconds";
-std::cout << "\n\n " << "Map:";
-std::cout << "\n " << "Total Map keys : " << result.counters.map_tasks;
-std::cout << "\n " << "Map keys processed : " << result.counters.map_tasks_completed;
-std::cout << "\n " << "Map key processing errors : " << result.counters.map_tasks_error;
-std::cout << "\n " << "Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
-std::cout << "\n " << "Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-std::cout << "\n " << "Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
-std::cout << "\n " << "Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
-
-std::cout << "\n\n " << "Reduce:";
-std::cout << "\n " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
-std::cout << "\n " << "Number of Result Files : " << result.counters.num_result_files;
-std::cout << "\n " << "Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-std::cout << "\n " << "Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
-std::cout << "\n " << "Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << std::endl << "\nMapReduce statistics:";
+ std::cout << "\n MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
+ std::cout << "\n Map phase runtime : " << result.map_runtime << " seconds";
+ std::cout << "\n Reduce phase runtime : " << result.reduce_runtime << " seconds";
+ std::cout << "\n\n Map:";
+ std::cout << "\n Total Map keys : " << result.counters.map_keys_executed;
+ std::cout << "\n Map keys processed : " << result.counters.map_keys_completed;
+ std::cout << "\n Map key processing errors : " << result.counters.map_key_errors;
+ std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
+ std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+ std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
+
+ std::cout << "\n\n Reduce:";
+ std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
+ std::cout << "\n Reduce keys processed : " << result.counters.reduce_keys_completed;
+ std::cout << "\n Reduce key processing errors : " << result.counters.reduce_key_errors;
+ std::cout << "\n Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+ std::cout << "\n Number of Result Files : " << result.counters.num_result_files;
+ if (result.reduce_times.size() > 0)
+ {
+ std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+ std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
+ }
+</pre>
+
+<p>and the top 10 words can be displayed in descending order of frequency:</p>
+<pre>
+ wordcount::job::const_result_iterator it = job.begin_results();
+ wordcount::job::const_result_iterator ite = job.end_results();
+ if (it != ite)
+ {
+ typedef std::list<wordcount::job::keyvalue_t> frequencies_t;
+ frequencies_t frequencies;
+ frequencies.push_back(*it);
+ frequencies_t::reverse_iterator it_smallest = frequencies.rbegin();
+ for (++it; it!=ite; ++it)
+ {
+ if (frequencies.size() < 10) // show top 10
+ {
+ frequencies.push_back(*it);
+ if (it->second < it_smallest->second)
+ it_smallest = frequencies.rbegin();
+ }
+ else if (it->second > it_smallest->second)
+ {
+ *it_smallest = *it;
+ it_smallest = std::min_element(frequencies.rbegin(), frequencies.rend(), boost::mapreduce::detail::less_2nd<wordcount::job::keyvalue_t>);
+ }
+ }
+
+ frequencies.sort(boost::mapreduce::detail::greater_2nd<wordcount::job::keyvalue_t>);
+ std::cout << "\n\nMapReduce results:";
+ for (frequencies_t::const_iterator freq=frequencies.begin(); freq!=frequencies.end(); ++freq)
+ std::cout << "\n" << freq->first << "\t" << freq->second;
+ }
</pre>
<h2>Output</h2>
<p>
- The wordcount program was run on a sample dataset consists of six plain text files consisting
- a total of 96 MB (100,628,434 bytes). The smallest file is 163 KB (167,529 bytes) and the largest
- is 88.1 MB (92,392,601 bytes).
+ The wordcount program was built using Microsoft Visual Studio 2005 and STLPort. It was run on a sample
+ dataset consisting of six plain text files totaling 96 MB (100,628,434 bytes). The
+ smallest file is 163 KB (167,529 bytes) and the largest is 88.1 MB (92,392,601 bytes).
</p>
<pre>
MapReduce Wordcount Application
@@ -297,36 +333,51 @@
directory_iterator<struct wordcount::map_task,class boost::mapreduce::datasource
::file_handler<class stlp_std::basic_string<char,class stlp_std::char_traits<cha
r>,class stlp_std::allocator<char> >,struct stlp_std::pair<char const *,char con
-st *> > >,class boost::mapreduce::intermediates::local_disk<struct wordcount::ma
-p_task,struct boost::mapreduce::detail::file_sorter,struct boost::mapreduce::det
-ail::file_merger,struct boost::mapreduce::hash_partitioner> >
+st *> > >,class boost::mapreduce::intermediates::in_memory<struct wordcount::map
+_task,struct wordcount::reduce_task,struct boost::mapreduce::hash_partitioner>,c
+lass boost::mapreduce::intermediates::reduce_null_output<struct wordcount::map_t
+ask,struct wordcount::reduce_task> >
-Running CPU Parallel MapReduce...
-CPU Parallel MapReduce Finished.
+Running Parallel WordCount MapReduce...
+MapReduce Finished.
MapReduce statistics:
- MapReduce job runtime : 220 seconds, of which...
- Map phase runtime : 82 seconds
- Reduce phase runtime : 138 seconds
+ MapReduce job runtime : 00:00:23.031250 seconds, of which.
+..
+ Map phase runtime : 00:00:20.390625 seconds
+ Reduce phase runtime : 00:00:02.640625 seconds
Map:
Total Map keys : 100
Map keys processed : 100
Map key processing errors : 0
Number of Map Tasks run (in parallel) : 2
- Fastest Map key processed in : 0 seconds
- Slowest Map key processed in : 3 seconds
- Average time to process Map keys : 1 seconds
+ Fastest Map key processed in : 00:00:00.046875 seconds
+ Slowest Map key processed in : 00:00:00.531250 seconds
+ Average time to process Map keys : 00:00:00.406093 seconds
Reduce:
- Total Reduce keys : 153784
- Reduce keys processed : 0
+ Total Reduce keys : 120925
+ Reduce keys processed : 120925
Reduce key processing errors : 0
Number of Reduce Tasks run (in parallel): 2
- Number of Result Files : 10
- Fastest Reduce key processed in : 16 seconds
- Slowest Reduce key processed in : 44 seconds
- Average time to process Reduce keys : 2 seconds</pre>
+ Number of Result Files : 2
+ Fastest Reduce key processed in : 00:00:02.500000 seconds
+ Slowest Reduce key processed in : 00:00:02.640625 seconds
+ Average time to process Reduce keys : 00:00:00.051406 seconds
+
+MapReduce results:
+the 817758
+of 441398
+to 426628
+and 410033
+a 327046
+in 282907
+that 212299
+is 204080
+you 162297
+i 156817
+</pre>
<h2>Adding a Combiner</h2>
<p>
@@ -367,7 +418,7 @@
<p>
The <code>combiner</code> runs as a part of the Map Task, hence the time
-taken for the Map phase is significantly increased with the introduction
+taken for the Map phase is slightly increased with the introduction
of a combiner, but the Reduce phase is reduced to almost no time at all.
</p>
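For reference, a job that names a combiner explicitly passes it as the third template parameter, the slot the expanded typedef above shows defaulting to boost::mapreduce::null_combiner. The wordcount::combiner type in the sketch below is the example's own combiner; its implementation is not part of this change set.

// Sketch: the combiner occupies the third template parameter of mapreduce::job.
typedef
boost::mapreduce::job<
  wordcount::map_task,
  wordcount::reduce_task,
  wordcount::combiner>
job;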
@@ -379,36 +430,51 @@
rator<struct wordcount::map_task,class boost::mapreduce::datasource::file_handle
r<class stlp_std::basic_string<char,class stlp_std::char_traits<char>,class stlp
_std::allocator<char> >,struct stlp_std::pair<char const *,char const *> > >,cla
-ss boost::mapreduce::intermediates::local_disk<struct wordcount::map_task,struct
- boost::mapreduce::detail::file_sorter,struct boost::mapreduce::detail::file_mer
-ger,struct boost::mapreduce::hash_partitioner> >
+ss boost::mapreduce::intermediates::in_memory<struct wordcount::map_task,struct
+wordcount::reduce_task,struct boost::mapreduce::hash_partitioner>,class boost::m
+apreduce::intermediates::reduce_null_output<struct wordcount::map_task,struct wo
+rdcount::reduce_task> >
-Running CPU Parallel MapReduce...
-CPU Parallel MapReduce Finished.
+Running Parallel WordCount MapReduce...
+MapReduce Finished.
MapReduce statistics:
- MapReduce job runtime : 136 seconds, of which...
- Map phase runtime : 125 seconds
- Reduce phase runtime : 11 seconds
+ MapReduce job runtime : 00:00:21.468750 seconds, of which.
+..
+ Map phase runtime : 00:00:21.046875 seconds
+ Reduce phase runtime : 00:00:00.421875 seconds
Map:
- Total Map keys : 100
- Map keys processed : 100
+ Total Map keys : 102
+ Map keys processed : 102
Map key processing errors : 0
Number of Map Tasks run (in parallel) : 2
- Fastest Map key processed in : 0 seconds
- Slowest Map key processed in : 4 seconds
- Average time to process Map keys : 2 seconds
+ Fastest Map key processed in : 00:00:00.171875 seconds
+ Slowest Map key processed in : 00:00:00.500000 seconds
+ Average time to process Map keys : 00:00:00.412224 seconds
Reduce:
- Total Reduce keys : 153784
- Reduce keys processed : 0
+ Total Reduce keys : 123748
+ Reduce keys processed : 123748
Reduce key processing errors : 0
Number of Reduce Tasks run (in parallel): 2
- Number of Result Files : 10
- Fastest Reduce key processed in : 2 seconds
- Slowest Reduce key processed in : 3 seconds
- Average time to process Reduce keys : 0 seconds</pre>
+ Number of Result Files : 2
+ Fastest Reduce key processed in : 00:00:00.406250 seconds
+ Slowest Reduce key processed in : 00:00:00.421875 seconds
+ Average time to process Reduce keys : 00:00:00.008118 seconds
+
+MapReduce results:
+the 1115050
+of 615296
+and 545303
+to 475179
+a 336756
+in 327755
+that 289805
+he 219652
+is 207698
+it 197484
+</pre>
<h2>Source Code</h2>
<p>The full source code for the Word Count example can be found in