More problems with boost::asio and async_read

Hi,

I posted on this a couple of hours ago with some problems. It turned out that some of them might have been due to buggy hardware, but there is still something I do not understand. I have now reproduced some of the problems without any hardware (apart from a PC).

Basically I'm trying to read from an input source (in this case stdin) with a timeout. Due to the design of the existing application where this has to fit, it is not possible to call run() on my io_service.

Here is my attempt so far:

    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/optional.hpp>

    using namespace std;

    void set_result( boost::optional<boost::system::error_code> * a,
                     boost::system::error_code b )
    {
        if( b == 0 )
            a->reset( b );
    }

    void receive(boost::asio::io_service & io,
                 boost::asio::posix::stream_descriptor & stream,
                 boost::asio::streambuf & result)
    {
        boost::optional<boost::system::error_code> timer_result;
        boost::optional<boost::system::error_code> read_result;

        boost::asio::deadline_timer timer( io );
        timer.expires_from_now( boost::posix_time::milliseconds(5000) ); // 5000 ms timeout per read attempt
        timer.async_wait( boost::bind(&set_result, &timer_result, _1) );

        boost::asio::async_read(
                stream,
                result,
                boost::asio::transfer_at_least(1),
                boost::bind( &set_result, &read_result, _1 ));

        boost::system::error_code ec;
        while(1) {
            io.reset();
            io.poll_one(ec);

            if ( read_result ) {
                timer.cancel();
                return;
            } else if ( timer_result )
                throw std::runtime_error("timeout");
        }
    }

    void receive(boost::asio::io_service & io,
                 boost::asio::posix::stream_descriptor & stream,
                 size_t size,
                 boost::asio::streambuf & result)
    {
        while( result.size() < size )
            receive(io, stream, result);
    }

    int main(int argc, const char *argv[])
    {
        boost::asio::io_service io;
        boost::asio::posix::stream_descriptor in(io, ::dup(STDIN_FILENO));

        for(int i = 0; i < 5; i++){
            std::cout << i << " Type in 4 chareters and press enter" << std::endl << std::endl;
            std::cout << "in> ";
            std::cout.flush();
            try {
                boost::asio::streambuf buf;
                receive(io, in, 5, buf);
                std::cout << "out> ";
                std::copy(boost::asio::buffer_cast<const char *>(buf.data()),
                          boost::asio::buffer_cast<const char *>(buf.data()) + buf.size(),
                          ostream_iterator<char>(std::cout));
            } catch (const std::exception & e) {
                std::cout << e.what() << endl;
            }
        }

        return 0;
    }

At first glance I thought it was working, but after playing around more with the test app I got some segmentation faults. I found that something bad happens if I let the first query time out and then enter 5 characters on the next one. Here is what valgrind says:

==17848== Memcheck, a memory error detector
==17848== Copyright (C) 2002-2010, and GNU GPL'd, by Julian Seward et al.
==17848== Using Valgrind-3.6.1 and LibVEX; rerun with -h for copyright info
==17848== Command: ./ccTalkScan
==17848==
==17848==
0 Type in 4 chareters and press enter
in> ==17848== Invalid read of size 8
==17848==    at 0x546EB4A: ??? (setcontext.S:60)
==17848==    by 0x425847: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:320)
==17848==    by 0x425982: main (ccTalkScan.cxx:334)
==17848==  Address 0x7feffdf68 is not stack'd, malloc'd or (recently) free'd
==17848==
==17848== Invalid read of size 8
==17848==    at 0x546EB52: ??? (setcontext.S:63)
==17848==    by 0x425847: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:320)
==17848==    by 0x425982: main (ccTalkScan.cxx:334)
==17848==  Address 0x7feffdf58 is not stack'd, malloc'd or (recently) free'd
==17848==
==17848== Invalid read of size 8
==17848==    at 0x546EB59: ??? (setcontext.S:64)
==17848==    by 0x425847: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:320)
==17848==    by 0x425982: main (ccTalkScan.cxx:334)
==17848==  Address 0x7feffdf28 is not stack'd, malloc'd or (recently) free'd
==17848==
==17848== Invalid read of size 8
==17848==    at 0x546EB4A: ??? (setcontext.S:60)
==17848==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17848==  Address 0x7feffdfd8 is not stack'd, malloc'd or (recently) free'd
==17848==
==17848== Invalid read of size 8
==17848==    at 0x546EB52: ??? (setcontext.S:63)
==17848==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17848==  Address 0x7feffdfc8 is not stack'd, malloc'd or (recently) free'd
==17848==
==17848== Invalid read of size 8
==17848==    at 0x546EB59: ??? (setcontext.S:64)
==17848==    by 0x5E4BD2C: (below main) (in /lib64/libc-2.12.2.so)
==17848==  Address 0x7feffdf98 is not stack'd, malloc'd or (recently) free'd
==17848==
timeout
1 Type in 4 chareters and press enter
in> asdf
==17848== Syscall param readv(vector[...]) points to unaddressable byte(s)
==17848==    at 0x5EF7A81: readv (in /lib64/libc-2.12.2.so)
==17848==    by 0x42AF1C: boost::asio::detail::descriptor_ops::non_blocking_read(int, iovec*, unsigned long, boost::system::error_code&, unsigned long&) (descriptor_ops.ipp:153)
==17848==    by 0x4360BF: boost::asio::detail::descriptor_read_op_base<boost::asio::mutable_buffers_1>::do_perform(boost::asio::detail::reactor_op*) (descriptor_read_op.hpp:55)
==17848==    by 0x427CE4: boost::asio::detail::reactor_op::perform() (reactor_op.hpp:40)
==17848==    by 0x429003: boost::asio::detail::epoll_reactor::run(bool, boost::asio::detail::op_queue<boost::asio::detail::task_io_service_operation>&) (epoll_reactor.ipp:286)
==17848==    by 0x429C81: boost::asio::detail::task_io_service::do_one(boost::asio::detail::scoped_lock<boost::asio::detail::posix_mutex>&, boost::asio::detail::task_io_service::idle_thread_info*) (task_io_service.ipp:264)
==17848==    by 0x42986F: boost::asio::detail::task_io_service::poll_one(boost::system::error_code&) (task_io_service.ipp:188)
==17848==    by 0x42A0C2: boost::asio::io_service::poll_one(boost::system::error_code&) (io_service.ipp:103)
==17848==    by 0x4255E1: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:307)
==17848==    by 0x425847: receive(boost::asio::io_service&, boost::asio::posix::basic_stream_descriptor<boost::asio::posix::stream_descriptor_service>&, unsigned long, boost::asio::basic_streambuf<std::allocator<char> >&) (ccTalkScan.cxx:320)
==17848==    by 0x425982: main (ccTalkScan.cxx:334)
==17848==  Address 0x65ba4f0 is 0 bytes inside a block of size 512 free'd
==17848==    at 0x4C25C4F: operator delete(void*) (vg_replace_malloc.c:387)
==17848==    by 0x432D07: __gnu_cxx::new_allocator<char>::deallocate(char*, unsigned long) (new_allocator.h:95)
==17848==    by 0x4316BF: std::_Vector_base<char, std::allocator<char> >::_M_deallocate(char*, unsigned long) (stl_vector.h:146)
==17848==    by 0x42FD64: std::_Vector_base<char, std::allocator<char> >::~_Vector_base() (stl_vector.h:132)
==17848==    by 0x42D6A4: std::vector<char, std::allocator<char> >::~vector() (stl_vector.h:313)
==17848==    by 0x42B657: boost::asio::basic_streambuf<std::allocator<char> >::~basic_streambuf() (basic_streambuf.hpp:114)
==17848==    by 0x425A77: main (ccTalkScan.cxx:336)
==17848==
out> 2 Type in 4 chareters and press enter
in> timeout
3 Type in 4 chareters and press enter
in> timeout
4 Type in 4 chareters and press enter
in> timeout
==17848==
==17848== HEAP SUMMARY:
==17848==     in use at exit: 168 bytes in 3 blocks
==17848==   total heap usage: 65 allocs, 62 frees, 22,246 bytes allocated
==17848==
==17848== LEAK SUMMARY:
==17848==    definitely lost: 0 bytes in 0 blocks
==17848==    indirectly lost: 0 bytes in 0 blocks
==17848==      possibly lost: 0 bytes in 0 blocks
==17848==    still reachable: 168 bytes in 3 blocks
==17848==         suppressed: 0 bytes in 0 blocks
==17848== Rerun with --leak-check=full to see details of leaked memory
==17848==
==17848== For counts of detected and suppressed errors, rerun with: -v
==17848== Use --track-origins=yes to see where uninitialised values come from
==17848== ERROR SUMMARY: 27 errors from 9 contexts (suppressed: 4 from 4)

I have tried different things to fix it, but I think I might have misunderstood a thing or two here, so some help would be most appreciated.

-- 
Allan W. Nielsen

You bind your completion handlers to addresses of local objects. I admit I didn't investigate your code in depth, but at a glance - I'm not sure all these locals always outlive the appropriate functors...

Hi,
> You bind your completion handlers to addresses of local objects. I admit I didn't investigate your code in depth, but at a glance - I'm not sure all these locals always outlive the appropriate functors...
Well, that is also what I think, but how am I supposed to cancel the async_read so that it will not interrupt me later on?

It's an asynchronous function, and its completion handler will be called whether the operation completes or is cancelled.
Yes, but how do I tell the async_read that it is cancelled? It is just a free function which returns no handle or anything. Should I let the boost::asio::posix::stream_descriptor object go out of scope to clean up all the references? -- Allan

Hi again,

I have now adjusted it so that it does not take references to any temporary objects:

    #include <boost/asio.hpp>
    #include <boost/bind.hpp>
    #include <boost/optional.hpp>
    #include <boost/system/error_code.hpp>

    class ReadWithTimeout
    {
    public:
        ReadWithTimeout() : io_stream(io_service, ::dup(STDIN_FILENO))
        {
        }

        void set_result ( boost::optional<boost::system::error_code> * a,
                          boost::system::error_code b )
        {
            if( b == 0 )
                a->reset( b );
        }

        void receive(){
            // clear pending flags
            timer_result = boost::optional<boost::system::error_code>();
            read_result = boost::optional<boost::system::error_code>();

            boost::asio::deadline_timer timer( io_service );
            timer.expires_from_now( boost::posix_time::seconds(5) );
            timer.async_wait( boost::bind(&ReadWithTimeout::set_result, this, &timer_result, _1) );

            boost::asio::async_read(
                    io_stream,
                    buf,
                    boost::asio::transfer_at_least(1),
                    boost::bind(&ReadWithTimeout::set_result, this, &read_result, _1));

            boost::system::error_code ec;
            while(1) {
                io_service.reset();
                io_service.poll_one(ec);

                if ( read_result ) {
                    timer.cancel();
                    return;
                } else if ( timer_result )
                    throw std::runtime_error("Timeout");
            }
        }

        std::string receive(size_t size){
            while( buf.size() < size )
                receive();

            std::string s(boost::asio::buffer_cast<const char *>(buf.data()),
                          boost::asio::buffer_cast<const char *>(buf.data()) + size);
            buf.consume(size);
            return s;
        }

    private:
        boost::asio::io_service io_service;
        boost::asio::posix::stream_descriptor io_stream;
        boost::asio::streambuf buf;
        boost::optional<boost::system::error_code> timer_result;
        boost::optional<boost::system::error_code> read_result;
    };

    int main(int argc, const char *argv[])
    {
        ReadWithTimeout xx;

        for(int i = 0; i < 5; i++){
            std::cout << i << " Type in 4 chareters and press enter" << std::endl << std::endl;
            std::cout << "in> ";
            std::cout.flush();
            try {
                std::cout << "out> " << xx.receive(5);
            } catch (const std::exception & e) {
                std::cout << e.what() << std::endl;
            }
        }

        return 0;
    }

But I'm not sure whether this is a good idea, because I keep on creating new async_read (things, objects?). How are they being cleaned up?

-- 
Allan

> But, I'm not sure whether this is a good idea, because I keep on creating new async_read (things, objects??) how are they being cleaned up?
async_read() is just a free function call. One of its parameters is a completion handler, which you create using bind(). This completion handler is put into an internal queue managed by the io_service, and when the operation has completed successfully, failed, or been cancelled, the io_service takes care of invoking the associated completion handler (for this to happen you have to pump the io_service queue with its poll or run methods). After the handler is invoked, it is destroyed. So you don't have to "clean up" anything here, but you do have to ensure that everything you bind into a handler remains alive until the handler gets invoked.
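As a rough illustration of that lifetime rule, here is a toy model written with the standard library only. It is not Asio and not how io_service is implemented; ToyQueue, Result and start_operation are made-up names for this sketch. The handler holds a std::shared_ptr to the result slot, so the slot stays alive until the handler has been invoked and destroyed, even if the caller's own copy has already gone out of scope.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <system_error>
#include <utility>

// Toy stand-in for an io_service-like queue (hypothetical, not Asio):
// handlers are stored and only run when the queue is pumped.
class ToyQueue {
public:
    typedef std::function<void(std::error_code)> Handler;

    // Store a copy of the handler together with the error code it will get.
    void post(Handler h, std::error_code ec = std::error_code()) {
        pending_.push_back([h, ec] { h(ec); });
    }

    // Run at most one stored handler; it is destroyed when this returns.
    std::size_t poll_one() {
        if (pending_.empty()) return 0;
        std::function<void()> job = std::move(pending_.front());
        pending_.pop_front();
        job();
        return 1;
    }

private:
    std::deque<std::function<void()> > pending_;
};

// Result slot written by the handler (hypothetical helper type).
struct Result {
    bool done = false;
    std::error_code ec;
};

// Start an "operation": the handler co-owns the slot through shared_ptr,
// so the slot cannot be destroyed before the handler has run.
std::shared_ptr<Result> start_operation(ToyQueue& q) {
    std::shared_ptr<Result> slot = std::make_shared<Result>();
    q.post([slot](std::error_code ec) {
        slot->done = true;
        slot->ec = ec;
    });
    return slot;
}
```

With raw pointers to locals, as in the code earlier in the thread, nothing keeps the pointed-to object alive while the handler is still queued; shared ownership is one way to make the requirement hold by construction.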

> Yes, but how do I tell the async_read that it is cancelled?
> It is just a free function which returns no handle or anything??
> Should I let the boost::asio::posix::stream_descriptor object go out of scope to clean up all the references?
Calling cancel() on the i/o object cancels all async. operations associated with it: http://www.boost.org/doc/libs/1_47_0/doc/html/boost_asio/reference/posix__ba...
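To illustrate what cancellation means for the handler, here is a small toy model using only the standard library. It is not Asio; ToyStream and its members are made-up names, and std::errc::operation_canceled stands in for boost::asio::error::operation_aborted, which is what handlers of cancelled operations receive in Boost.Asio. The point of the sketch is that cancel() does not drop the handler: the handler is still invoked, with an error code, the next time the queue is pumped.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <system_error>
#include <vector>

// Toy model of an I/O object (hypothetical, not Asio): it tracks operations
// that are still outstanding and handlers that are ready to be invoked.
class ToyStream {
public:
    typedef std::function<void(std::error_code)> Handler;

    // Register an operation that has not completed yet.
    void async_op(Handler h) { waiting_.push_back(h); }

    // Cancel every outstanding operation. Each handler is queued with an
    // "operation cancelled" error; nothing is invoked inside cancel() itself.
    void cancel() {
        const std::error_code aborted =
            std::make_error_code(std::errc::operation_canceled);
        for (std::size_t i = 0; i < waiting_.size(); ++i) {
            Handler h = waiting_[i];
            ready_.push_back([h, aborted] { h(aborted); });
        }
        waiting_.clear();
    }

    // Invoke at most one ready handler (the analogue of poll_one()).
    std::size_t poll_one() {
        if (ready_.empty()) return 0;
        std::function<void()> job = ready_.front();
        ready_.pop_front();
        job();
        return 1;
    }

private:
    std::vector<Handler> waiting_;
    std::deque<std::function<void()> > ready_;
};
```

In this model, anything the handler refers to must still be alive when poll_one() is called after cancel(), which is the same requirement as for a handler that completes normally.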

Just for the record, I finally found the problem. The async_read operation needs to be canceled just like the timer:

        while(1) {
            io.reset();
            io.poll_one(ec);

            if ( read_result ) {
                timer.cancel();
                return;
            } else if ( timer_result ) {
                stream.cancel();
                throw std::runtime_error("timeout");
            }
        }
    }

-- 
Allan
participants (2)
- Allan Nielsen
- Igor R