|
Boost-Commit : |
Subject: [Boost-commit] svn:boost r58624 - trunk/boost/asio/detail
From: chris_at_[hidden]
Date: 2010-01-02 01:09:03
Author: chris_kohlhoff
Date: 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
New Revision: 58624
URL: http://svn.boost.org/trac/boost/changeset/58624
Log:
Windows needs the OVERLAPPED structure to be valid until both the initiating
function call has returned and the completion packet has been delivered.
Text files modified:
trunk/boost/asio/detail/win_iocp_handle_service.hpp | 56 ++++++++----------
trunk/boost/asio/detail/win_iocp_io_service.hpp | 76 ++++++++++++++++++------
trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp | 6 +
trunk/boost/asio/detail/win_iocp_socket_service.hpp | 120 +++++++++++++++++----------------------
4 files changed, 136 insertions(+), 122 deletions(-)
Modified: trunk/boost/asio/detail/win_iocp_handle_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_handle_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_handle_service.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -457,13 +457,6 @@
void async_write_some_at(implementation_type& impl, boost::uint64_t offset,
const ConstBufferSequence& buffers, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
// Update the ID of the thread from which cancellation is safe.
if (impl.safe_cancellation_thread_id_ == 0)
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
@@ -476,6 +469,13 @@
raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
// Find first buffer of non-zero length.
boost::asio::const_buffer buffer;
typename ConstBufferSequence::const_iterator iter = buffers.begin();
@@ -490,10 +490,8 @@
// A request to write 0 bytes on a handle is a no-op.
if (boost::asio::buffer_size(buffer) == 0)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
return;
}
@@ -510,14 +508,12 @@
// Check if the operation completed immediately.
if (!ok && last_error != ERROR_IO_PENDING)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
@@ -716,13 +712,6 @@
void async_read_some_at(implementation_type& impl, boost::uint64_t offset,
const MutableBufferSequence& buffers, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
// Update the ID of the thread from which cancellation is safe.
if (impl.safe_cancellation_thread_id_ == 0)
impl.safe_cancellation_thread_id_ = ::GetCurrentThreadId();
@@ -735,6 +724,13 @@
raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
// Find first buffer of non-zero length.
boost::asio::mutable_buffer buffer;
typename MutableBufferSequence::const_iterator iter = buffers.begin();
@@ -749,10 +745,8 @@
// A request to receive 0 bytes on a stream handle is a no-op.
if (boost::asio::buffer_size(buffer) == 0)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
return;
}
@@ -767,14 +761,12 @@
DWORD last_error = ::GetLastError();
if (!ok && last_error != ERROR_IO_PENDING && last_error != ERROR_MORE_DATA)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
Modified: trunk/boost/asio/detail/win_iocp_io_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_io_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_io_service.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -50,6 +50,8 @@
// This class inherits from OVERLAPPED so that we can downcast to get back to
// the operation pointer from the LPOVERLAPPED out parameter of
// GetQueuedCompletionStatus.
+ class operation;
+ friend class operation;
class operation
: public OVERLAPPED
{
@@ -59,7 +61,10 @@
operation(win_iocp_io_service& iocp_service,
invoke_func_type invoke_func, destroy_func_type destroy_func)
- : outstanding_operations_(&iocp_service.outstanding_operations_),
+ : iocp_service_(iocp_service),
+ ready_(0),
+ last_error_(~DWORD(0)),
+ bytes_transferred_(0),
invoke_func_(invoke_func),
destroy_func_(destroy_func)
{
@@ -69,12 +74,48 @@
OffsetHigh = 0;
hEvent = 0;
- ::InterlockedIncrement(outstanding_operations_);
+ ::InterlockedIncrement(&iocp_service_.outstanding_operations_);
+ }
+
+ void reset()
+ {
+ Internal = 0;
+ InternalHigh = 0;
+ Offset = 0;
+ OffsetHigh = 0;
+ hEvent = 0;
+ ready_ = 0;
+ last_error_ = ~DWORD(0);
+ bytes_transferred_ = 0;
+ }
+
+ void on_pending()
+ {
+ if (::InterlockedCompareExchange(&ready_, 1, 0) == 1)
+ iocp_service_.post_completion(this, last_error_, bytes_transferred_);
}
- void do_completion(DWORD last_error, size_t bytes_transferred)
+ void on_immediate_completion(DWORD last_error, size_t bytes_transferred)
{
- invoke_func_(this, last_error, bytes_transferred);
+ ready_ = 1;
+ iocp_service_.post_completion(this, last_error, bytes_transferred);
+ }
+
+ bool on_completion(DWORD last_error, size_t bytes_transferred)
+ {
+ if (last_error_ == ~DWORD(0))
+ {
+ last_error_ = last_error;
+ bytes_transferred_ = bytes_transferred;
+ }
+
+ if (::InterlockedCompareExchange(&ready_, 1, 0) == 1)
+ {
+ invoke_func_(this, last_error_, bytes_transferred_);
+ return true;
+ }
+
+ return false;
}
void destroy()
@@ -86,16 +127,18 @@
// Prevent deletion through this type.
~operation()
{
- ::InterlockedDecrement(outstanding_operations_);
+ ::InterlockedDecrement(&iocp_service_.outstanding_operations_);
}
private:
- long* outstanding_operations_;
+ win_iocp_io_service& iocp_service_;
+ long ready_;
+ DWORD last_error_;
+ std::size_t bytes_transferred_;
invoke_func_type invoke_func_;
destroy_func_type destroy_func_;
};
-
// Constructor.
win_iocp_io_service(boost::asio::io_service& io_service)
: boost::asio::detail::service_base<win_iocp_io_service>(io_service),
@@ -296,15 +339,7 @@
handler_ptr<alloc_traits> ptr(raw_ptr, *this, handler);
// Enqueue the operation on the I/O completion port.
- if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, ptr.get()))
- {
- DWORD last_error = ::GetLastError();
- boost::system::system_error e(
- boost::system::error_code(last_error,
- boost::asio::error::get_system_category()),
- "pqcs");
- boost::throw_exception(e);
- }
+ ptr.get()->on_immediate_completion(0, 0);
// Operation has been successfully posted.
ptr.release();
@@ -507,10 +542,11 @@
// Dispatch the operation.
operation* op = static_cast<operation*>(overlapped);
- op->do_completion(last_error, bytes_transferred);
-
- ec = boost::system::error_code();
- return 1;
+ if (op->on_completion(last_error, bytes_transferred))
+ {
+ ec = boost::system::error_code();
+ return 1;
+ }
}
else if (completion_key == transfer_timer_dispatching)
{
Modified: trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_overlapped_ptr.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -92,6 +92,9 @@
// Release ownership of the OVERLAPPED object.
OVERLAPPED* release()
{
+ if (ptr_)
+ ptr_->on_pending();
+
OVERLAPPED* tmp = ptr_;
ptr_ = 0;
return tmp;
@@ -104,8 +107,7 @@
if (ptr_)
{
ptr_->ec_ = ec;
- ptr_->io_service_.post_completion(ptr_, 0,
- static_cast<DWORD>(bytes_transferred));
+ ptr_->on_immediate_completion(0, static_cast<DWORD>(bytes_transferred));
ptr_ = 0;
}
}
Modified: trunk/boost/asio/detail/win_iocp_socket_service.hpp
==============================================================================
--- trunk/boost/asio/detail/win_iocp_socket_service.hpp (original)
+++ trunk/boost/asio/detail/win_iocp_socket_service.hpp 2010-01-02 01:09:02 EST (Sat, 02 Jan 2010)
@@ -823,13 +823,6 @@
void async_send(implementation_type& impl, const ConstBufferSequence& buffers,
socket_base::message_flags flags, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
#if defined(BOOST_ASIO_ENABLE_CANCELIO)
// Update the ID of the thread from which cancellation is safe.
if (impl.safe_cancellation_thread_id_ == 0)
@@ -845,6 +838,13 @@
handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
impl.cancel_token_, buffers, handler);
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
// Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers];
typename ConstBufferSequence::const_iterator iter = buffers.begin();
@@ -863,10 +863,8 @@
// A request to receive 0 bytes on a stream socket is a no-op.
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
return;
}
@@ -879,14 +877,12 @@
// Check if the operation completed immediately.
if (result != 0 && last_error != WSA_IO_PENDING)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
@@ -1102,13 +1098,6 @@
const ConstBufferSequence& buffers, const endpoint_type& destination,
socket_base::message_flags flags, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
#if defined(BOOST_ASIO_ENABLE_CANCELIO)
// Update the ID of the thread from which cancellation is safe.
if (impl.safe_cancellation_thread_id_ == 0)
@@ -1123,6 +1112,13 @@
raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
// Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers];
typename ConstBufferSequence::const_iterator iter = buffers.begin();
@@ -1145,14 +1141,12 @@
// Check if the operation completed immediately.
if (result != 0 && last_error != WSA_IO_PENDING)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
@@ -1375,13 +1369,6 @@
const MutableBufferSequence& buffers,
socket_base::message_flags flags, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
#if defined(BOOST_ASIO_ENABLE_CANCELIO)
// Update the ID of the thread from which cancellation is safe.
if (impl.safe_cancellation_thread_id_ == 0)
@@ -1398,6 +1385,13 @@
handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
iocp_service_, impl.cancel_token_, buffers, handler);
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
// Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers];
typename MutableBufferSequence::const_iterator iter = buffers.begin();
@@ -1415,10 +1409,8 @@
// A request to receive 0 bytes on a stream socket is a no-op.
if (impl.protocol_.type() == SOCK_STREAM && total_buffer_size == 0)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code error;
- iocp_service_.post(bind_handler(handler, error, 0));
+ ptr.get()->on_immediate_completion(0, 0);
+ ptr.release();
return;
}
@@ -1430,14 +1422,12 @@
DWORD last_error = ::WSAGetLastError();
if (result != 0 && last_error != WSA_IO_PENDING)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
@@ -1482,14 +1472,12 @@
DWORD last_error = ::WSAGetLastError();
if (result != 0 && last_error != WSA_IO_PENDING)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
@@ -1710,13 +1698,6 @@
const MutableBufferSequence& buffers, endpoint_type& sender_endp,
socket_base::message_flags flags, Handler handler)
{
- if (!is_open(impl))
- {
- this->get_io_service().post(bind_handler(handler,
- boost::asio::error::bad_descriptor, 0));
- return;
- }
-
#if defined(BOOST_ASIO_ENABLE_CANCELIO)
// Update the ID of the thread from which cancellation is safe.
if (impl.safe_cancellation_thread_id_ == 0)
@@ -1733,6 +1714,13 @@
handler_ptr<alloc_traits> ptr(raw_ptr, protocol_type,
iocp_service_, sender_endp, buffers, handler);
+ if (!is_open(impl))
+ {
+ ptr.get()->on_immediate_completion(WSAEBADF, 0);
+ ptr.release();
+ return;
+ }
+
// Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers];
typename MutableBufferSequence::const_iterator iter = buffers.begin();
@@ -1754,14 +1742,12 @@
DWORD last_error = ::WSAGetLastError();
if (result != 0 && last_error != WSA_IO_PENDING)
{
- boost::asio::io_service::work work(this->get_io_service());
- ptr.reset();
- boost::system::error_code ec(last_error,
- boost::asio::error::get_system_category());
- iocp_service_.post(bind_handler(handler, ec, bytes_transferred));
+ ptr.get()->on_immediate_completion(last_error, bytes_transferred);
+ ptr.release();
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
@@ -1925,11 +1911,7 @@
&& !ptr.get()->enable_connection_aborted_)
{
// Reset OVERLAPPED structure.
- ptr.get()->Internal = 0;
- ptr.get()->InternalHigh = 0;
- ptr.get()->Offset = 0;
- ptr.get()->OffsetHigh = 0;
- ptr.get()->hEvent = 0;
+ ptr.get()->reset();
// Create a new socket for the next connection, since the AcceptEx call
// fails with WSAEINVAL if we try to reuse the same socket.
@@ -1954,7 +1936,7 @@
|| last_error == WSAECONNABORTED)
{
// Post this handler so that operation will be restarted again.
- ptr.get()->io_service_.post_completion(ptr.get(), last_error, 0);
+ ptr.get()->on_immediate_completion(last_error, 0);
ptr.release();
return;
}
@@ -1966,6 +1948,7 @@
else
{
// Asynchronous operation has been successfully restarted.
+ ptr.get()->on_pending();
ptr.release();
return;
}
@@ -2141,7 +2124,7 @@
// Post handler so that operation will be restarted again. We do not
// perform the AcceptEx again here to avoid the possibility of starving
// other handlers.
- iocp_service_.post_completion(ptr.get(), last_error, 0);
+ ptr.get()->on_immediate_completion(last_error, 0);
ptr.release();
}
else
@@ -2155,6 +2138,7 @@
}
else
{
+ ptr.get()->on_pending();
ptr.release();
}
}
Boost-Commit list run by bdawes at acm.org, david.abrahams at rcn.com, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk