Thanks for the suggestions, Gavin! I feel as though I've made some progress. However, my app is still "hanging" when executing the io_service. Here is a full example of my code (sorry if this is long):

Main Thread:
I am using wxWidgets for my platform, so if you're familiar with it at all you'll see that I'm basically reading a list of IPs from a list view.

--------------------------------------------------------
int TOTAL_ITEMS = lstUsers->GetItemCount();
    int TOTAL_FINISHED = 0;
    int MAX_THREADS = 4;

    if ( TOTAL_ITEMS < MAX_THREADS )
        MAX_THREADS = TOTAL_ITEMS;

    std::vector < XPing* > p;
    std::vector< std::string> ping_results (lstUsers->GetItemCount());
    std::vector< boost::thread* > threads;

    boost::asio::io_service io_;
    
    wxString ip_;
    wxCharBuffer buffer;

    //create ping objects
    for ( int i=0; i <TOTAL_ITEMS; ++i)
    {

        ip_ = lstUsers->GetItemText(i, 3);
        buffer = ip_.ToUTF8();

        if ( ! (buffer.data()[0]=='0' ) )
        {

            //ip *should* be valid
            try
            {

                p.push_back( new XPing ( io_, buffer.data(), boost::ref(ping_results), TOTAL_FINISHED) );

            }
            catch ( std::exception & e )
            {
                wxMessageBox(e.what());
            }

            TOTAL_FINISHED++;


        }

    }

    //create threads
    for ( int i = 0; i < MAX_THREADS; i++ )
    {
        threads.push_back( new boost::thread ( boost::bind(&boost::asio::io_service::run, boost::ref(io_) ) ) );
    }

    //join/delete threads
    for ( int i=0; i < MAX_THREADS; i++ )
    {
        threads[i]->join();
        delete threads[i];
    }

    //delete ping objects
    for ( int i=0; i < TOTAL_FINISHED; i++ )
    {
        delete p[i];
    }

    //populate listview with ping values
    for ( int i=0; i<TOTAL_FINISHED; i++ )
    {
        lstUsers->SetItem(i,11,ping_results[i]);
    }

-------------------------------------------------------------------------------------------------------------------

Now, here is the XPing class. I made the resolver async as you suggested and within the resolver handler I start the send/receive functions:

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

class XPing
{
public:
  XPing( boost::asio::io_service & io_, const char* destination, std::vector<std::string> & ping_results, int index=0) : ping_results_(ping_results),
  ip_addr(destination), /*, work_(new boost::asio::io_service::work(io_service_))*/ strand_(io_), resolver_(io_), socket_(io_, icmp::v4()),
  timer_(io_), io_service_(io_), sequence_number_(0), num_replies_(0)
  {

    socket_.non_blocking(true);
    boost::asio::socket_base::reuse_address option(true);
    socket_.set_option(option);
    _index_ = index;

    strand_.post(  boost::bind( &XPing::resolve, this) );

  }

  void resolve()
  {
      icmp::resolver::query query(icmp::v4(), ip_addr, "");
      resolver_.async_resolve(query, boost::bind(&XPing::handle_resolve, this, boost::asio::placeholders::error, boost::asio::placeholders::iterator) );

  }

  void handle_resolve(const boost::system::error_code& err, icmp::resolver::iterator endpoint_iterator)
  {

      if ( !err )
      {

          //icmp::endpoint endpoint = *endpoint_iterator;
          destination_ = *endpoint_iterator;
          socket_.async_connect( destination_, strand_.wrap( boost::bind(&XPing::handle_connect, this, boost::asio::placeholders::error, ++endpoint_iterator) ) );

      }
      else
      {
          //error
          wxMessageBox("There was an error. Code #2");
      }

  }

  void handle_connect( const boost::system::error_code& err, icmp::resolver::iterator endpoint_iterator)
  {
    if (!err)
    {
        // The connection was successful. Send the request.
        strand_.wrap( boost::bind( &XPing::start_send, this) );
        strand_.wrap( boost::bind( &XPing::start_receive, this) );

    }
    else if (endpoint_iterator != icmp::resolver::iterator())
    {
        wxMessageBox("There was an error. Code #2");
      // The connection failed. Try the next endpoint in the list.
        socket_.close();
        //icmp::endpoint endpoint = *endpoint_iterator;
        destination_ = *endpoint_iterator;
        socket_.async_connect( destination_, strand_.wrap( boost::bind(&XPing::handle_connect, this, boost::asio::placeholders::error, ++endpoint_iterator) ) );
    }
    else
    {
      //error
      wxMessageBox("There was an error. Code #3");
    }
  }

  void Destroy()
  {
      this->~XPing();
  }

std::string GetIP()
{
    std::string result = ip_addr;

    return result;
}

private:

  void start_send()
  {

    std::string body("\"Hello!\" from Asio ping.");

    // Create an ICMP header for an echo request.
    icmp_header echo_request;
    echo_request.type(icmp_header::echo_request);
    echo_request.code(0);
    echo_request.identifier(get_identifier());
    echo_request.sequence_number(++sequence_number_);
    compute_checksum(echo_request, body.begin(), body.end());

    // Encode the request packet.
    boost::asio::streambuf request_buffer;
    std::ostream os(&request_buffer);
    os << echo_request << body;

    // Send the request.
    time_sent_ = posix_time::microsec_clock::universal_time();

    try
    {

        socket_.async_send_to(request_buffer.data(), destination_, strand_.wrap( boost::bind(&XPing::handle_send, this) ) );

    }
    catch ( std::exception & e )
    {
        wxMessageBox( e.what() );
    }

  }

  void handle_send()
  {

    // Wait 500ms
    num_replies_ = 0;
    timer_.expires_at(time_sent_ + posix_time::milliseconds(500));

    timer_.async_wait( strand_.wrap ( boost::bind(&XPing::handle_timeout, this ) ) );


  }

  void handle_timeout()
  {

    if (num_replies_ == 0)
    {

        ping_results_[_index_] = "NO";
        timer_.cancel();

    }

  }

  void start_receive()
  {

    // Discard any data already in the buffer.
    reply_buffer_.consume(reply_buffer_.size());

    // Wait for a reply. We prepare the buffer to receive up to 64KB.
    socket_.async_receive(reply_buffer_.prepare(65536), strand_.wrap( boost::bind(&XPing::handle_receive, this, _2) ) );

  }

  void handle_receive(std::size_t length)
{

    // The actual number of bytes received is committed to the buffer so that we
    // can extract it using a std::istream object.
    reply_buffer_.commit(length);

    // Decode the reply packet.
    std::istream is(&reply_buffer_);
    ipv4_header ipv4_hdr;
    icmp_header icmp_hdr;
    is >> ipv4_hdr >> icmp_hdr;

    // We can receive all ICMP packets received by the host, so we need to
    // filter out only the echo replies that match the our identifier and
    // expected sequence number.
    if (is && icmp_hdr.type() == icmp_header::echo_reply
          && icmp_hdr.identifier() == get_identifier()
          && icmp_hdr.sequence_number() == sequence_number_)
    {
      // If this is the first reply, interrupt the five second timeout.
        if (num_replies_++ == 0)
        {
            timer_.cancel();
        }

        posix_time::ptime now = posix_time::microsec_clock::universal_time();

        try
        {

            ping_results_[_index_] = boost::lexical_cast<std::string>( (now - time_sent_).total_milliseconds() ) + "ms"; //this may need to be locked by mutex?

        }
        catch (std::exception & e)
        {
            wxMessageBox( e.what() );
        }


    }
    else if ( (icmp_hdr.sequence_number() == 5) || (icmp_hdr.sequence_number() == 3) || (icmp_hdr.sequence_number() == 11) )
    {

        if (num_replies_++ == 0)
        {
            timer_.cancel();
        }

        try
        {
            ping_results_[_index_] = "DNS"; //this may need to be locked by mutex?

        }
        catch (std::exception & e)
        {
            wxMessageBox( e.what() );
        }

    }

  }

----------------------------------------------------------------------------------------------------------

Is there something that I'm missing? I appreciate your help :) 


On Wed, Dec 18, 2013 at 8:24 PM, Gavin Lambert <gavinl@compacsort.com> wrote:
On 19/12/2013 13:40, Quoth Kyle Ketterer:

Since I will now be using one io_service object, how can I stop the
threads from blocking? When I had an io_service per object, I could just
call io_service.stop() and I could successfully call a join() on the
thread. Since I am now passing a reference to an io_service object, it
seems as though the thread will not join().

You only stop() the io_service once all the pings are done.  Or if the PingX object internally knows when it's done (eg. if it reads the right things, or times out) then you just let them not requeue their async_* work when they're done and they'll "fall out" automatically.  If you don't have an explicit io_service::work object then the run() will automatically terminate once there are no outstanding async_* jobs or in-progress handler calls; then you don't need to stop() at all unless the user wants to cancel before all the pings have finished.  (And even then, you can just cancel the pings instead.)


     icmp::resolver::query query(icmp::v4(), ip_addr, "");
     destination_ = *resolver_.resolve(query);

You should probably make this async as well, otherwise it will limit performance.


The async functions in start_send and start_receive are strand.wrap()
'd. I call timer.cancel() as well as socket.close() and it seems I can't
get it to unblock. Any ideas?

Technically you should cancel/close on the same strand as the read/write operations, as these are not officially cross-thread-safe operations. In practice it usually seems to be safe to not do this though, but it might depend on your platform.


boost::asio::io_service::work work_(io_);

You only need the explicit work object if you are going to have a moment when you're run()ing with no other work (no outstanding async_* requests).  Typically this is only an issue if you create the threads first and have some other action that may or may not happen (eg. user activity not involving an incoming network request) that occurs later to initiate the async operations, which does not appear to be the case in your example.

In your case, you should just be able to create all your ping objects, which should just queue up an async_resolve (which will then internally queue the async_read/async_send when the resolve completes), so you shouldn't need explicit work.


   //buffer etc etc

   p.push_back( new XPing ( io_, buffer.data(),
boost::ref(ping_results), i) );

Remember that you can't share writable buffers between concurrent workers.  If this is constant data that you're sending then this is ok, but otherwise not.


   threads.push_back( new boost::thread (
boost::bind(&boost::asio::io_service::run, boost::ref(io_) ) ) );

   t_count++;

   if(t_count >= max_threads) //join threads after max threads
   {
     for(...j...)
     {
       threads[j]->join();
     }

     t_count = 0;

   }

}

This is wrong.  You should have one loop creating all your ping objects (and setting their initial async work).  Then a *separate* loop that creates the threads, and then finally after that a *third* loop that joins them all.  And as I said before, your number of threads should not be related to your number of ping objects; it should be related to the # of CPUs, or just a fixed (small) number.



_______________________________________________
Boost-users mailing list
Boost-users@lists.boost.org
http://lists.boost.org/mailman/listinfo.cgi/boost-users



--
Kyle Ketterer
<reminisc3@gmail.com>
215-208-8523