Boost logo

Boost-Commit:

Subject: [Boost-commit] svn:boost r55453 - in sandbox: boost boost/mapreduce boost/mapreduce/schedule_policy libs/mapreduce/examples/wordcount libs/mapreduce/test
From: cdm.henderson_at_[hidden]
Date: 2009-08-07 17:55:02


Author: chenderson
Date: 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
New Revision: 55453
URL: http://svn.boost.org/trac/boost/changeset/55453

Log:
Sub-second timing of map/reduce tasks
Text files modified:
   sandbox/boost/mapreduce.hpp | 18 ++++++------------
   sandbox/boost/mapreduce/job.hpp | 20 +++++++++++---------
   sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp | 11 ++++++-----
   sandbox/boost/mapreduce/schedule_policy/sequential.hpp | 10 ++++++----
   sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp | 4 ++--
   sandbox/libs/mapreduce/test/mrtest.cpp | 4 ++--
   6 files changed, 33 insertions(+), 34 deletions(-)

Modified: sandbox/boost/mapreduce.hpp
==============================================================================
--- sandbox/boost/mapreduce.hpp (original)
+++ sandbox/boost/mapreduce.hpp 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -24,6 +24,7 @@
 #include <boost/config.hpp>
 #include <boost/noncopyable.hpp>
 #include <boost/cstdint.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 
 namespace boost {
 
@@ -79,18 +80,11 @@
         }
     } counters;
 
- results()
- : job_runtime(0),
- map_runtime(0),
- reduce_runtime(0)
- {
- }
-
- time_t job_runtime;
- time_t map_runtime;
- time_t reduce_runtime;
- std::vector<time_t> map_times;
- std::vector<time_t> reduce_times;
+ boost::posix_time::time_duration job_runtime;
+ boost::posix_time::time_duration map_runtime;
+ boost::posix_time::time_duration reduce_runtime;
+ std::vector<boost::posix_time::time_duration> map_times;
+ std::vector<boost::posix_time::time_duration> reduce_times;
 };
 
 } // namespace mapreduce

Modified: sandbox/boost/mapreduce/job.hpp
==============================================================================
--- sandbox/boost/mapreduce/job.hpp (original)
+++ sandbox/boost/mapreduce/job.hpp 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -12,7 +12,7 @@
 
 #ifndef BOOST_MAPREDUCE_JOB_HPP
 #define BOOST_MAPREDUCE_JOB_HPP
-
+
 namespace boost {
 
 namespace mapreduce {
@@ -174,17 +174,19 @@
     template<typename SchedulePolicy>
     void run(SchedulePolicy &schedule, results &result)
     {
- time_t const start_time = time(NULL);
+ using namespace boost::posix_time;
+ ptime start_time(microsec_clock::universal_time());
         schedule(*this, result);
- result.job_runtime = time(NULL) - start_time;
+ result.job_runtime = microsec_clock::universal_time() - start_time;
     }
 
     template<typename Sync>
     bool const run_map_task(void *key, results &result, Sync &sync)
     {
- bool success = true;
- time_t const start_time = time(NULL);
+ using namespace boost::posix_time;
+ ptime start_time(microsec_clock::universal_time());
 
+ bool success = true;
         try
         {
             std::auto_ptr<typename map_task_type::key_type>
@@ -226,7 +228,7 @@
             ++result.counters.map_key_errors;
             success = false;
         }
- result.map_times.push_back(time(NULL)-start_time);
+ result.map_times.push_back(microsec_clock::universal_time() - start_time);
 
         return success;
     }
@@ -243,8 +245,8 @@
 
     bool const run_reduce_task(unsigned const partition, results &result)
     {
- time_t const start_time = time(NULL);
-
+ using namespace boost::posix_time;
+ ptime start_time(microsec_clock::universal_time());
         try
         {
             reduce_task_runner runner(
@@ -261,7 +263,7 @@
             ++result.counters.reduce_key_errors;
         }
         
- result.reduce_times.push_back(time(NULL)-start_time);
+ result.reduce_times.push_back(microsec_clock::universal_time()-start_time);
 
         return true;
     }

Modified: sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/cpu_parallel.hpp 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -87,11 +87,12 @@
         boost::mutex m1, m2;
 
         // run the Map Tasks
- time_t start_time = time(NULL);
- boost::thread_group map_threads;
+ using namespace boost::posix_time;
+ ptime start_time(microsec_clock::universal_time());
 
         unsigned const map_tasks = std::max(num_cpus,std::min(num_cpus, job.number_of_map_tasks()));
 
+ boost::thread_group map_threads;
         for (unsigned loop=0; loop<map_tasks; ++loop)
         {
             boost::shared_ptr<results> this_result(new results);
@@ -107,10 +108,10 @@
             map_threads.add_thread(thread);
         }
         map_threads.join_all();
- result.map_runtime = time(NULL) - start_time;
+ result.map_runtime = microsec_clock::universal_time() - start_time;
 
         // run the Reduce Tasks
- start_time = time(NULL);
+ start_time = microsec_clock::universal_time();
         boost::thread_group reduce_threads;
 
         unsigned const reduce_tasks =
@@ -132,7 +133,7 @@
             reduce_threads.add_thread(thread);
         }
         reduce_threads.join_all();
- result.reduce_runtime = time(NULL) - start_time;
+ result.reduce_runtime = microsec_clock::universal_time() - start_time;
 
         // we're done with the map/reduce job, collate the statistics before returning
         for (all_results_t::const_iterator it=all_results.begin();

Modified: sandbox/boost/mapreduce/schedule_policy/sequential.hpp
==============================================================================
--- sandbox/boost/mapreduce/schedule_policy/sequential.hpp (original)
+++ sandbox/boost/mapreduce/schedule_policy/sequential.hpp 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -36,19 +36,21 @@
   public:
     void operator()(Job &job, results &result)
     {
+ using namespace boost::posix_time;
+ ptime start_time(microsec_clock::universal_time());
+
         // Map Tasks
- time_t start_time = time(NULL);
         void *key = 0;
         detail::null_lock nolock;
         while (job.get_next_map_key(key) && job.run_map_task(key, result, nolock))
             ;
- result.map_runtime = time(NULL) - start_time;
+ result.map_runtime = microsec_clock::universal_time() - start_time;
 
         // Reduce Tasks
- start_time = time(NULL);
+ start_time(microsec_clock::universal_time());
         for (unsigned partition=0; partition<job.number_of_partitions(); ++partition)
             job.run_reduce_task(partition, result);
- result.reduce_runtime = time(NULL) - start_time;
+ result.reduce_runtime = microsec_clock::universal_time() - start_time;
 
         result.counters.actual_map_tasks = 1;
         result.counters.actual_reduce_tasks = 1;

Modified: sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp
==============================================================================
--- sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp (original)
+++ sandbox/libs/mapreduce/examples/wordcount/wordcount.cpp 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -174,7 +174,7 @@
         std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
         std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
         std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
 
         std::cout << "\n\n Reduce:";
         std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
@@ -186,7 +186,7 @@
         {
             std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
             std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
         }
 
         wordcount::job::const_result_iterator it = job.begin_results();

Modified: sandbox/libs/mapreduce/test/mrtest.cpp
==============================================================================
--- sandbox/libs/mapreduce/test/mrtest.cpp (original)
+++ sandbox/libs/mapreduce/test/mrtest.cpp 2009-08-07 17:55:00 EDT (Fri, 07 Aug 2009)
@@ -298,7 +298,7 @@
     std::cout << "\n Number of Map Tasks run (in parallel) : " << result.counters.actual_map_tasks;
     std::cout << "\n Fastest Map key processed in : " << *std::min_element(result.map_times.begin(), result.map_times.end()) << " seconds";
     std::cout << "\n Slowest Map key processed in : " << *std::max_element(result.map_times.begin(), result.map_times.end()) << " seconds";
- std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << "\n Average time to process Map keys : " << std::accumulate(result.map_times.begin(), result.map_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
 
     std::cout << "\n\n Reduce:";
     std::cout << "\n Total Reduce keys : " << result.counters.reduce_keys_executed;
@@ -310,7 +310,7 @@
     {
         std::cout << "\n Fastest Reduce key processed in : " << *std::min_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
         std::cout << "\n Slowest Reduce key processed in : " << *std::max_element(result.reduce_times.begin(), result.reduce_times.end()) << " seconds";
- std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::int64_t()) / result.map_times.size() << " seconds";
+ std::cout << "\n Average time to process Reduce keys : " << std::accumulate(result.reduce_times.begin(), result.reduce_times.end(), boost::posix_time::time_duration()) / result.map_times.size() << " seconds";
     }
 
     wordcount::job::const_result_iterator it = job.begin_results();


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk