Boost logo

Boost-Commit :

Subject: [Boost-commit] svn:boost r58624 - trunk/boost/asio/detail
From: chris_at_[hidden]
Date: 2010-01-02 01:09:03


Author: chris_kohlhoff
Date: 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
New Revision: 58624
URL: http://svn.boost.org/trac/boost/changeset/58624

Log:
Windows needs the OVERLAPPED structure to be valid until both the initiating
function call has returned and the completion packet has been delivered.

Text files modified:
   trunk/boost/asio/detail/win_iocp_handle_service.hpp | 56 ++++++++----------
   trunk/boost/asio/detail/win_iocp_io_service.hpp | 76 ++++++++++++++++++------
   trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp | 6 +
   trunk/boost/asio/detail/win_iocp_socket_service.hpp | 120 +++++++++++++++++----------------------
   4 files changed, 136 insertions(+), 122 deletions(-)

Modified: trunk/boost/asio/detail/win_iocp_handle_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_handle_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_handle_service.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -457,13 +457,6 @@
   void async_write_some_at(implementation_type& impl, boost::uint64_t offset,
       const ConstBufferSequence& buffers, Handler handler)
   {
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
     // Update the ID of the thread from which cancellation is safe.
     if (impl.safe_cancellation_thread_id_ == 0)
       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
@@ -476,6 +469,13 @@
     raw_handler_ptr<alloc_traits> raw_ptr(handler);
     handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
 
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
     // Find first buffer of non-zero length.
     boost::asio::const_buffer buffer;
     typename ConstBufferSequence::const_iterator iter = buffers.begin();
@@ -490,10 +490,8 @@
     // A request to write 0 bytes on a handle is a no-op.
     if (boost::asio::buffer_size(buffer) == 0)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
       return;
     }
 
@@ -510,14 +508,12 @@
     // Check if the operation completed immediately.
     if (!ok && last_error != ERROR_IO_PENDING)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
     }
     else
     {
+ ptr.get()->on_pending();
       ptr.release();
     }
   }
@@ -716,13 +712,6 @@
   void async_read_some_at(implementation_type& impl, boost::uint64_t offset,
       const MutableBufferSequence& buffers, Handler handler)
   {
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
     // Update the ID of the thread from which cancellation is safe.
     if (impl.safe_cancellation_thread_id_ == 0)
       impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
@@ -735,6 +724,13 @@
     raw_handler_ptr<alloc_traits> raw_ptr(handler);
     handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
 
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
     // Find first buffer of non-zero length.
     boost::asio::mutable_buffer buffer;
     typename MutableBufferSequence::const_iterator iter = buffers.begin();
@@ -749,10 +745,8 @@
     // A request to receive 0 bytes on a stream handle is a no-op.
     if (boost::asio::buffer_size(buffer) == 0)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
       return;
     }
 
@@ -767,14 +761,12 @@
     DWORD last_error = ::GetLastError();
     if (!ok && last_error != ERROR_IO_PENDING && last_error != ERROR_MORE_DATA)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
     }
     else
     {
+ ptr.get()->on_pending();
       ptr.release();
     }
   }

Modified: trunk/boost/asio/detail/win_iocp_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_io_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_io_service.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -50,6 +50,8 @@
   // This class inherits from OVERLAPPED so that we can downcast to get back to
   // the operation pointer from the LPOVERLAPPED out parameter of
   // GetQueuedCompletionStatus.
+ class operation;
+ friend class operation;
   class operation
     : public OVERLAPPED
   {
@@ -59,7 +61,10 @@
 
     operation(win_iocp_io_service& iocp_service,
         invoke_func_type invoke_func, destroy_func_type destroy_func)
- : outstanding_operations_(&iocp_service.outstanding_operations_),
+ : iocp_service_(iocp_service),
+ ready_(0),
+ last_error_(~DWORD(0)),
+ bytes_transferred_(0),
         invoke_func_(invoke_func),
         destroy_func_(destroy_func)
     {
@@ -69,12 +74,48 @@
       OffsetHigh = 0;
       hEvent = 0;
 
- ::InterlockedIncrement(outstanding_operations_);
+ ::InterlockedIncrement(&iocp_service_.outstanding_operations_);
+ }
+
+ void reset()
+ {
+ Internal = 0;
+ InternalHigh = 0;
+ Offset = 0;
+ OffsetHigh = 0;
+ hEvent = 0;
+ ready_ = 0;
+ last_error_ = ~DWORD(0);
+ bytes_transferred_ = 0;
+ }
+
+ void on_pending()
+ {
+ if (::InterlockedCompareExchange(&ready_, 1, 0) == 1)
+ iocp_service_.post_completion(this, last_error_, bytes_transferred_);
     }
 
- void do_completion(DWORD last_error, size_t bytes_transferred)
+ void on_immediate_completion(DWORD last_error, size_t bytes_transferred)
     {
- invoke_func_(this, last_error, bytes_transferred);
+ ready_ = 1;
+ iocp_service_.post_completion(this, last_error, bytes_transferred);
+ }
+
+ bool on_completion(DWORD last_error, size_t bytes_transferred)
+ {
+ if (last_error_ == ~DWORD(0))
+ {
+ last_error_ = last_error;
+ bytes_transferred_ = bytes_transferred;
+ }
+
+ if (::InterlockedCompareExchange(&ready_, 1, 0) == 1)
+ {
+ invoke_func_(this, last_error_, bytes_transferred_);
+ return true;
+ }
+
+ return false;
     }
 
     void destroy()
@@ -86,16 +127,18 @@
     // Prevent deletion through this type.
     ~operation()
     {
- ::InterlockedDecrement(outstanding_operations_);
+ ::InterlockedDecrement(&iocp_service_.outstanding_operations_);
     }
 
   private:
- long* outstanding_operations_;
+ win_iocp_io_service& iocp_service_;
+ long ready_;
+ DWORD last_error_;
+ std::size_t bytes_transferred_;
     invoke_func_type invoke_func_;
     destroy_func_type destroy_func_;
   };
 
-
   // Constructor.
   win_iocp_io_service(boost::asio::io_service& io_service)
     : boost::asio::detail::service_base<win_iocp_io_service>(io_service),
@@ -296,15 +339,7 @@
     handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);
 
     // Enqueue the operation on the I/O completion port.
- if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, ptr.get()))
- {
- DWORD last_error = ::GetLastError();
- boost::system::system_error e(
- boost::system::error_code(last_error,
- boost::asio::error::get_system_category()),
- "pqcs");
- boost::throw_exception(e);
- }
+ ptr.get()->on_immediate_completion(0, 0);
 
     // Operation has been successfully posted.
     ptr.release();
@@ -507,10 +542,11 @@
 
         // Dispatch the operation.
         operation* op = static_cast<operation*>(overlapped);
- op->do_completion(last_error, bytes_transferred);
-
- ec = boost::system::error_code();
- return 1;
+ if (op->on_completion(last_error, bytes_transferred))
+ {
+ ec = boost::system::error_code();
+ return 1;
+ }
       }
       else if (completion_key == transfer_timer_dispatching)
       {

Modified: trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -92,6 +92,9 @@
   // Release ownership of the OVERLAPPED object.
   OVERLAPPED* release()
   {
+ if (ptr_)
+ ptr_->on_pending();
+
     OVERLAPPED* tmp = ptr_;
     ptr_ = 0;
     return tmp;
@@ -104,8 +107,7 @@
     if (ptr_)
     {
       ptr_->ec_ = ec;
- ptr_->io_service_.post_completion(ptr_, 0,
- static_cast<DWORD>(bytes_transferred));
+ ptr_->on_immediate_completion(0, static_cast<DWORD>(bytes_transferred));
       ptr_ = 0;
     }
   }

Modified: trunk/boost/asio/detail/win_iocp_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_socket_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_socket_service.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -823,13 +823,6 @@
   void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
       socket_base::message_flags flags, Handler handler)
   {
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
 #if defined(BOOST_ASIO_ENABLE_CANCELIO)
     // Update the ID of the thread from which cancellation is safe.
     if (impl.safe_cancellation_thread_id_ == 0)
@@ -845,6 +838,13 @@
     handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
         impl.cancel_token_, buffers, handler);
 
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
     // Copy buffers into WSABUF array.
     ::WSABUF bufs[max_buffers];
     typename ConstBufferSequence::const_iterator iter = buffers.begin();
@@ -863,10 +863,8 @@
     // A request to receive 0 bytes on a stream socket is a no-op.
     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
       return;
     }
 
@@ -879,14 +877,12 @@
     // Check if the operation completed immediately.
     if (result != 0 && last_error != WSA_IO_PENDING)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
     }
     else
     {
+ ptr.get()->on_pending();
       ptr.release();
     }
   }
@@ -1102,13 +1098,6 @@
       const ConstBufferSequence& buffers, const endpoint_type& destination,
       socket_base::message_flags flags, Handler handler)
   {
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
 #if defined(BOOST_ASIO_ENABLE_CANCELIO)
     // Update the ID of the thread from which cancellation is safe.
     if (impl.safe_cancellation_thread_id_ == 0)
@@ -1123,6 +1112,13 @@
     raw_handler_ptr<alloc_traits> raw_ptr(handler);
     handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
 
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
     // Copy buffers into WSABUF array.
     ::WSABUF bufs[max_buffers];
     typename ConstBufferSequence::const_iterator iter = buffers.begin();
@@ -1145,14 +1141,12 @@
     // Check if the operation completed immediately.
     if (result != 0 && last_error != WSA_IO_PENDING)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
     }
     else
     {
+ ptr.get()->on_pending();
       ptr.release();
     }
   }
@@ -1375,13 +1369,6 @@
       const MutableBufferSequence& buffers,
       socket_base::message_flags flags, Handler handler)
   {
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
 #if defined(BOOST_ASIO_ENABLE_CANCELIO)
     // Update the ID of the thread from which cancellation is safe.
     if (impl.safe_cancellation_thread_id_ == 0)
@@ -1398,6 +1385,13 @@
     handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
         iocp_service_, impl.cancel_token_, buffers, handler);
 
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
     // Copy buffers into WSABUF array.
     ::WSABUF bufs[max_buffers];
     typename MutableBufferSequence::const_iterator iter = buffers.begin();
@@ -1415,10 +1409,8 @@
     // A request to receive 0 bytes on a stream socket is a no-op.
     if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
       return;
     }
 
@@ -1430,14 +1422,12 @@
     DWORD last_error = ::WSAGetLastError();
     if (result != 0 && last_error != WSA_IO_PENDING)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
     }
     else
     {
+ ptr.get()->on_pending();
       ptr.release();
     }
   }
@@ -1482,14 +1472,12 @@
       DWORD last_error = ::WSAGetLastError();
       if (result != 0 && last_error != WSA_IO_PENDING)
       {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
       }
       else
       {
+ ptr.get()->on_pending();
         ptr.release();
       }
     }
@@ -1710,13 +1698,6 @@
       const MutableBufferSequence& buffers, endpoint_type& sender_endp,
       socket_base::message_flags flags, Handler handler)
   {
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
 #if defined(BOOST_ASIO_ENABLE_CANCELIO)
     // Update the ID of the thread from which cancellation is safe.
     if (impl.safe_cancellation_thread_id_ == 0)
@@ -1733,6 +1714,13 @@
     handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
         iocp_service_, sender_endp, buffers, handler);
 
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
     // Copy buffers into WSABUF array.
     ::WSABUF bufs[max_buffers];
     typename MutableBufferSequence::const_iterator iter = buffers.begin();
@@ -1754,14 +1742,12 @@
     DWORD last_error = ::WSAGetLastError();
     if (result != 0 && last_error != WSA_IO_PENDING)
     {
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
     }
     else
     {
+ ptr.get()->on_pending();
       ptr.release();
     }
   }
@@ -1925,11 +1911,7 @@
           && !ptr.get()->enable_connection_aborted_)
       {
         // Reset OVERLAPPED structure.
- ptr.get()->Internal = 0;
- ptr.get()->InternalHigh = 0;
- ptr.get()->Offset = 0;
- ptr.get()->OffsetHigh = 0;
- ptr.get()->hEvent = 0;
+ ptr.get()->reset();
 
         // Create a new socket for the next connection, since the AcceptEx call
         // fails with WSAEINVAL if we try to reuse the same socket.
@@ -1954,7 +1936,7 @@
                 || last_error == WSAECONNABORTED)
             {
               // Post this handler so that operation will be restarted again.
- ptr.get()->io_service_.post_completion(ptr.get(), last_error, 0);
+ ptr.get()->on_immediate_completion(last_error, 0);
               ptr.release();
               return;
             }
@@ -1966,6 +1948,7 @@
           else
           {
             // Asynchronous operation has been successfully restarted.
+ ptr.get()->on_pending();
             ptr.release();
             return;
           }
@@ -2141,7 +2124,7 @@
         // Post handler so that operation will be restarted again. We do not
         // perform the AcceptEx again here to avoid the possibility of starving
         // other handlers.
- iocp_service_.post_completion(ptr.get(), last_error, 0);
+ ptr.get()->on_immediate_completion(last_error, 0);
         ptr.release();
       }
       else
@@ -2155,6 +2138,7 @@
     }
     else
     {
+ ptr.get()->on_pending();
       ptr.release();
     }
   }


Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk