|
Boost Users : |
Subject: Re: [Boost-users] [boost.asio] Concurrently Ping 1000+ Hosts
From: Kyle Ketterer (reminisc3_at_[hidden])
Date: 2013-12-18 23:27:35
Thanks for the suggestions, Gavin! I feel as though I've made some
progress. However, my app is still "hanging" when executing the io_service.
Here is a full example of my code:(Sorry if this is long)
Main Thread:
I am using wxWidgets for my platform so if your familiar at all with it
you'll see I'm basically reading from a listview a list of ip's.....
--------------------------------------------------------
int TOTAL_ITEMS = lstUsers->GetItemCount();
int TOTAL_FINISHED = 0;
int MAX_THREADS = 4;
if ( TOTAL_ITEMS < MAX_THREADS )
MAX_THREADS = TOTAL_ITEMS;
std::vector < XPing* > p;
std::vector< std::string> ping_results (lstUsers->GetItemCount());
std::vector< boost::thread* > threads;
boost::asio::io_service io_;
wxString ip_;
wxCharBuffer buffer;
//create ping objects
for ( int i=0; i <TOTAL_ITEMS; ++i)
{
ip_ = lstUsers->GetItemText(i, 3);
buffer = ip_.ToUTF8();
if ( ! (buffer.data()[0]=='0' ) )
{
//ip *should* be valid
try
{
p.push_back( new XPing ( io_, buffer.data(),
boost::ref(ping_results), TOTAL_FINISHED) );
}
catch ( std::exception & e )
{
wxMessageBox(e.what());
}
TOTAL_FINISHED++;
}
}
//create threads
for ( int i = 0; i < MAX_THREADS; i++ )
{
threads.push_back( new boost::thread (
boost::bind(&boost::asio::io_service::run, boost::ref(io_) ) ) );
}
//join/delete threads
for ( int i=0; i < MAX_THREADS; i++ )
{
threads[i]->join();
delete threads[i];
}
//delete ping objects
for ( int i=0; i < TOTAL_FINISHED; i++ )
{
delete p[i];
}
//populate listview with ping values
for ( int i=0; i<TOTAL_FINISHED; i++ )
{
lstUsers->SetItem(i,11,ping_results[i]);
}
-------------------------------------------------------------------------------------------------------------------
Now, here is the XPing class. I made the resolver async as you suggested
and within the resolver handler I start the send/receive functions:
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
class XPing
{
public:
XPing( boost::asio::io_service & io_, const char* destination,
std::vector<std::string> & ping_results, int index=0) :
ping_results_(ping_results),
ip_addr(destination), /*, work_(new
boost::asio::io_service::work(io_service_))*/ strand_(io_), resolver_(io_),
socket_(io_, icmp::v4()),
timer_(io_), io_service_(io_), sequence_number_(0), num_replies_(0)
{
socket_.non_blocking(true);
boost::asio::socket_base::reuse_address option(true);
socket_.set_option(option);
_index_ = index;
strand_.post( boost::bind( &XPing::resolve, this) );
}
void resolve()
{
icmp::resolver::query query(icmp::v4(), ip_addr, "");
resolver_.async_resolve(query, boost::bind(&XPing::handle_resolve,
this, boost::asio::placeholders::error,
boost::asio::placeholders::iterator) );
}
void handle_resolve(const boost::system::error_code& err,
icmp::resolver::iterator endpoint_iterator)
{
if ( !err )
{
//icmp::endpoint endpoint = *endpoint_iterator;
destination_ = *endpoint_iterator;
socket_.async_connect( destination_, strand_.wrap(
boost::bind(&XPing::handle_connect, this, boost::asio::placeholders::error,
++endpoint_iterator) ) );
}
else
{
//error
wxMessageBox("There was an error. Code #2");
}
}
void handle_connect( const boost::system::error_code& err,
icmp::resolver::iterator endpoint_iterator)
{
if (!err)
{
// The connection was successful. Send the request.
strand_.wrap( boost::bind( &XPing::start_send, this) );
strand_.wrap( boost::bind( &XPing::start_receive, this) );
}
else if (endpoint_iterator != icmp::resolver::iterator())
{
wxMessageBox("There was an error. Code #2");
// The connection failed. Try the next endpoint in the list.
socket_.close();
//icmp::endpoint endpoint = *endpoint_iterator;
destination_ = *endpoint_iterator;
socket_.async_connect( destination_, strand_.wrap(
boost::bind(&XPing::handle_connect, this, boost::asio::placeholders::error,
++endpoint_iterator) ) );
}
else
{
//error
wxMessageBox("There was an error. Code #3");
}
}
void Destroy()
{
this->~XPing();
}
std::string GetIP()
{
std::string result = ip_addr;
return result;
}
private:
void start_send()
{
std::string body("\"Hello!\" from Asio ping.");
// Create an ICMP header for an echo request.
icmp_header echo_request;
echo_request.type(icmp_header::echo_request);
echo_request.code(0);
echo_request.identifier(get_identifier());
echo_request.sequence_number(++sequence_number_);
compute_checksum(echo_request, body.begin(), body.end());
// Encode the request packet.
boost::asio::streambuf request_buffer;
std::ostream os(&request_buffer);
os << echo_request << body;
// Send the request.
time_sent_ = posix_time::microsec_clock::universal_time();
try
{
socket_.async_send_to(request_buffer.data(), destination_,
strand_.wrap( boost::bind(&XPing::handle_send, this) ) );
}
catch ( std::exception & e )
{
wxMessageBox( e.what() );
}
}
void handle_send()
{
// Wait 500ms
num_replies_ = 0;
timer_.expires_at(time_sent_ + posix_time::milliseconds(500));
timer_.async_wait( strand_.wrap ( boost::bind(&XPing::handle_timeout,
this ) ) );
}
void handle_timeout()
{
if (num_replies_ == 0)
{
ping_results_[_index_] = "NO";
timer_.cancel();
}
}
void start_receive()
{
// Discard any data already in the buffer.
reply_buffer_.consume(reply_buffer_.size());
// Wait for a reply. We prepare the buffer to receive up to 64KB.
socket_.async_receive(reply_buffer_.prepare(65536), strand_.wrap(
boost::bind(&XPing::handle_receive, this, _2) ) );
}
void handle_receive(std::size_t length)
{
// The actual number of bytes received is committed to the buffer so
that we
// can extract it using a std::istream object.
reply_buffer_.commit(length);
// Decode the reply packet.
std::istream is(&reply_buffer_);
ipv4_header ipv4_hdr;
icmp_header icmp_hdr;
is >> ipv4_hdr >> icmp_hdr;
// We can receive all ICMP packets received by the host, so we need to
// filter out only the echo replies that match the our identifier and
// expected sequence number.
if (is && icmp_hdr.type() == icmp_header::echo_reply
&& icmp_hdr.identifier() == get_identifier()
&& icmp_hdr.sequence_number() == sequence_number_)
{
// If this is the first reply, interrupt the five second timeout.
if (num_replies_++ == 0)
{
timer_.cancel();
}
posix_time::ptime now =
posix_time::microsec_clock::universal_time();
try
{
ping_results_[_index_] = boost::lexical_cast<std::string>( (now
- time_sent_).total_milliseconds() ) + "ms"; //this may need to be locked
by mutex?
}
catch (std::exception & e)
{
wxMessageBox( e.what() );
}
}
else if ( (icmp_hdr.sequence_number() == 5) ||
(icmp_hdr.sequence_number() == 3) || (icmp_hdr.sequence_number() == 11) )
{
if (num_replies_++ == 0)
{
timer_.cancel();
}
try
{
ping_results_[_index_] = "DNS"; //this may need to be locked by
mutex?
}
catch (std::exception & e)
{
wxMessageBox( e.what() );
}
}
}
----------------------------------------------------------------------------------------------------------
Is there something that I'm missing? I appreciate your help :)
On Wed, Dec 18, 2013 at 8:24 PM, Gavin Lambert <gavinl_at_[hidden]>wrote:
> On 19/12/2013 13:40, Quoth Kyle Ketterer:
>
> Since I will now be using one io_service object, how can I stop the
>> threads from blocking? When I had an io_service per object, I could just
>> call io_service.stop() and I could successfully call a join() on the
>> thread. Since I am now passing a reference to an io_service object, it
>> seems as though the thread will not join().
>>
>
> You only stop() the io_service once all the pings are done. Or if the
> PingX object internally knows when it's done (eg. if it reads the right
> things, or times out) then you just let them not requeue their async_* work
> when they're done and they'll "fall out" automatically. If you don't have
> an explicit io_service::work object then the run() will automatically
> terminate once there are no outstanding async_* jobs or in-progress handler
> calls; then you don't need to stop() at all unless the user wants to cancel
> before all the pings have finished. (And even then, you can just cancel
> the pings instead.)
>
>
> icmp::resolver::query query(icmp::v4(), ip_addr, "");
>> destination_ = *resolver_.resolve(query);
>>
>
> You should probably make this async as well, otherwise it will limit
> performance.
>
>
> The async functions in start_send and start_receive are strand.wrap()
>> 'd. I call timer.cancel() as well as socket.close() and it seems I can't
>> get it to unblock. Any ideas?
>>
>
> Technically you should cancel/close on the same strand as the read/write
> operations, as these are not officially cross-thread-safe operations. In
> practice it usually seems to be safe to not do this though, but it might
> depend on your platform.
>
>
> boost::asio::io_service::work work_(io_);
>>
>
> You only need the explicit work object if you are going to have a moment
> when you're run()ing with no other work (no outstanding async_* requests).
> Typically this is only an issue if you create the threads first and have
> some other action that may or may not happen (eg. user activity not
> involving an incoming network request) that occurs later to initiate the
> async operations, which does not appear to be the case in your example.
>
> In your case, you should just be able to create all your ping objects,
> which should just queue up an async_resolve (which will then internally
> queue the async_read/async_send when the resolve completes), so you
> shouldn't need explicit work.
>
>
> //buffer etc etc
>>
>> p.push_back( new XPing ( io_, buffer.data(),
>> boost::ref(ping_results), i) );
>>
>
> Remember that you can't share writable buffers between concurrent workers.
> If this is constant data that you're sending then this is ok, but
> otherwise not.
>
>
> threads.push_back( new boost::thread (
>> boost::bind(&boost::asio::io_service::run, boost::ref(io_) ) ) );
>>
>> t_count++;
>>
>> if(t_count >= max_threads) //join threads after max threads
>> {
>> for(...j...)
>> {
>> threads[j]->join();
>> }
>>
>> t_count = 0;
>>
>> }
>>
>> }
>>
>
> This is wrong. You should have one loop creating all your ping objects
> (and setting their initial async work). Then a *separate* loop that
> creates the threads, and then finally after that a *third* loop that joins
> them all. And as I said before, your number of threads should not be
> related to your number of ping objects; it should be related to the # of
> CPUs, or just a fixed (small) number.
>
>
>
> _______________________________________________
> Boost-users mailing list
> Boost-users_at_[hidden]
> http://lists.boost.org/mailman/listinfo.cgi/boost-users
>
-- Kyle Ketterer <reminisc3_at_[hidden]> 215-208-8523
Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net