Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r63910 - in sandbox/transaction: boost/transact/detail libs/transact/perf libs/transact/test
From: bob.s.walters_at_[hidden]
Date: 2010-07-11 22:38:15


Author: bobwalters
Date: 2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
New Revision: 63910
URL: http://svn.boost.org/trac/boost/changeset/63910

Log:
Windows direct and OS-buffered ofile with directory synchronization.
Refined filewriteperf test

Text files modified:
   sandbox/transaction/boost/transact/detail/buffering_file.hpp | 73 ++++++++++++++++++++++++---------
   sandbox/transaction/boost/transact/detail/file.hpp | 84 +++++++++++++++++++++++++++++++--------
   sandbox/transaction/boost/transact/detail/syncing_file.hpp | 85 ++++++++++++++++++---------------------
   sandbox/transaction/libs/transact/perf/filewrite.cpp | 56 +++++++++++++++++--------
   sandbox/transaction/libs/transact/test/Jamfile.v2 | 2
   5 files changed, 196 insertions(+), 104 deletions(-)

Modified: sandbox/transaction/boost/transact/detail/buffering_file.hpp
==============================================================================
--- sandbox/transaction/boost/transact/detail/buffering_file.hpp (original)
+++ sandbox/transaction/boost/transact/detail/buffering_file.hpp 2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -17,25 +17,51 @@
 namespace transact{
 namespace detail{
 
+template<std::size_t Capacity, bool direct_io>
+struct ofile_buffer {
+ static const bool direct = direct_io;
+ char data[Capacity];
+};
+
+#ifdef _WIN32
+// Direct I/O is supported on Windows, given that there are only two ways
+// to achieve synchronized sequential disk I/O - flush the system I/O buffers
+// or use Windows direct I/O. Not supported on any other OS. Although O_DIRECT
+// is supported on many, strong
+template<std::size_t Capacity>
+struct ofile_buffer<Capacity,true> {
+ static const bool direct = true;
+ char *data;
+
+ ofile_buffer() {
+ // Direct I/O requires pagesize alignment. This assumes
+ // Capacity is > pagesize, but not necessarily a multiple of it.
+ int alignment=1; // largest power of 2 >= Capacity
+ for (std::size_t i=Capacity; (i>>=1); alignment<<=1 )
+ ;
+ data = (char*)_aligned_malloc(Capacity, alignment);
+ };
+ ~ofile_buffer() {
+ _aligned_free(data);
+ };
+};
+#endif
+
+
 template<class Base,std::size_t Capacity>
 class buffering_seq_ofile{
 public:
     typedef typename Base::size_type size_type;
+ static const bool direct_io = Base::has_direct_io;
+
     explicit buffering_seq_ofile(std::string const &name)
         : base(name)
- , size(0){
-#ifdef _WIN32
- // support possibility that Base is using unbuffered I/O
- // requiring specific buffer memory alignment
- int alignment=1; // largest power of 2 >= Capacity
- for (std::size_t i=Capacity; (i>>=1); alignment<<=1 )
- buffer = (char*)_aligned_malloc(Capacity, alignment);
-#endif
- }
+ , size(0)
+ { }
     template<class Size>
     void write(void const *data,Size s){
         if(this->size + s <= Capacity){
- std::memcpy(this->buffer+this->size,data,s);
+ std::memcpy(this->buffer.data+this->size,data,s);
             this->size+=s;
         }else this->write_overflow(data,s);
     }
@@ -58,18 +84,27 @@
             std::cerr << "ignored exception" << std::endl;
 #endif
         }
-#ifdef _WIN32
- _aligned_free(buffer);
-#endif
     }
 private:
     void write_overflow(void const *data,std::size_t s){
         BOOST_ASSERT(this->size + s > Capacity);
- if(this->size == 0){
+ if (direct_io) {
+ while (this->size + s > Capacity) {
+ std::size_t write=Capacity - this->size;
+ std::memcpy(this->buffer.data+this->size,data,write);
+ this->size=Capacity;
+ this->flush_buffer();
+ data = static_cast<char const *>(data)+write;
+ s-=write;
+ }
+ if (s) {
+ this->write(data,s);
+ }
+ }else if(this->size == 0){
             this->base.write(data,s);
         }else{
             std::size_t write=Capacity - this->size;
- std::memcpy(this->buffer+this->size,data,write);
+ std::memcpy(this->buffer.data+this->size,data,write);
             this->size=Capacity;
             this->flush_buffer();
             this->write(static_cast<char const *>(data)+write,s-write);
@@ -77,17 +112,13 @@
     }
     void flush_buffer(){
         if(this->size > 0){
- this->base.write(this->buffer,this->size);
+ this->base.write(this->buffer.data,this->size);
             this->size=0;
         }
     }
 
     Base base;
-#ifdef _WIN32
- char *buffer;
-#else
- char buffer[Capacity];
-#endif
+ ofile_buffer<Capacity,direct_io> buffer;
         std::size_t size;
 };
 

Modified: sandbox/transaction/boost/transact/detail/file.hpp
==============================================================================
--- sandbox/transaction/boost/transact/detail/file.hpp (original)
+++ sandbox/transaction/boost/transact/detail/file.hpp 2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -19,65 +19,110 @@
 #ifdef _WIN32
 #include <Windows.h>
 #include <WinBase.h>
+
+#include <strsafe.h>
+
+static void throw_io_failure(char const* function)
+{
+ // Retrieve the system error message for the last-error code
+ DWORD dw = GetLastError();
+
+ LPVOID lpMsgBuf;
+ FormatMessage( FORMAT_MESSAGE_ALLOCATE_BUFFER |
+ FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL,
+ dw,
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
+ (LPTSTR) &lpMsgBuf,
+ 0, NULL );
+
+ // Display the error message, then throw io_failure to the caller
+ // TODO - should be incorporated into io_failure what().
+ std::cerr << function << " failed with error " << dw << ": " << (char*)lpMsgBuf << std::endl;
+
+ LocalFree(lpMsgBuf);
+ throw io_failure();
+}
         
 // low-level ofile representation for WIN32
+template <bool direct_io = false>
 class ofile {
 public:
         typedef unsigned int size_type;
-
+ static const bool has_direct_io = direct_io;
+
         void* filedes;
         
         ofile(std::string const &name) : filedes(INVALID_HANDLE_VALUE) {
                 unsigned long access = GENERIC_READ | GENERIC_WRITE;
                 unsigned long creation_flags = OPEN_ALWAYS;
- unsigned long flags = FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
+ unsigned long flags = 0;
+ if ( direct_io )
+ flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
+
                 this->filedes = CreateFileA(name.c_str(), access,
+ FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
+ 0, creation_flags, flags, 0);
+ if (this->filedes == INVALID_HANDLE_VALUE )
+ throw_io_failure("CreateFileA");
+
+ //make sure the directory entry has reached the disk:
+ std::string dirname=filesystem::system_complete(name).parent_path().external_directory_string();
+
+ creation_flags = OPEN_EXISTING;
+ flags = FILE_FLAG_BACKUP_SEMANTICS;
+ void *dirfiledes = CreateFileA(dirname.c_str(), access,
                                                                                 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
                                                                                 0, creation_flags, flags, 0);
- if (this->filedes == INVALID_HANDLE_VALUE ) {
- throw io_failure();
- }
- std::cerr << "File opened" << std::endl;
+ if (dirfiledes == INVALID_HANDLE_VALUE )
+ throw_io_failure("CreateFileA");
+ if(!FlushFileBuffers(dirfiledes))
+ throw_io_failure("FlushFileBuffers");
+ if(!CloseHandle(dirfiledes))
+ throw_io_failure("CloseHandle");
         }
         
         ~ofile() {
- if(this->filedes != INVALID_HANDLE_VALUE) CloseHandle(this->filedes);
+ if(this->filedes != INVALID_HANDLE_VALUE)
+ CloseHandle(this->filedes);
         }
         
         void seek(size_type const &s) {
                 LARGE_INTEGER loc;
                 loc.QuadPart = s;
- if(SetFilePointerEx(this->filedes, loc, NULL, FILE_BEGIN) == 0) {
- std::cerr << "SetFilePointerEx == 0" << std::endl;
- throw io_failure();
- }
+ if(SetFilePointerEx(this->filedes, loc, NULL, FILE_BEGIN) == 0)
+ throw_io_failure("SetFilePointerEx");
         }
         
         size_type write(const char *data, size_type const &size) {
                 DWORD written;
- if(WriteFile(this->filedes, data, size, &written, 0) == 0) {
- std::cerr << "WriteFile == 0" << std::endl;
- throw io_failure();
- }
+ if(WriteFile(this->filedes, data, size, &written, 0) == 0)
+ throw_io_failure("WriteFile");
                 return (size_type)written;
         }
         
- void sync() { }
+ void sync() {
+ if (!direct_io && FlushFileBuffers(this->filedes) == 0)
+ throw_io_failure("FlushFileBuffers");
+ }
 };
 
 #else
         
 #include <unistd.h>
 #include <fcntl.h>
-
+
 #ifndef _POSIX_SYNCHRONIZED_IO
 #error no POSIX synchronized IO available
 #endif
 
 // low-level ofile for Linux/Unix
+template <bool direct_io = false> // ignored on Posix API.
 class ofile {
 public:
         typedef unsigned int size_type;
+ static const bool has_direct_io = direct_io;
 
         int filedes;
         
@@ -88,6 +133,7 @@
 #endif
                 this->filedes= open(name.c_str(),flags,S_IRUSR | S_IWUSR);
                 if(this->filedes==-1) throw io_failure();
+
                 { //make sure the directory entry has reached the disk:
                         std::string dirname=filesystem::path(name).directory_string();
                         if(dirname.empty()) dirname=".";
@@ -121,7 +167,9 @@
 };
 
 #endif
-
+
+typedef ofile<true> direct_ofile;
+
 }
 }
 }

Modified: sandbox/transaction/boost/transact/detail/syncing_file.hpp
==============================================================================
--- sandbox/transaction/boost/transact/detail/syncing_file.hpp (original)
+++ sandbox/transaction/boost/transact/detail/syncing_file.hpp 2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -15,78 +15,73 @@
 #include <boost/static_assert.hpp>
 #include <boost/assert.hpp>
 #include <boost/config.hpp>
-#include <boost/transact/detail/file.hpp>
+#include <boost/transact/detail/buffering_file.hpp>
 
 namespace boost{
 namespace transact{
 namespace detail{
 
+
+template <class Base>
 class syncing_seq_ofile{
 public:
     typedef unsigned int size_type;
- explicit syncing_seq_ofile(std::string const &name);
- void write(void const *data,std::size_t size);
+ static const bool has_direct_io = Base::has_direct_io;
+
+ explicit syncing_seq_ofile(std::string const &name)
+ : pos(0)
+ , base(name){
+ this->write_ahead(0,write_ahead_size);
+ }
+ void write(void const *data,std::size_t size){
+ size_type const s=this->pos % write_ahead_size;
+ if(s + size >= write_ahead_size){ //there must be at least one 0 at the and, so also write ahead if this is equal.
+ size_type start=this->pos - s + write_ahead_size;
+ size_type end=start+((s + size)/write_ahead_size) * write_ahead_size; //usually == start + write_ahead_size, but "size" can theoretically span a whole write_ahead_size
+ BOOST_ASSERT(end > start);
+ this->write_ahead(start,end);
+ }
+
+ std::size_t ret= base.write((char const *)data,size);
+ if(ret > 0) this->pos+=ret;
+ if(ret != std::size_t(size)) throw io_failure();
+ }
     size_type position() const{ return this->pos; }
- void flush();
- void sync();
+ void flush() {}
+ void sync() {
+ base.sync();
+ }
 private:
     size_type pos;
- ofile filedes;
+ Base base;
 
-private:
     void write_ahead(size_type const &start,size_type const &end){
         BOOST_ASSERT(start % write_ahead_size == 0);
         BOOST_ASSERT(end % write_ahead_size == 0);
         BOOST_STATIC_ASSERT(write_ahead_size % page_size == 0);
- filedes.seek(start);
+ base.seek(start);
         for(size_type off=start;off < end;off+=page_size){
- filedes.write(empty_page.data,page_size);
+ base.write(empty_page.data,page_size);
         }
- filedes.sync();
- filedes.seek(this->pos);
+ base.sync();
+ base.seek(this->pos);
     }
 
     static std::size_t const write_ahead_size=10*1024*1024;
     static std::size_t const page_size=4096;
 
- struct empty_page_type{
- empty_page_size(){
- std::memset(data,0,page_size);
+ struct empty_page_type : public ofile_buffer<page_size,has_direct_io> {
+ typedef ofile_buffer<page_size,has_direct_io> base_buffer;
+ empty_page_type() : base_buffer() {
+ std::memset(base_buffer::data,0,page_size);
         }
- char data[page_size];
- }
+ };
+
     static empty_page_type empty_page;
- int filedes;
 };
 
-syncing_seq_ofile::empty_page_type syncing_seq_ofile::empty_page;
-
-inline syncing_seq_ofile::syncing_seq_ofile(std::string const &name)
- : pos(0)
- , filedes(name){
- this->write_ahead(0,write_ahead_size);
-}
-
-void syncing_seq_ofile::write(void const *data,std::size_t size){
- size_type const s=this->pos % write_ahead_size;
- if(s + size >= write_ahead_size){ //there must be at least one 0 at the and, so also write ahead if this is equal.
- size_type start=this->pos - s + write_ahead_size;
- size_type end=start+((s + size)/write_ahead_size) * write_ahead_size; //usually == start + write_ahead_size, but "size" can theoretically span a whole write_ahead_size
- BOOST_ASSERT(end > start);
- this->write_ahead(start,end);
- }
-
- std::size_t ret= filedes.write((char const *)data,size);
- if(ret > 0) this->pos+=ret;
- if(ret != std::size_t(size)) throw io_failure();
-}
-
-
-inline void syncing_seq_ofile::flush(){}
-
-inline void syncing_seq_ofile::sync(){
- filedes.sync();
-}
+template<class Base>
+typename syncing_seq_ofile<Base>::empty_page_type syncing_seq_ofile<Base>::empty_page;
 
 
 }

Modified: sandbox/transaction/libs/transact/perf/filewrite.cpp
==============================================================================
--- sandbox/transaction/libs/transact/perf/filewrite.cpp (original)
+++ sandbox/transaction/libs/transact/perf/filewrite.cpp 2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -14,21 +14,20 @@
 #include <boost/transact/detail/buffering_file.hpp>
 #include <boost/transact/detail/sectorizing_file.hpp>
 #include <boost/transact/detail/syncing_file.hpp>
+#include <boost/transact/detail/file.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 
-static const int num_sets=100;
-static const int txns_per_set=1000;
-static const int txn_size=3000; // typical transaction log size
-static int total_txns = 0;
-static int total_bytes = 0;
+static const int loop_size=1000;
 
-static char txn_buffer[ txn_size ]; // random data set
+static char txn_buffer[ 10000 ]; // random data set
 
 using namespace boost::transact;
+using namespace boost::posix_time;
 
 typedef detail::sectorizing_seq_ofile<
                         detail::aligning_seq_ofile<
                                 detail::buffering_seq_ofile<
- detail::syncing_seq_ofile,
+ detail::syncing_seq_ofile< detail::ofile<false> >,
                                         8192
>
>
@@ -36,23 +35,42 @@
 ofile_t;
 
 
-void log_a_set(ofile_t &outfile) {
- for (int i=0; i<txns_per_set; i++) {
+typedef detail::sectorizing_seq_ofile<
+ detail::aligning_seq_ofile<
+ detail::buffering_seq_ofile<
+ detail::syncing_seq_ofile< detail::direct_ofile >,
+ 8192
+ >
+ >
+ >
+direct_ofile_t;
+
+template <class file_t>
+void filetest1(const char *filename, size_t txn_size) {
+ file_t outfile(filename);
+
+ ptime start = microsec_clock::local_time();
+ for (int i=0; i<loop_size; i++) {
                 outfile.write(txn_buffer, txn_size);
+ outfile.sync();
         }
- total_txns += txns_per_set;
- total_bytes += (txns_per_set * txn_size);
- std::cout << "Written " << total_txns << " txns, " << total_bytes << " bytes" << std::endl;
-}
+ ptime end = microsec_clock::local_time();
 
-void filetest1() {
- ofile_t outfile("filetest1.out");
- for (int i=0; i<num_sets; i++) {
- log_a_set(outfile);
- }
+ std::cout << "Written " << loop_size << " txns, "
+ << loop_size*txn_size << " bytes, in "
+ << (end-start) << " microseconds"
+ << std::endl;
 }
 
+
 int main(int, const char *[]){
- filetest1();
+ // write loop_size transactions to disk, each 3k in size.
+ filetest1<ofile_t>("filetest1.out", 3000);
+ filetest1<direct_ofile_t>("filetest2.out", 3000);
+
+ // write loop_size transactions to disk, each 10k in size.
+ filetest1<ofile_t>("filetest3.out", 10000);
+ filetest1<direct_ofile_t>("filetest4.out", 10000);
+
     return 0;
 }

Modified: sandbox/transaction/libs/transact/test/Jamfile.v2
==============================================================================
--- sandbox/transaction/libs/transact/test/Jamfile.v2 (original)
+++ sandbox/transaction/libs/transact/test/Jamfile.v2 2010-07-11 22:38:14 EDT (Sun, 11 Jul 2010)
@@ -51,7 +51,7 @@
 
     alias filewriteperf
     :
- [ run ../perf/filewrite.cpp : : : <library>/boost//date_time <library>/boost//system <link>static ]
+ [ run ../perf/filewrite.cpp : : : <library>/boost//filesystem <library>/boost//date_time <library>/boost//system <link>static ]
     ;
         
     alias all


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk