|
Boost Users : |
Subject: Re: [Boost-users] [MPI, serialization] Segmentation fault in heterogeneous cluster
From: Martin Huenniger (m.huenniger_at_[hidden])
Date: 2010-09-21 04:59:23
Hi,
so here are the pieces of code that cause the problem:
The following routine is the main loop of the master process. It
dispatches the data to and from the worker processes:
template<typename Float>
void My_complex<Float>::dispatch()
{
FC_ASSERT( comm.rank() == 0 );
Master<Ball<Float>, Ball<Float> > master_process( comm );
while( !points_to_explore->empty() ||
master_process.some_working() ) {
int command, w_rank;
Ball<Float> *the_ball;
Ball<Float> result( dim, S, S[0], 0);
if( w_rank = master_process.listen( command ) ) {
switch( command ) {
case MW::ask_for_job:
if( !points_to_explore->empty() ) {
the_ball = points_to_explore->top();
points_to_explore->pop();
// Here is the place where the error occurs:
master_process.send_work( w_rank, *the_ball );
delete the_ball;
} else {
master_process.worker_try_again( w_rank );
}
break;
case MW::return_result:
// In this piece of code the problem also appears, but the
// code is essetialy the same with the roles of master and
// worker exchanged...
master_process.get_result( w_rank, result );
the_ball = new Ball<Float>( result );
enqueue( the_ball ); // enques in points_to_explore
break;
case MW::job_done:
master_process.free_worker( w_rank );
break;
}
}
sleep(1);
}
master_process.suspend_all_workers();
return;
}
At the very same time the worker processes run the next routine. They
receive a ball from the master and explore it. During exploration new
found balls are sent back to the master.
template<typename Float>
void My_complex<Float>::working_horse( std::ostream &s )
{
FC_ASSERT( comm.rank() != 0 );
worker_process = new Worker_type( comm );
Ball<Float> data( dim, S, S[0], 0);
// here the data is received from the master process:
while( worker_process->get_work( data ) ) {
Ball<Float> *the_ball = new Ball<Float>( data );
explore_cell( the_ball, the_ball, s );
worker_process->done();
}
delete worker_process;
}
Communication from master to worker is implemented in the next snippet.
That is the code that generates the output provided in the last mail.
The idea is the following: Since the boost::mpi::communicator::send()
and boost::mpi::communicator::recv() routines refuse to work with my
serialization, I implemented the communication using the C Bindings of
MPI. So the data gets serialized in some text_[io]archive over a
std::stringstream (binary_[io]archive isn't working), and the string
extracted from this stringstream is sent over the communication channel.
On receiving it gets corrupted sometimes.
#include <boost/mpi.hpp>
#include <string>
#include <sstream>
#include <boost/serialization/string.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/archive/text_iarchive.hpp>
#include <mpi.h>
#include <cstdlib>
typedef boost::archive::text_oarchive Oarchive;
typedef boost::archive::text_iarchive Iarchive;
template<typename Jobtype, typename Restype>
void Master<Jobtype,Restype>::send_work( const int w,
const Jobtype &data )
{
FC_DEBUG_OUTPUT( << "Master sends data to worker " << w << "\n" );
/*
* comm.send( w, MW::send_job_data_tag, data );
*/
{
std::stringstream ss;
{
Oarchive oa(ss);
oa << data;
}
int size = ss.str().size();
comm.send( w, MW::send_job_data_tag );
FC_DEBUG_OUTPUT( << "Master sends size of data to worker "
<< w << "\n" );
MPI_Send( &size, 1, MPI_INT, w,
MW::send_job_data_size, MPI_Comm(comm) );
FC_DEBUG_OUTPUT( << "Master sends the data to worker "
<< w << "\n" );
FC_DEBUG_OUTPUT( << ss.str() << "\n" );
char *buf = const_cast<char*>( ss.str().c_str() );
MPI_Send( buf , size, MPI_CHAR,
w, MW::send_job_data_tag, MPI_Comm(comm) );
}
if( working[w] == MW::worker_free ) {
working[w] = MW::worker_working;
++num_working_workers;
}
}
template<typename Jobtype, typename Restype>
bool Worker<Jobtype,Restype>::get_work( Jobtype &data )
{
FC_DEBUG_OUTPUT( << "Worker process " << comm.rank()
<< " is trying to get some work.\n" );
while( true ) {
int ask_for_job = MW::ask_for_job;
comm.send( 0, MW::listen_tag, ask_for_job );
mpi::status status = comm.probe( 0, mpi::any_tag );
if( status.tag() == MW::try_again_tag ) {
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " keeps on waiting...\n" );
sleep(1); //usleep(1);
continue;
}
if( status.tag() == MW::suspend_tag ) {
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " is suspended.\n" );
comm.recv( 0, MW::suspend_tag );
return false;
}
/*
* comm.recv( 0, MW::send_job_data_tag, data );
*/
if( status.tag() == MW::send_job_data_tag ) {
comm.recv( 0, MW::send_job_data_tag );
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " is receiving job from Master\n" );
int size;
MPI_Status mstatus;
MPI_Recv( &size, 1, MPI_INT, 0,
MW::send_job_data_size, MPI_Comm(comm), &mstatus );
char *buf = static_cast<char*>( malloc( size ) );
MPI_Recv( buf, size, MPI_CHAR, 0,
MW::send_job_data_tag, MPI_Comm(comm), &mstatus );
std::string s( buf );
std::stringstream ss( s );
FC_DEBUG_OUTPUT( << ss.str() << "\n" );
{
Iarchive ia( ss );
ia >> data;
}
free( buf );
FC_DEBUG_OUTPUT( << "Worker " << comm.rank()
<< " received job.\n" );
return true;
}
}
}
Once again the serialization:
void Ball<Float>::serialize( Archive & ar, const unsigned int version )
{
ar & BOOST_SERIALIZATION_NVP( membership ); //vector<bool>
ar & BOOST_SERIALIZATION_NVP( members ); //vector<int>
ar & BOOST_SERIALIZATION_NVP( r ); //int
ar & BOOST_SERIALIZATION_NVP( is_infinity ); //bool
ar & BOOST_SERIALIZATION_NVP( up_to_date ); //bool
ar & BOOST_SERIALIZATION_NVP( *QR ); //own type, see below
}
void Subspan<Float>::serialize( Archive & ar,
const unsigned int version )
{
ar & BOOST_SERIALIZATION_NVP( membership ); // vector<bool>
ar & BOOST_SERIALIZATION_NVP( members ); // vector<int>
for( int i=0; i<dim; ++i )
for( int j=0; j<dim; ++j )
ar & BOOST_SERIALIZATION_NVP( Q[i][j] ); // double
for( int i=0; i<dim; ++i )
for( int j=0; j<dim; ++j )
ar & BOOST_SERIALIZATION_NVP( R[i][j] ); // double
for( int i=0; i<dim; ++i )
ar & BOOST_SERIALIZATION_NVP( x[i] ); // double
for( int i=0; i<dim; ++i )
ar & BOOST_SERIALIZATION_NVP( d[i] ); // double
ar & BOOST_SERIALIZATION_NVP( r ); // int
}
The template-parameters Jobtype and Restype are instatiated to Ball<double>.
I read elsewhere that using text_archives with doubles is generally a
bad idea since receiving NaN's and inf's is not possible. As the output
in the last mail shows, this was not the case. Maybe you have an idea
what is causing the trouble with the binary_archive, because using these
would seem a lot more safer to me.
All the best
Martin
Matthias Troyer wrote:
> On 20 Sep 2010, at 17:49, Martin Huenniger wrote:
>
>> Hi Matthias,
>>
>> yes //and so on does further serializations. Here is the code, there a some redundancies for later design decisions:
>
> Could you please send a stripped down but complete example code that exhibits your problems? That way it will be easiest to find what is going on
>
> Matthias
Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net