Boost Users :

Subject: Re: [Boost-users] mpi non-blocking communication
From: Philipp Kraus (philipp.kraus_at_[hidden])
Date: 2010-11-29 10:17:17


On 2010-11-28 13:42:04 +0100, Riccardo Murri said:

> On Sun, Nov 28, 2010 at 1:25 PM, Philipp Kraus
> <philipp.kraus_at_[hidden]> wrote:
>> I understand recv this way: it waits until a message is sent. I
>> think that if there are no messages, the receive blocks the loop, doesn't it?
>> I would like to structure the code this way:
>>
>> while (...) {
>> while mpi-message-is-there
>>    do something
>> }
>>
>
> Indeed, `recv` is a blocking call. To receive a message only if one
> is available, you need to probe for it first, and then receive.
> Something along these lines::
>
>
> while(...) {
>     while(boost::optional<mpi::status> status =
>               p_comm.iprobe(mpi::any_source, TAG)) {
>         // a message is available, receive it
>         p_comm.recv(status->source(), status->tag(), l_str);
>         // process `l_str`
>     }
>     // ...
> }
>
> Caveat: use `status->source()` and `status->tag()` in the recv call to
> ensure you recv the message that you probed for (this may still fail
> if you're running iprobe/recv loops in several threads concurrently
> within the same MPI process).

Thanks for the hint about the threads. I have written my code, but at the
moment there is a problem receiving all messages:
Every process runs a "start method", which creates a thread on CPU 0 (at
the end of the method a barrier synchronizes all CPUs).

    inline void logger::startListener( const mpi::communicator& p_mpi )
    {
        if ((p_mpi.size() == 1) || (m_listenerrunnging))
            return;

        m_listenerrunnging = true;
        if (p_mpi.rank() == 0)
            boost::thread l_thread( boost::bind( &logger::listener,
                                                 this, boost::cref(p_mpi) ) );

        p_mpi.barrier();
    }

The thread method:

    inline void logger::listener( const mpi::communicator& p_mpi )
    {
        boost::lock_guard<boost::mutex> l_lock(m_muxfinalize);

        while (m_listenerrunnging) {
            while (boost::optional<mpi::status> l_status =
                       p_mpi.iprobe(mpi::any_source, LOGGER_MPI_TAG)) {
                p_mpi.recv( l_status->source(), l_status->tag(), l_str );
            }

            boost::this_thread::yield();
        }
    }

The listener must be shut down with this code:

    inline void logger::shutdownListener( const mpi::communicator& p_mpi ) {
        if (!m_listenerrunnging)
            return;

        m_listenerrunnging = false;
        boost::lock_guard<boost::mutex> l_lock(m_muxfinalize);
        p_mpi.barrier();
    }

At the moment I have two problems. Sometimes the program fails with the error
"Address not mapped" (it seems that the iprobe causes the problem). I
can't reproduce the error reliably.

Another problem is that I don't receive all messages; the messages
are sent with the isend command.
I have tried to add a loop with iprobe in my shutdown method to get
the rest of the messages, but that doesn't work well.

I would like to send messages to CPU 0 at any time. CPU 0 should collect
the messages, and the last point at which all remaining messages are
collected should be the shutdown method.
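
One possible way to express "collect everything before shutting down" is to
let each sender finish with an explicit "done" marker, so that the collector
stops only after it has seen one marker per sender rather than when a flag
flips. The sketch below illustrates that idea without MPI: `Mailbox`,
`Message` and `collect_all` are hypothetical names, and a mutex-protected
queue stands in for the isend/iprobe/recv transport. It relies on messages
from one sender arriving in the order they were sent, which this queue
guarantees.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical message type: `done` marks a sender's final message.
struct Message {
    bool done;
    std::string text;  // log payload, empty for the "done" marker
};

// Hypothetical stand-in for the MPI transport: a thread-safe queue.
class Mailbox {
public:
    void send(Message m) {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_queue.push(std::move(m));
        }
        m_ready.notify_one();
    }

    // Blocks until a message is available, then returns it.
    Message receive() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_ready.wait(lock, [this] { return !m_queue.empty(); });
        Message m = std::move(m_queue.front());
        m_queue.pop();
        return m;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_ready;
    std::queue<Message> m_queue;
};

// Starts `senders` worker threads that each send `per_sender` log messages
// followed by one "done" marker, and collects until every marker has arrived.
// Returns the number of log messages collected.
std::size_t collect_all(std::size_t senders, std::size_t per_sender) {
    Mailbox box;
    std::vector<std::thread> workers;
    for (std::size_t s = 0; s < senders; ++s) {
        workers.emplace_back([&box, s, per_sender] {
            for (std::size_t i = 0; i < per_sender; ++i)
                box.send({false, "msg " + std::to_string(i) +
                                 " from " + std::to_string(s)});
            box.send({true, ""});  // final marker from this sender
        });
    }

    std::size_t collected = 0;
    std::size_t finished = 0;
    while (finished < senders) {  // stop only when all senders are done
        Message m = box.receive();
        if (m.done)
            ++finished;
        else
            ++collected;
    }

    for (std::thread& w : workers)
        w.join();
    return collected;
}
```

Calling `collect_all(3, 100)` runs three senders and returns only after all
of their messages have been counted. Whether the same termination rule fits
the logger above depends on details not shown here, such as how the isend
requests are completed on the sending side.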

Thanks for any help with this problem.

Phil

