Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r55190 - sandbox/boost/mapreduce/intermediates
From: cdm.henderson_at_[hidden]
Date: 2009-07-30 14:43:03


Author: chenderson
Date: 2009-07-26 17:21:07 EDT (Sun, 26 Jul 2009)
New Revision: 55190
URL: http://svn.boost.org/trac/boost/changeset/55190

Log:
Added missing file
Added:
   sandbox/boost/mapreduce/intermediates/local_disk.hpp (contents, props changed)

Added: sandbox/boost/mapreduce/intermediates/local_disk.hpp
==============================================================================
--- (empty file)
+++ sandbox/boost/mapreduce/intermediates/local_disk.hpp 2009-07-26 17:21:07 EDT (Sun, 26 Jul 2009)
@@ -0,0 +1,290 @@
+// Boost.MapReduce library
+//
+// Copyright (C) 2009 Craig Henderson.
+// cdm.henderson_at_[hidden]
+//
+// Use, modification and distribution is subject to the
+// Boost Software License, Version 1.0. (See accompanying
+// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+//
+// For more information, see http://www.boost.org/libs/mapreduce/
+//
+
+#ifndef BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
+#define BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP
+
+#include <iomanip> // setw
+#include <fstream> // linux
+#include <boost/unordered_map.hpp>
+#include <boost/mapreduce/hash_partitioner.hpp>
+
+namespace boost {
+
+namespace mapreduce {
+
+struct null_combiner;
+
+namespace detail {
+
+struct file_merger
+{
+ template<typename List>
+ void operator()(List const &filenames, char const *dest)
+ {
+ buffer_.reset(new char[buffer_size_]);
+
+ std::ofstream outfile(dest, std::ios_base::in | std::ios_base::binary);
+ for (typename List::const_iterator it=filenames.begin(); it!=filenames.end(); ++it)
+ copy_file(*it, outfile);
+ }
+
+ private:
+ void copy_file(std::string const &filename, std::ofstream &outfile)
+ {
+ boost::uint64_t remaining = boost::filesystem::file_size(filename);
+
+ std::ifstream infile(filename.c_str(), std::ios_base::in | std::ios_base::binary);
+ if (!infile.is_open())
+ {
+ std::ostringstream err;
+ err << "Failed to open file: " << filename;
+ BOOST_THROW_EXCEPTION(std::runtime_error(err.str()));
+ }
+
+ std::streamsize read_write_size = buffer_size_;
+ while (remaining > 0)
+ {
+ if (remaining < (boost::uint64_t)read_write_size)
+ read_write_size = (std::streamsize)remaining;
+
+ infile.read(buffer_.get(), read_write_size);
+ if (infile.bad() || infile.fail())
+ {
+ std::ostringstream err;
+ err << "Error reading file " << filename;
+ BOOST_THROW_EXCEPTION(std::runtime_error(err.str()));
+ }
+
+ outfile.write(buffer_.get(), read_write_size);
+ if (outfile.bad() || outfile.fail())
+ {
+ std::ostringstream err;
+ err << "Error writing to file";
+ BOOST_THROW_EXCEPTION(std::runtime_error(err.str()));
+ }
+
+ remaining -= read_write_size;
+ }
+ }
+
+ std::auto_ptr<char> buffer_;
+ static unsigned const buffer_size_ = 1048576;
+};
+
+struct file_sorter
+{
+ bool const operator()(char const *in, char const *out, unsigned const offset) const
+ {
+ return mapreduce::merge_sort(in, out, offset);
+ }
+};
+
+} // namespace detail
+
+namespace intermediates {
+
+template<
+ typename MapTask,
+ typename SortFn=mapreduce::detail::file_sorter,
+ typename MergeFn=mapreduce::detail::file_merger,
+ typename PartitionFn=mapreduce::hash_partitioner>
+class local_disk : boost::noncopyable
+{
+ private:
+ typedef
+ boost::unordered_map<
+ size_t, // hash value of intermediate key (R)
+ std::pair<
+ std::string, // filename
+ boost::shared_ptr<std::ofstream> > > // file stream
+ intermediates_t;
+
+ public:
+ typedef MapTask map_task_type;
+ typedef typename intermediates_t::const_iterator const_iterator;
+
+ local_disk(unsigned const num_partitions)
+ : num_partitions_(num_partitions)
+ {
+ }
+
+ ~local_disk()
+ {
+ try
+ {
+ this->close_files();
+
+ // delete the temporary files
+ for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ boost::filesystem::remove(it->second.first);
+ }
+ catch (std::exception const &/*e*/)
+ {
+// std::cerr << "\nError: " << e.what() << "\n";
+ }
+ }
+
+ bool const insert(typename map_task_type::intermediate_key_type const &key,
+ typename map_task_type::intermediate_value_type const &value)
+ {
+ unsigned const partition = partitioner_(key, num_partitions_);
+ intermediates_t::iterator it = intermediates_.insert(make_pair(partition, intermediates_t::mapped_type())).first;
+ if (it->second.first.empty())
+ {
+ it->second.first = platform::get_temporary_filename();
+ it->second.second.reset(new std::ofstream);
+ }
+
+ if (!it->second.second->is_open())
+ it->second.second->open(it->second.first.c_str(), std::ios_base::out | std::ios_base::app);
+ assert(it->second.second->is_open());
+
+ std::ostringstream key_text_stream;
+ key_text_stream << key;
+ std::string key_text(key_text_stream.str());
+ *it->second.second << std::setw(10) << key_text.length() << "\t" << key << "\t" << value << "\n";
+ return !(it->second.second->bad() || it->second.second->fail());
+ }
+
+ bool const get_partition_filename(size_t const partition, std::string &filename) const
+ {
+ intermediates_t::const_iterator it=completed_intermediates_.find(partition);
+ if (it == completed_intermediates_.end())
+ return false;
+ filename = it->second.first;
+ return true;
+ }
+
+ template<typename FnObj>
+ void combine(FnObj &fn_obj)
+ {
+ this->close_files();
+ for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ {
+ std::string infilename = it->second.first;
+ std::string outfilename = platform::get_temporary_filename();
+
+ // sort the input file
+ SortFn()(infilename.c_str(), outfilename.c_str(), 11);
+ boost::filesystem::remove(infilename);
+ std::swap(infilename, outfilename);
+
+ std::string key, last_key;
+ std::ifstream infile(infilename.c_str());
+ while (!infile.eof())
+ {
+ typename map_task_type::intermediate_value_type value;
+ if (read_record(infile, key, value))
+ {
+ if (key != last_key && key.length() > 0)
+ {
+ if (last_key.length() > 0)
+ fn_obj.finish(last_key, *this);
+ if (key.length() > 0)
+ {
+ fn_obj.start(key);
+ std::swap(key, last_key);
+ }
+ }
+
+ fn_obj(value);
+ }
+ }
+
+ if (last_key.length() > 0)
+ fn_obj.finish(last_key, *this);
+
+ infile.close();
+
+ boost::filesystem::remove(infilename);
+ }
+ this->close_files();
+
+ assert(completed_intermediates_.size() == 0);
+ std::swap(completed_intermediates_, intermediates_);
+ }
+
+ void combine(mapreduce::null_combiner &/*fn_obj*/)
+ {
+ this->close_files();
+ assert(completed_intermediates_.size() == 0);
+ std::swap(completed_intermediates_, intermediates_);
+ }
+
+ static bool const read_record(std::ifstream &infile,
+ typename map_task_type::intermediate_key_type &key,
+ typename map_task_type::intermediate_value_type &value)
+ {
+#if defined(__SGI_STL_PORT)
+ size_t keylen;
+#else
+ std::streamsize keylen;
+#endif
+ keylen = 0;
+ infile >> keylen;
+ if (infile.eof() || keylen == 0)
+ return false;
+
+ char tab;
+ infile.read(&tab, 1);
+
+ key.resize(keylen);
+ infile.read(&*key.begin(), keylen);
+ infile.read(&tab, 1);
+ infile >> value;
+ return true;
+ }
+
+ template<typename List>
+ static std::string merge_and_sort(List const &filenames)
+ {
+ assert(filenames.size() > 0);
+
+ // first merge
+ std::string const temp = platform::get_temporary_filename();
+ MergeFn()(filenames, temp.c_str());
+
+ // then sort
+ typename List::const_iterator it=filenames.begin();
+ SortFn()(temp.c_str(), it->c_str(), 11);
+
+ // delete merge source files
+ boost::filesystem::remove(temp);
+ std::for_each(++it, filenames.end(), boost::bind(detail::delete_file, _1));
+
+ // return the resulting filename
+ return *filenames.begin();
+ }
+
+ private:
+ void close_files(void)
+ {
+ for (intermediates_t::iterator it=intermediates_.begin(); it!=intermediates_.end(); ++it)
+ if (it->second.second && it->second.second->is_open())
+ it->second.second->close();
+ }
+
+ private:
+ unsigned const num_partitions_;
+ intermediates_t intermediates_;
+ intermediates_t completed_intermediates_;
+ PartitionFn partitioner_;
+};
+
+} // namespace intermediates
+
+} // namespace mapreduce
+
+} // namespace boost
+
+#endif // BOOST_MAPREDUCE_LOCAL_DISK_INTERMEDIATES_HPP


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk