Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55181 - sandbox/libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-07-30 14:42:38


Author: chenderson
Date: 2009-07-26 14:16:32 EDT (Sun, 26 Jul 2009)
New Revision: 55181
URL: http://svn.boost.org/trac/boost/changeset/55181

Log:
Test harness
Added:
   sandbox/libs/mapreduce/test/
   sandbox/libs/mapreduce/test/mrtest.cpp (contents, props changed)
   sandbox/libs/mapreduce/test/mrtest.vcproj (contents, props changed)

Added: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- (empty file)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-07-26 14:16:32 EDT (Sun, 26 Jul 2009)
@@ -0,0 +1,276 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#if !defined(_DEBUG) && !defined(BOOST_DISABLE_ASSERTS) // non-debug build in which BOOST_DISABLE_ASSERTS has not been defined
+# pragma message("Warning: BOOST_DISABLE_ASSERTS not defined")
+#endif
+
+#include <boost/config.hpp>
+#if defined(BOOST_MSVC)
+# pragma warning(disable: 4244 4512 4267)
+#endif
+
+#include <boost/mapreduce.hpp>
+#include <numeric> // accumulate
+
+#if defined(BOOST_MSVC) && defined(_DEBUG)
+#include <crtdbg.h>
+#endif
+
+// Configuration options: compile-time switches that select how the word-count job below is assembled.
+#define WORD_COUNT_MEMORY_MAP_FILE // map values are pairs of char pointers (begin, end) instead of std::ifstream
+#define USE_WORDCOUNT_COMBINER // the job uses wordcount::combiner instead of boost::mapreduce::null_combiner
+//#define USE_IN_MEMORY_INTERMEDIATES // when enabled, the job names the directory_iterator datasource and in_memory intermediates explicitly
+// Debug builds make main() run the job with the sequential schedule policy and a single map task.
+#if defined(_DEBUG)
+#define RUN_SEQUENTIAL_MAP_REDUCE
+#endif
+
+namespace wordcount {
+
+typedef // type of the value handed to map(); chosen by WORD_COUNT_MEMORY_MAP_FILE
+#ifdef WORD_COUNT_MEMORY_MAP_FILE
+ std::pair<char const *, char const *> // first/second delimit the characters that map() scans
+#else
+ std::ifstream // map() extracts the words from the stream with operator>>
+#endif
+map_value_type;
+
+template<typename T> // T is the type of the value argument received by map()
+struct map_task : public boost::mapreduce::map_task<
+ std::string, // MapKey
+ map_value_type, // MapValue
+ std::string, // IntermediateKey: the word
+ unsigned> // IntermediateValue: the count emitted for the word
+{
+ template<typename Runtime> // declared only; the bodies are the explicit specialisations defined further down
+ static void map(Runtime &runtime, std::string const &key, T &value);
+};
+typedef map_task<map_value_type> map_task_type; // the instantiation named by the job typedef
+
+struct reduce_task : public boost::mapreduce::reduce_task<unsigned> // emits the total count for each word
+{
+    template<typename Runtime, typename It>
+    static void reduce(Runtime &runtime, std::string const &key, It it, It const ite)
+    {
+        runtime.emit(key, std::accumulate(it, ite, 0U)); // 0U: sum in unsigned like the value type; a plain 0 would sum in int
+    }
+};
+
+template<> template<typename Runtime> // map() for an in-memory character range
+void
+map_task<
+    std::pair<
+        char const *, char const *> >::map(
+    Runtime &runtime,
+    std::string const &/*key*/,
+    std::pair<char const *, char const *> &value)
+{
+    // Emits (word, 1) for each word in [value.first, value.second). A word starts at a letter
+    // (A-Z, either case) and ends before the first character that is neither a letter nor an apostrophe.
+    bool in_word = false;
+    char const *ptr = value.first;
+    char const *end = value.second;
+    char const *word = ptr; // start of the word being scanned while in_word is true
+    for (; ptr != end; ++ptr)
+    {
+        // std::toupper needs a value representable as unsigned char (or EOF); passing a negative char is undefined.
+        char const ch = static_cast<char>(std::toupper(static_cast<unsigned char>(*ptr)));
+        if (in_word)
+        {
+            if ((ch < 'A' || ch > 'Z') && ch != '\'')
+            {
+                runtime.emit_intermediate(std::string(word,ptr-word), 1);
+                in_word = false;
+            }
+        }
+        else if (ch >= 'A' && ch <= 'Z')
+        {
+            word = ptr;
+            in_word = true;
+        }
+    }
+    if (in_word) // the range ended inside a word: emit that final word as well
+    {
+        BOOST_ASSERT(ptr-word > 0);
+        runtime.emit_intermediate(std::string(word,ptr-word), 1);
+    }
+}
+
+// map() for a file stream: lower-cases each whitespace-delimited token, trims non-alphanumeric characters from both ends and emits (word, 1).
+template<> template<typename Runtime>
+void
+map_task<std::ifstream>::map(
+    Runtime &runtime,
+    std::string const &/*key*/,
+    std::ifstream &value)
+{
+    std::ctype<char> const &facet = std::use_facet<std::ctype<char> >(std::locale::classic());
+    std::string word;
+    // Loop on the extraction itself rather than on !eof(): a stream that fails without
+    // reaching end-of-file would otherwise never terminate the loop.
+    while (value >> word)
+    {
+        // a successful extraction yields at least one character, so &word[0] is valid
+        facet.tolower(&word[0], &word[0] + word.size());
+
+        size_t length = word.length();
+        size_t const original_length = length;
+        std::string::const_iterator it;
+        for (it=word.begin();
+             it!=word.end() && !std::isalnum(*it, std::locale::classic());
+             ++it)
+        {
+            --length; // one leading non-alphanumeric character dropped
+        }
+
+        for (std::string::const_reverse_iterator rit=word.rbegin();
+             length>0 && !std::isalnum(*rit, std::locale::classic());
+             ++rit)
+        {
+            --length; // one trailing non-alphanumeric character dropped
+        }
+
+        if (length > 0)
+        {
+            if (length == original_length)
+                runtime.emit_intermediate(word, 1);
+            else
+                runtime.emit_intermediate(std::string(&*it,length), 1); // it points at the first alphanumeric character
+        }
+    }
+}
+
+
+class combiner; // forward declaration: the job typedef names the combiner, whose member signatures in turn refer to job
+// The job type for the word count: map task, reduce task, combiner and, when configured, an explicit datasource and intermediate store.
+typedef
+boost::mapreduce::job<
+ wordcount::map_task_type
+ , wordcount::reduce_task
+#ifdef USE_WORDCOUNT_COMBINER
+ , wordcount::combiner
+#else
+ , boost::mapreduce::null_combiner
+#endif
+#ifdef USE_IN_MEMORY_INTERMEDIATES
+ , boost::mapreduce::datasource::directory_iterator<wordcount::map_task_type>
+ , boost::mapreduce::intermediates::in_memory<wordcount::map_task_type>
+#endif
+> job;
+
+// Combiner: adds up the values passed for one intermediate key and inserts the single total for that key into the store.
+class combiner
+{
+  public:
+    combiner() : total_(0) {} // total_ is never indeterminate, even if operator() were to run before start()
+    void start(job::map_task_type::intermediate_key_type const &) // a new key begins: reset the running total
+    {
+        total_ = 0;
+    }
+
+    template<typename IntermediateStore>
+    void finish(job::map_task_type::intermediate_key_type const &key, IntermediateStore &intermediate_store)
+    {
+        if (total_ > 0) // nothing is inserted for a key whose total is zero
+            intermediate_store.insert(key, total_);
+    }
+
+    void operator()(job::map_task_type::intermediate_value_type const &value) // adds one value to the running total
+    {
+        total_ += value;
+    }
+
+  private:
+    unsigned total_;
+};
+
+} // namespace wordcount
+
+
+// Entry point: runs the word-count job over the directory given as argv[1], then prints the timing and counter statistics.
+int main(int argc, char **argv)
+{
+#if defined(BOOST_MSVC) && defined(_DEBUG)
+//    _CrtSetBreakAlloc(380);
+    _CrtSetDbgFlag(_CrtSetDbgFlag(_CRTDBG_REPORT_FLAG) | _CRTDBG_LEAK_CHECK_DF);
+#endif
+
+    std::cout << "MapReduce Wordcount Application";
+    if (argc < 2)
+    {
+        std::cerr << "Usage: wordcount directory [num_map_tasks]\n";
+        return 1;
+    }
+
+    boost::mapreduce::specification spec;
+    boost::mapreduce::results result;
+
+    spec.input_directory = argv[1];
+    wordcount::job::datasource_type datasource(spec);
+
+    std::cout << "\n" << std::max(1,static_cast<int>(boost::thread::hardware_concurrency())) << " CPU cores";
+    std::cout << "\n" << typeid(wordcount::job).name() << "\n";
+
+    try
+    {
+#ifdef RUN_SEQUENTIAL_MAP_REDUCE
+        std::cout << "\nRunning Sequential MapReduce...";
+        spec.map_tasks = 1;
+        wordcount::job job(datasource, spec);
+        job.run<boost::mapreduce::schedule_policy::sequential<wordcount::job> >(result);
+        std::cout << "\nFinished.";
+#else
+        std::cout << "\nRunning CPU Parallel MapReduce...";
+        if (argc > 2) // optional second argument: number of map tasks
+            spec.map_tasks = atoi(argv[2]);
+        boost::mapreduce::run<wordcount::job>(spec, result);
+        std::cout << "\nCPU Parallel MapReduce Finished.";
+#endif
+    }
+    catch (std::exception const &e) // report the error, then still print whatever statistics were gathered
+    {
+        std::cout << std::endl << "Error: " << e.what();
+    }
+
+    std::cout << std::endl << "\n" << "MapReduce statistics:";
+    std::cout << "\n " << "MapReduce job runtime : " << result.job_runtime << " seconds, of which...";
+    std::cout << "\n " << " Map phase runtime : " << result.map_runtime << " seconds";
+    std::cout << "\n " << " Reduce phase runtime : " << result.reduce_runtime << " seconds";
+    std::cout << "\n\n " << "Map:";
+    std::cout << "\n " << "Total Map keys : " << result.counters.map_keys_executed;
+    std::cout << "\n " << "Map keys processed : " << result.counters.map_keys_completed;
+    std::cout << "\n " << "Map key processing errors : " << result.counters.map_key_errors;
+    std::cout << "\n " << "Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
+    // With no recorded timings, min/max_element return end() and the average would divide by zero, so skip these lines.
+    if (!result.map_times.empty())
+    {
+        std::cout << "\n " << "Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+        std::cout << "\n " << "Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
+        std::cout << "\n " << "Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+    }
+
+    std::cout << "\n\n " << "Reduce:";
+    std::cout << "\n " << "Total Reduce keys : " << result.counters.reduce_keys_executed;
+    std::cout << "\n " << "Reduce keys processed : " << result.counters.reduce_keys_completed;
+    std::cout << "\n " << "Reduce key processing errors : " << result.counters.reduce_key_errors;
+    std::cout << "\n " << "Number of Reduce Tasks run (in parallel): " << result.counters.actual_reduce_tasks;
+    std::cout << "\n " << "Number of Result Files : " << result.counters.num_result_files;
+    if (!result.reduce_times.empty()) // same guard as for the map timings; the average divides by the reduce timing count
+    {
+        std::cout << "\n " << "Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        std::cout << "\n " << "Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
+        std::cout << "\n " << "Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.reduce_times.size() << " seconds";
+    }
+
+    return 0;
+}

Added: sandbox/libs/mapreduce/test/mrtest.vcproj
==============================================================================
--- (empty file)
+++ sandbox/libs/mapreduce/test/mrtest.vcproj 2009-07-26 14:16:32 EDT (Sun, 26 Jul 2009)
@@ -0,0 +1,206 @@
+<?xml version="1.0" encoding="Windows-1252"?>
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="8.00"
+ Name="mrtest"
+ ProjectGUID="{3BC934D3-0EF5-4F82-B902-C1EC4527574D}"
+ RootNamespace="mrtest"
+ Keyword="Win32Proj"
+ >
+ <Platforms>
+ <Platform
+ Name="Win32"
+ />
+ </Platforms>
+ <ToolFiles>
+ </ToolFiles>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)\compiler"
+ ConfigurationType="1"
+ CharacterSet="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="0"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions="WIN32_LEAN_AND_MEAN"
+ MinimalRebuild="true"
+ BasicRuntimeChecks="3"
+ RuntimeLibrary="3"
+ UsePrecompiledHeader="0"
+ WarningLevel="4"
+ WarnAsError="true"
+ Detect64BitPortabilityProblems="true"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ LinkIncremental="2"
+ AdditionalLibraryDirectories=""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeForWindows98="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCWebDeploymentTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="$(ConfigurationName)"
+ IntermediateDirectory="$(ConfigurationName)\compiler"
+ ConfigurationType="1"
+ CharacterSet="1"
+ WholeProgramOptimization="1"
+ >
+ <Tool
+ Name="VCPreBuildEventTool"
+ />
+ <Tool
+ Name="VCCustomBuildTool"
+ />
+ <Tool
+ Name="VCXMLDataGeneratorTool"
+ />
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"
+ />
+ <Tool
+ Name="VCMIDLTool"
+ />
+ <Tool
+ Name="VCCLCompilerTool"
+ InlineFunctionExpansion="2"
+ AdditionalIncludeDirectories=""
+ PreprocessorDefinitions="WIN32_LEAN_AND_MEAN;BOOST_LIB_DIAGNOSTIC"
+ RuntimeLibrary="2"
+ UsePrecompiledHeader="0"
+ WarningLevel="4"
+ WarnAsError="true"
+ Detect64BitPortabilityProblems="true"
+ DebugInformationFormat="3"
+ />
+ <Tool
+ Name="VCManagedResourceCompilerTool"
+ />
+ <Tool
+ Name="VCResourceCompilerTool"
+ />
+ <Tool
+ Name="VCPreLinkEventTool"
+ />
+ <Tool
+ Name="VCLinkerTool"
+ LinkIncremental="1"
+ AdditionalLibraryDirectories=""
+ GenerateDebugInformation="true"
+ SubSystem="1"
+ OptimizeReferences="2"
+ EnableCOMDATFolding="2"
+ OptimizeForWindows98="1"
+ TargetMachine="1"
+ />
+ <Tool
+ Name="VCALinkTool"
+ />
+ <Tool
+ Name="VCManifestTool"
+ />
+ <Tool
+ Name="VCXDCMakeTool"
+ />
+ <Tool
+ Name="VCBscMakeTool"
+ />
+ <Tool
+ Name="VCFxCopTool"
+ />
+ <Tool
+ Name="VCAppVerifierTool"
+ />
+ <Tool
+ Name="VCWebDeploymentTool"
+ />
+ <Tool
+ Name="VCPostBuildEventTool"
+ />
+ </Configuration>
+ </Configurations>
+ <References>
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cc;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"
+ >
+ <File
+ RelativePath=".\mrtest.cpp"
+ >
+ </File>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"
+ >
+ </Filter>
+ <Filter
+ Name="Resource Files"
+ Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx;tiff;tif;png;wav"
+ UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"
+ >
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk