Boost logo

Boost Users :

Subject: Re: [Boost-users] [boost.asio] Concurrently Ping 1000+ Hosts
From: Kyle Ketterer (reminisc3_at_[hidden])
Date: 2013-12-18 23:27:35


Thanks for the suggestions, Gavin! I feel as though I've made some
progress. However, my app is still "hanging" when executing the io_service.
Here is a full example of my code:(Sorry if this is long)

Main Thread:
I am using wxWidgets for my platform so if your familiar at all with it
you'll see I'm basically reading from a listview a list of ip's.....

--------------------------------------------------------
int TOTAL_ITEMS = lstUsers->GetItemCount();
    int TOTAL_FINISHED = 0;
    int MAX_THREADS = 4;

    if ( TOTAL_ITEMS < MAX_THREADS )
        MAX_THREADS = TOTAL_ITEMS;

    std::vector < XPing* > p;
    std::vector< std::string> ping_results (lstUsers->GetItemCount());
    std::vector< boost::thread* > threads;

    boost::asio::io_service io_;

    wxString ip_;
    wxCharBuffer buffer;

    //create ping objects
    for ( int i=0; i <TOTAL_ITEMS; ++i)
    {

        ip_ = lstUsers->GetItemText(i, 3);
        buffer = ip_.ToUTF8();

        if ( ! (buffer.data()[0]=='0' ) )
        {

            //ip *should* be valid
            try
            {

                p.push_back( new XPing ( io_, buffer.data(),
boost::ref(ping_results), TOTAL_FINISHED) );

            }
            catch ( std::exception & e )
            {
                wxMessageBox(e.what());
            }

            TOTAL_FINISHED++;

        }

    }

    //create threads
    for ( int i = 0; i < MAX_THREADS; i++ )
    {
        threads.push_back( new boost::thread (
boost::bind(&boost::asio::io_service::run, boost::ref(io_) ) ) );
    }

    //join/delete threads
    for ( int i=0; i < MAX_THREADS; i++ )
    {
        threads[i]->join();
        delete threads[i];
    }

    //delete ping objects
    for ( int i=0; i < TOTAL_FINISHED; i++ )
    {
        delete p[i];
    }

    //populate listview with ping values
    for ( int i=0; i<TOTAL_FINISHED; i++ )
    {
        lstUsers->SetItem(i,11,ping_results[i]);
    }

-------------------------------------------------------------------------------------------------------------------

Now, here is the XPing class. I made the resolver async as you suggested
and within the resolver handler I start the send/receive functions:

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

class XPing
{
public:
  XPing( boost::asio::io_service & io_, const char* destination,
std::vector<std::string> & ping_results, int index=0) :
ping_results_(ping_results),
  ip_addr(destination), /*, work_(new
boost::asio::io_service::work(io_service_))*/ strand_(io_), resolver_(io_),
socket_(io_, icmp::v4()),
  timer_(io_), io_service_(io_), sequence_number_(0), num_replies_(0)
  {

    socket_.non_blocking(true);
    boost::asio::socket_base::reuse_address option(true);
    socket_.set_option(option);
    _index_ = index;

    strand_.post( boost::bind( &XPing::resolve, this) );

  }

  void resolve()
  {
      icmp::resolver::query query(icmp::v4(), ip_addr, "");
      resolver_.async_resolve(query, boost::bind(&XPing::handle_resolve,
this, boost::asio::placeholders::error,
boost::asio::placeholders::iterator) );

  }

  void handle_resolve(const boost::system::error_code& err,
icmp::resolver::iterator endpoint_iterator)
  {

      if ( !err )
      {

          //icmp::endpoint endpoint = *endpoint_iterator;
          destination_ = *endpoint_iterator;
          socket_.async_connect( destination_, strand_.wrap(
boost::bind(&XPing::handle_connect, this, boost::asio::placeholders::error,
++endpoint_iterator) ) );

      }
      else
      {
          //error
          wxMessageBox("There was an error. Code #2");
      }

  }

  void handle_connect( const boost::system::error_code& err,
icmp::resolver::iterator endpoint_iterator)
  {
    if (!err)
    {
        // The connection was successful. Send the request.
        strand_.wrap( boost::bind( &XPing::start_send, this) );
        strand_.wrap( boost::bind( &XPing::start_receive, this) );

    }
    else if (endpoint_iterator != icmp::resolver::iterator())
    {
        wxMessageBox("There was an error. Code #2");
      // The connection failed. Try the next endpoint in the list.
        socket_.close();
        //icmp::endpoint endpoint = *endpoint_iterator;
        destination_ = *endpoint_iterator;
        socket_.async_connect( destination_, strand_.wrap(
boost::bind(&XPing::handle_connect, this, boost::asio::placeholders::error,
++endpoint_iterator) ) );
    }
    else
    {
      //error
      wxMessageBox("There was an error. Code #3");
    }
  }

  void Destroy()
  {
      this->~XPing();
  }

std::string GetIP()
{
    std::string result = ip_addr;

    return result;
}

private:

  void start_send()
  {

    std::string body("\"Hello!\" from Asio ping.");

    // Create an ICMP header for an echo request.
    icmp_header echo_request;
    echo_request.type(icmp_header::echo_request);
    echo_request.code(0);
    echo_request.identifier(get_identifier());
    echo_request.sequence_number(++sequence_number_);
    compute_checksum(echo_request, body.begin(), body.end());

    // Encode the request packet.
    boost::asio::streambuf request_buffer;
    std::ostream os(&request_buffer);
    os << echo_request << body;

    // Send the request.
    time_sent_ = posix_time::microsec_clock::universal_time();

    try
    {

        socket_.async_send_to(request_buffer.data(), destination_,
strand_.wrap( boost::bind(&XPing::handle_send, this) ) );

    }
    catch ( std::exception & e )
    {
        wxMessageBox( e.what() );
    }

  }

  void handle_send()
  {

    // Wait 500ms
    num_replies_ = 0;
    timer_.expires_at(time_sent_ + posix_time::milliseconds(500));

    timer_.async_wait( strand_.wrap ( boost::bind(&XPing::handle_timeout,
this ) ) );

  }

  void handle_timeout()
  {

    if (num_replies_ == 0)
    {

        ping_results_[_index_] = "NO";
        timer_.cancel();

    }

  }

  void start_receive()
  {

    // Discard any data already in the buffer.
    reply_buffer_.consume(reply_buffer_.size());

    // Wait for a reply. We prepare the buffer to receive up to 64KB.
    socket_.async_receive(reply_buffer_.prepare(65536), strand_.wrap(
boost::bind(&XPing::handle_receive, this, _2) ) );

  }

  void handle_receive(std::size_t length)
{

    // The actual number of bytes received is committed to the buffer so
that we
    // can extract it using a std::istream object.
    reply_buffer_.commit(length);

    // Decode the reply packet.
    std::istream is(&reply_buffer_);
    ipv4_header ipv4_hdr;
    icmp_header icmp_hdr;
    is >> ipv4_hdr >> icmp_hdr;

    // We can receive all ICMP packets received by the host, so we need to
    // filter out only the echo replies that match the our identifier and
    // expected sequence number.
    if (is && icmp_hdr.type() == icmp_header::echo_reply
          && icmp_hdr.identifier() == get_identifier()
          && icmp_hdr.sequence_number() == sequence_number_)
    {
      // If this is the first reply, interrupt the five second timeout.
        if (num_replies_++ == 0)
        {
            timer_.cancel();
        }

        posix_time::ptime now =
posix_time::microsec_clock::universal_time();

        try
        {

            ping_results_[_index_] = boost::lexical_cast<std::string>( (now
- time_sent_).total_milliseconds() ) + "ms"; //this may need to be locked
by mutex?

        }
        catch (std::exception & e)
        {
            wxMessageBox( e.what() );
        }

    }
    else if ( (icmp_hdr.sequence_number() == 5) ||
(icmp_hdr.sequence_number() == 3) || (icmp_hdr.sequence_number() == 11) )
    {

        if (num_replies_++ == 0)
        {
            timer_.cancel();
        }

        try
        {
            ping_results_[_index_] = "DNS"; //this may need to be locked by
mutex?

        }
        catch (std::exception & e)
        {
            wxMessageBox( e.what() );
        }

    }

  }

----------------------------------------------------------------------------------------------------------

Is there something that I'm missing? I appreciate your help :)

On Wed, Dec 18, 2013 at 8:24 PM, Gavin Lambert <gavinl_at_[hidden]>wrote:

> On 19/12/2013 13:40, Quoth Kyle Ketterer:
>
> Since I will now be using one io_service object, how can I stop the
>> threads from blocking? When I had an io_service per object, I could just
>> call io_service.stop() and I could successfully call a join() on the
>> thread. Since I am now passing a reference to an io_service object, it
>> seems as though the thread will not join().
>>
>
> You only stop() the io_service once all the pings are done. Or if the
> PingX object internally knows when it's done (eg. if it reads the right
> things, or times out) then you just let them not requeue their async_* work
> when they're done and they'll "fall out" automatically. If you don't have
> an explicit io_service::work object then the run() will automatically
> terminate once there are no outstanding async_* jobs or in-progress handler
> calls; then you don't need to stop() at all unless the user wants to cancel
> before all the pings have finished. (And even then, you can just cancel
> the pings instead.)
>
>
> icmp::resolver::query query(icmp::v4(), ip_addr, "");
>> destination_ = *resolver_.resolve(query);
>>
>
> You should probably make this async as well, otherwise it will limit
> performance.
>
>
> The async functions in start_send and start_receive are strand.wrap()
>> 'd. I call timer.cancel() as well as socket.close() and it seems I can't
>> get it to unblock. Any ideas?
>>
>
> Technically you should cancel/close on the same strand as the read/write
> operations, as these are not officially cross-thread-safe operations. In
> practice it usually seems to be safe to not do this though, but it might
> depend on your platform.
>
>
> boost::asio::io_service::work work_(io_);
>>
>
> You only need the explicit work object if you are going to have a moment
> when you're run()ing with no other work (no outstanding async_* requests).
> Typically this is only an issue if you create the threads first and have
> some other action that may or may not happen (eg. user activity not
> involving an incoming network request) that occurs later to initiate the
> async operations, which does not appear to be the case in your example.
>
> In your case, you should just be able to create all your ping objects,
> which should just queue up an async_resolve (which will then internally
> queue the async_read/async_send when the resolve completes), so you
> shouldn't need explicit work.
>
>
> //buffer etc etc
>>
>> p.push_back( new XPing ( io_, buffer.data(),
>> boost::ref(ping_results), i) );
>>
>
> Remember that you can't share writable buffers between concurrent workers.
> If this is constant data that you're sending then this is ok, but
> otherwise not.
>
>
> threads.push_back( new boost::thread (
>> boost::bind(&boost::asio::io_service::run, boost::ref(io_) ) ) );
>>
>> t_count++;
>>
>> if(t_count >= max_threads) //join threads after max threads
>> {
>> for(...j...)
>> {
>> threads[j]->join();
>> }
>>
>> t_count = 0;
>>
>> }
>>
>> }
>>
>
> This is wrong. You should have one loop creating all your ping objects
> (and setting their initial async work). Then a *separate* loop that
> creates the threads, and then finally after that a *third* loop that joins
> them all. And as I said before, your number of threads should not be
> related to your number of ping objects; it should be related to the # of
> CPUs, or just a fixed (small) number.
>
>
>
> _______________________________________________
> Boost-users mailing list
> Boost-users_at_[hidden]
> http://lists.boost.org/mailman/listinfo.cgi/boost-users
>

-- 
Kyle Ketterer
<reminisc3_at_[hidden]>
215-208-8523


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net