// July 6 2010, test boost MPI multiple message transfer // I used #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define PRINTFOUT 1 // T1 task : taskTotalNum <= availableRank // T2 task : taskTotalNum > availableRank so tasks maybe evenly or non-evenly distributed to workers const int downStreamTaskTag= 123; const int upStreamTaskTag = 456; const int generationNum = 1; const int TOTALTASKNUM = 3; const int ISTRUE =1; const int ISFALSE =0; const int SLEEPTIME=1; const bool BOOLFALSE = 0; const bool BOOLTRUE = 1; // Puts thread to sleep. using namespace Rha; void sleepFor(int seconds) { // Sleep. // boost::xtime xt; boost::xtime_get(&xt, boost::TIME_UTC); xt.sec += seconds; boost::thread::sleep(xt); } void task5(int destRank, int sourceRank, int taskTag, mpi::communicator world, TaskPackage myTaskPackage) //void task5(int destRank) { mpi::request reqs; //reqs = world.isend(destRank, taskTag, myTaskPackage); world.send(destRank, taskTag, myTaskPackage); std::cout << "Send Task package from " << sourceRank << ", the task will reach node: " << destRank << std::endl; } int main(int argc, char** argv) { mpi::environment env(argc, argv); mpi::communicator world; int availableRank = world.size()-1; std::cout << "I am rank " << world.rank() << " , out of total size " << world.size() << ", the taskTotalNum is " << TOTALTASKNUM << " , the availableRank is " << availableRank << ", the generationNum is " << generationNum << ". \n\n" << std::endl; int beginIndex, taskCounter=0; if (world.rank() == 0) // manager node send task to worker node { FILE * pFile; pFile = fopen ("task_seq_output.txt","w"); if (pFile == NULL) { printf("error openging a file. \n\n"); exit (1); } FILE * paraFile; paraFile = fopen ("task_para_output.txt","w"); if (paraFile == NULL) { printf("error openging a file. \n\n"); exit (1); } FILE * paraT1File; paraT1File = fopen ("task_T1para_output.txt","w"); if (paraT1File == NULL) { printf("error openging a file. \n\n"); exit (1); } FILE * paraT2File; paraT2File = fopen ("task_T2para_output.txt","w"); if (paraT2File == NULL) { printf("error openging a file. \n\n"); exit (1); } int sourceRank=0, taskTag=0, tempResult1=0, tempResult2=0, stepLength =2, taskCounterT1=0, taskCounterT2, taskCounterT3=0; int iRank,destRank, taski, aveTaskNum, leftTaskNum =0, taskOperand, startRank=1; int taskTotalNum = TOTALTASKNUM, eachRank, recvReqIndex, columnNum, taskIndexT1=0; // define a task package to get results if (world.size() <= 2) // only 1 CPU sequential { columnNum = taskTotalNum; } else if ( world.size() > 2) // more than 2 CPUs parallel { //columnNum = (int)(taskTotalNum/availableRank) + taskTotalNum % availableRank ; columnNum = (taskTotalNum - taskTotalNum % availableRank)/availableRank +1 ; } // master use pointer array to collect result of T2 std::vector< std::vector > resultTaskPackageT2Pr(availableRank, std::vector()); std::vector< std::vector >::iterator t2TaskResultItr_ii; // T1 data container // master use pointer array to collect result of T1 std::vector resultTaskPackageT1Pr(availableRank); std::vector::iterator t1TaskResultItr_i; std::vector recvReqsT1(taskTotalNum); std::vector recvReqsT2(taskTotalNum); std::vector sendReqsT1(taskTotalNum); std::vector sendReqsT2(taskTotalNum); std::vector::iterator recvReqsT2Itr; std::vector::iterator recvReqsT1Itr; std::vector recvReqMark(taskTotalNum); std::vector::iterator recvReqMarkItr; std::cout << "I am rank " << world.rank() << ", availableRank is " << availableRank << ", columnNum is " << columnNum << ". \n\n " << std::endl; int myGenerationNum = 1, tempResult = 0, t2FinishStatusCounter =0, reqCounter=1; // T1 T2 task has different aveTaskPerNode int resultSourceRank, t1Flag=0, t2Flag=0; int myUpStreamTaskTag=0, myDownStreamTaskTag; bool t2FinishStatus = BOOLFALSE; std::vector taskPerRank(world.size()-1); //int taskArray[world.size()]; std::vector taskArray(world.size()-1); TaskSleep seqTasksleep; TaskPackage myTaskPackageM(world); // do timging mpi::timer seqTimer; mpi::timer paraTimerT1; mpi::timer paraTimerT2; while (myGenerationNum <= generationNum) { taskCounterT2 =0; myTaskPackageM.setEndRun(0); TaskPackage myTaskPackageT0; tempResult = 0; // sequential algorithm only 1 CPU if (availableRank < 1) { #ifdef PRINTFOUT fprintf(pFile, "In node %d , taskTotalNum = %d, availableRank = %d \n\n", world.rank(), taskTotalNum, availableRank ); fprintf(pFile,"========================================================= \n\n") ; fprintf(pFile, " node | task# | OperA| OperB| Oper| result \n") ; #endif for (taski = 1; taski <= taskTotalNum; taski++) { // sleep timing seqTasksleep.setSleepTime(SLEEPTIME); seqTasksleep.doSleep(); sourceRank = 0; TaskPackage myNewTaskPackage(world); myNewTaskPackage.setMyRank(sourceRank); myNewTaskPackage.setMyGeneration(myGenerationNum); myNewTaskPackage.setTaskOperationType(PLUS); taskOperand = (taski-1) * stepLength + 1; myNewTaskPackage.setOperandA(taskOperand); myNewTaskPackage.setOperandB(taskOperand+1); std::cout << " In node " << world.rank() << ", taskTotalNum = " << taskTotalNum << ", availableRank = " << availableRank << ". \n\n" << std::endl; std::cout << "At generation " << myNewTaskPackage.getMyGeneration() << ", My rank is " << world.rank() << " OperandA is " << myNewTaskPackage.getOperandA() << ", and OperandB is " << myNewTaskPackage.getOperandB() << " . \n\n" << std::endl; if (myNewTaskPackage.getTaskOperationType() == PLUS) { tempResult = myNewTaskPackage.getOperandA() + myNewTaskPackage.getOperandB()+ tempResult; myTaskPackageT0.setResult(tempResult); std::cout << "my rank is " << world.rank() << "my receved OperandA is " << myNewTaskPackage.getOperandA() << ", and OperandB is " << myNewTaskPackage.getOperandB() << ", their operation result is " << myNewTaskPackage.getOperandA() + myNewTaskPackage.getOperandB() << ", the current total result is " << myTaskPackageT0.getResult() << "\n\n" << std::endl; #ifdef PRINTFOUT fprintf(pFile, " %3d %6d %6d %6d %6d %6d \n ", world.rank(), taski, myNewTaskPackage.getOperandA(), myNewTaskPackage.getOperandB() , myNewTaskPackage.getTaskOperationType(), myTaskPackageT0.getResult() ) ; #endif } else if(myNewTaskPackage.getTaskOperationType() == MINUS) { myNewTaskPackage.setResult(myNewTaskPackage.getOperandA() - myNewTaskPackage.getOperandB()); } } std::cout << "I am rank " << world.rank() << ", the seq final result is " << tempResult << ", I use time "<< seqTimer.elapsed() << " seconds, \n\n " << std::endl; } else if (availableRank >= 1) // parallel algorithm with 2 cases T1 T2 { #ifdef PRINTFOUT fprintf(paraT1File, "In node %d , taskTotalNum = %d, availableRank = %d \n\n", world.rank(), taskTotalNum, availableRank ); fprintf(paraT1File,"========================================================= \n\n") ; #endif fprintf(paraT1File, " node | task# | OperA| OperB| Oper| result \n") ; // master sends out task by real value and collects results by pointer to avoid truncate error. if ( taskTotalNum <= availableRank) // T1 { // master sends a message about task assignments to each worker so that they know what tags they should use to recev and send task to mastr. // for T1, worker can use their rank ID to recev and send task. // for T2, worker needs to use the head message from master to decide their tags for recv and send with master. t1Flag = 1; t2Flag = 0; for (iRank = 1; iRank <= taskTotalNum; iRank++) { destRank = iRank; sourceRank = world.rank(); //taskTag = downStreamTaskTag; TaskPackage myTaskPackage(world); myTaskPackage.setEndRun(0); myTaskPackage.setTaskNum(iRank); myTaskPackage.setTaskTag(iRank); // avoid truncate error myTaskPackage.setTotalTaskNum(taskTotalNum); myTaskPackage.setMyRank(sourceRank); myTaskPackage.setMyGeneration(myGenerationNum); myTaskPackage.setTaskOperationType(PLUS); taskOperand = (iRank-1) * stepLength + 1; myTaskPackage.setOperandA(taskOperand); myTaskPackage.setOperandB(taskOperand+1); std::cout << "At generation " << myTaskPackage.getMyGeneration() << ", My rank is " << world.rank() << ", this is T1 task " << iRank << ", out of total tasks " << taskTotalNum << ", send OperandA is " << myTaskPackage.getOperandA() << "and OperandB is " << myTaskPackage.getOperandB() << " . \n\n" << std::endl; //Rha::BasicScheduler()<<(&task1, world, destRank, myTaskTag); resultSourceRank = destRank; myUpStreamTaskTag = myTaskPackage.getTaskTag(); // use myUpStreamTaskTag to recv a specified package recvReqsT1[iRank-1] = world.irecv(resultSourceRank, myUpStreamTaskTag, resultTaskPackageT1Pr[iRank-1]); Rha::BasicScheduler()<<(boost::bind(task5, destRank, sourceRank, downStreamTaskTag, world, myTaskPackage)); taskPerRank[iRank-1] = 1; // post recv to get the result for the above send ++taskCounterT1; #ifdef PRINTFOUT fprintf(paraT1File, " %3d %6d %6d %6d %6d \n ", iRank, iRank, myTaskPackage.getOperandA(), myTaskPackage.getOperandB() , myTaskPackage.getTaskOperationType() ) ; #endif } while(iRank <= availableRank) { taskPerRank[iRank-1]=0; ++iRank; } } else if (taskTotalNum > availableRank) //T2 { #ifdef PRINTFOUT fprintf(paraT2File, "In node %d , taskTotalNum = %d, availableRank = %d \n\n", world.rank(), taskTotalNum, availableRank ); fprintf(paraT2File,"========================================================= \n\n") ; fprintf(paraT2File, " node | task# | OperandA | OperandB | Operation | result \n") ; #endif t1Flag = 0; t2Flag = 1; //aveTaskNum = (int)(taskTotalNum/availableRank); leftTaskNum = (taskTotalNum % availableRank); aveTaskNum = (taskTotalNum - leftTaskNum)/availableRank; for (iRank = 0; iRank < availableRank ; iRank++) { destRank = iRank+1; taskPerRank[iRank] = aveTaskNum; beginIndex = iRank * aveTaskNum; taskArray[iRank]=0;// each rank's task num for (taski = iRank * aveTaskNum; taski < beginIndex + aveTaskNum; taski++) { sourceRank = world.rank(); //taskTag = downStreamTaskTag; TaskPackage myTaskPackage(world); myTaskPackage.setEndRun(0); myTaskPackage.setTotalTaskNum(taskTotalNum); myTaskPackage.setTaskNum(taski+1); // avoid truncate error myTaskPackage.setTaskTag(taski+1); myTaskPackage.setMyRank(sourceRank); myTaskPackage.setMyGeneration(myGenerationNum); myTaskPackage.setTaskOperationType(PLUS); taskOperand = (taski) * stepLength + 1; myTaskPackage.setOperandA(taskOperand); myTaskPackage.setOperandB(taskOperand+1); std::cout << "At generation " << myTaskPackage.getMyGeneration() << ", My rank is " << world.rank() << ", this is T2, the aveTaskNum is " << aveTaskNum << ", leftTaskNum is " << leftTaskNum << ", beginIndex is" << beginIndex << ", send OperandA is " << myTaskPackage.getOperandA() << ", and OperandB is " << myTaskPackage.getOperandB() << ". This is task "<< taski+1 << ", out of total tasks " << taskTotalNum << " , it is sent to " << destRank << " . \n\n" << std::endl; // master sends out a T2 package to worker nodes sendReqsT2[taskCounterT2] = world.isend(destRank, downStreamTaskTag, myTaskPackage); // master post a recv for a T2 task assigned with a distinct tag to get result from worker resultSourceRank = destRank; myUpStreamTaskTag = myTaskPackage.getTaskTag(); (resultTaskPackageT2Pr[iRank]).push_back(new TaskPackage); recvReqsT2[taskCounterT2] = world.irecv(resultSourceRank, myUpStreamTaskTag, *(resultTaskPackageT2Pr[iRank].back())); std::cout << "At generation " << myTaskPackage.getMyGeneration() << ", My rank is " << world.rank() << " , after posting isend, I post irecv, the sizeof resultTaskPackageT2Pr[" << iRank << "][" << taskCounterT3 << "] is " << sizeof(resultTaskPackageT2Pr[iRank][taskCounterT3]) << ", I need to get the task result with tag as "<< myUpStreamTaskTag << " from node " << resultSourceRank << " . \n\n" << std::endl; ++taskCounterT2;// task counter for type 2 with possible left tasks ++taskCounterT3; // record num of tasks assigned to each rank taskArray[iRank] = taskArray[iRank] + 1; #ifdef PRINTFOUT fprintf(paraT2File, " %3d %6d %6d %6d %6d %6d \n ", destRank, taski, myTaskPackage.getOperandA(), myTaskPackage.getOperandB() , myTaskPackage.getTaskOperationType(), myTaskPackage.getOperandA()+myTaskPackage.getOperandB() ) ; #endif } taskCounterT3 = 0; } std::cout << "I am rank " << world.rank() << ", I have sent " << taski << "tasks out of total tasks " << TOTALTASKNUM << "\n\n" << std::endl; startRank =1; // send left task from worker 1 until left ones are all send out. for (taski; taski < taskTotalNum; taski++) // send left tasks to nodes { destRank = startRank; sourceRank = world.rank(); //taskTag = downStreamTaskTag; TaskPackage myTaskPackage(world); myTaskPackage.setEndRun(0); myTaskPackage.setTotalTaskNum(taskTotalNum); myTaskPackage.setTaskNum(taski+1); // avoid truncate error myTaskPackage.setTaskTag(taski+1); myTaskPackage.setMyRank(sourceRank); myTaskPackage.setMyGeneration(myGenerationNum); myTaskPackage.setTaskOperationType(PLUS); taskOperand = (taski) * stepLength + 1; myTaskPackage.setOperandA(taskOperand); myTaskPackage.setOperandB(taskOperand+1); //std::cout << "At generation " << myTaskPackage.getMyGeneration() << ", My // master sends out a T2 package to worker nodes sendReqsT2[taskCounterT2] = world.isend(destRank, downStreamTaskTag, myTaskPackage); // post recv to get the result for the above send resultSourceRank = destRank; myUpStreamTaskTag = myTaskPackage.getTaskTag(); // master collect results from workers // post it here // allocate space for receiver (resultTaskPackageT2Pr[destRank-1]).push_back(new TaskPackage); recvReqsT2[taskCounterT2] = world.irecv(resultSourceRank, myUpStreamTaskTag, *(resultTaskPackageT2Pr[destRank-1].back())); std::cout << "At generation " << myTaskPackage.getMyGeneration() << ", My rank is " << world.rank() << ", I have left tasks to send, OperandA is " << myTaskPackage.getOperandA() << "and OperandB is " << myTaskPackage.getOperandB() << ". This is task "<< taski+1 << ", out of total tasks " << taskTotalNum << ", taskArray[startRank-1] is " << taskArray[startRank-1] << " taskCounterT2 is " << taskCounterT2 << ", this task is sent to destRank " << destRank << ". The sizeof resultTaskPackageT2[" << destRank-1 << "][" << taskArray[destRank-1] << "] is " << sizeof(resultTaskPackageT2Pr[destRank-1][taskArray[destRank-1]])<< ", I need to get the task result with tag as "<< myUpStreamTaskTag << "from node " << resultSourceRank << " . \n\n" << std::endl; taskArray[destRank-1] = taskArray[destRank-1] +1 ; ++startRank; ++taskCounterT2; #ifdef PRINTFOUT fprintf(paraT2File, " %3d %6d %6d %6d %6d %6d \n ", destRank, taski, myTaskPackage.getOperandA(), myTaskPackage.getOperandB() , myTaskPackage.getTaskOperationType(), myTaskPackage.getOperandA()+myTaskPackage.getOperandB() ) ; #endif } taskPerRank[destRank] = taskPerRank[destRank] + leftTaskNum; } std::cout << "I am rank " << world.rank() << ", I have sent out all tasks. The taskTotalNum is " << taskTotalNum << ", taskCounterT2 is " << taskCounterT2 << " . I am waiting for results from other nodes. \n\n" << std::endl; //mpi::wait_all(recvReqs, recvReqs+(taskTotalNum)); t2FinishStatusCounter = taskTotalNum; taskCounter = 1; // Master needs to get T1 task results from workers if (t1Flag == 1 && t2Flag == 0) { mpi::wait_all(sendReqsT1.begin(), sendReqsT1.end()); mpi::wait_all(recvReqsT1.begin(), recvReqsT1.end()); } else if (t1Flag == 0 && t2Flag == 1) // Master needs to get T2 task results from workers { mpi::wait_all(sendReqsT2.begin(), sendReqsT2.end()); } std::cout << "I am rank " << world.rank() << ", I have colleted results from all ranks for generation " << myGenerationNum << " with total tasks " << taskTotalNum << " .\n\n" << std::endl; // master integrate final results #ifdef PRINTFOUT fprintf(paraT1File, "In node %d , taskTotalNum = %d, availableRank = %d \n\n", world.rank(), taskTotalNum, availableRank ); fprintf(paraT1File,"========================================================= \n\n") ; fprintf(paraT1File, " node | task # | result | taskPerRank[iRank] | \n") ; fprintf(paraT2File, "In node %d , taskTotalNum = %d, availableRank = %d \n\n", world.rank(), taskTotalNum, availableRank ); fprintf(paraT2File,"========================================================= \n\n") ; fprintf(paraT2File, " node | task # | result | taskPerRank[iRank] | \n") ; #endif tempResult1 =0 ; tempResult2 =0 ; // master print out resuls collected from workers // master collects T1 results from workers if(t1Flag == 1 && t2Flag == 0) // master get T1 task results { #ifdef PRINTFOUT fprintf(paraT1File, " this is results collected from workers for T1. \n\n"); #endif // master get T1 task result from the pointers recevied from workers for (t1TaskResultItr_i = (resultTaskPackageT1Pr.begin()); t1TaskResultItr_i != (resultTaskPackageT1Pr).end(); t1TaskResultItr_i++) { //tempResult1 = tempResult1 + resultTaskPackageT1[eachRank-1].getResult() ; tempResult1 = tempResult1 + (**t1TaskResultItr_i).getResult() ; #ifdef PRINTFOUT fprintf(paraT1File, " %3d %6d %6d %6d \n ", iRank+1, taskIndexT1, tempResult1, taskPerRank[iRank] ) ; #endif std::cout << "At generation " << myGenerationNum << ", T1 task, " << ", I am rank " << world.rank() << ", I have got results from node " << (**t1TaskResultItr_i).getSourceRank() << ", its result value is " << (**t1TaskResultItr_i).getResult() << ". This is task "<< (**t1TaskResultItr_i).getTaskNum() << " out of total task " << (**t1TaskResultItr_i).getTotalTaskNum() <<", the current accumulated result tempResult2 is " << tempResult2 << ", its T1 task tag is " << (**t1TaskResultItr_i).getTaskTag() << " .\n\n" << std::endl; } } else if(t1Flag == 0 && t2Flag == 1) // master get T2 task results { #ifdef PRINTFOUT fprintf(paraT2File, " this is results collected from workers for T2. \n\n"); #endif // master get T2 task result from the pointers recevied from workers // outer iterator index row of the T1 result vector for (t2TaskResultItr_ii = ((resultTaskPackageT2Pr.begin())); t2TaskResultItr_ii != (resultTaskPackageT2Pr.end()); t2TaskResultItr_ii++) { for (t1TaskResultItr_i = (*t2TaskResultItr_ii).begin(); t1TaskResultItr_i != (*t2TaskResultItr_ii).end(); t1TaskResultItr_i++) { // inner iterator index colmun of the T2 result vector tempResult2 = tempResult2 + (**t1TaskResultItr_i).getResult() ; #ifdef PRINTFOUT fprintf(paraT2File, " %3d %6d %6d %6d \n ", iRank+1, taskIndexT1, tempResult2, taskPerRank[iRank] ) ; #endif std::cout << "At generation " << myGenerationNum << ", T2 task, " << ", I am rank " << world.rank() << ", I have got results from node " << (**t1TaskResultItr_i).getSourceRank() << ", its result value is " << (**t1TaskResultItr_i).getResult() << ". This is task "<< (**t1TaskResultItr_i).getTaskNum() << " out of total task " << (**t1TaskResultItr_i).getTotalTaskNum() <<", the current accumulated result tempResult2 is " << tempResult2 << ", its T2 task tag is " << (**t1TaskResultItr_i).getTaskTag() << " .\n\n" << std::endl; } } } } taskCounter =0 ; t1Flag == 0; t2Flag == 0; ++myGenerationNum; } // seq time #ifdef PRINTFOUT fprintf(pFile, "I am rank is %d, the seq final result is %d. I use time %d seconds \n\n ", world.rank(), tempResult, seqTimer.elapsed()); #endif myTaskPackageM.setEndRun(1); std::cout << "-------------------------------------------------------------- \n " << std::endl; // master node sends out ending task signal for (iRank = 1; iRank <= availableRank ; iRank++) { destRank = iRank; sourceRank = world.rank(); //taskTag = downStreamTaskTag; myTaskPackageM.setMyRank(sourceRank); //Rha::BasicScheduler()<<(&task1, world, destRank, myTaskTag); //Rha::BasicScheduler()<<(&task5); Rha::BasicScheduler()<<(boost::bind(task5, destRank, sourceRank, downStreamTaskTag, world, myTaskPackageM)); } // print out timing results if (taskTotalNum <= availableRank) { std::cout << "I am rank " << world.rank() << ", T1 taskTotalNum = " << taskTotalNum << ", availableRank = " << availableRank << ", I use total parallel time " << paraTimerT1.elapsed() << ", the SLEEPTIME is "<< SLEEPTIME << ", the tempResult1 is "<< tempResult1 << ", the leftTaskNum is " << leftTaskNum << ", the aveTaskNum is " << aveTaskNum << ". \n\n" << std::endl; } if (taskTotalNum > availableRank) { std::cout << "I am rank " << world.rank() << ", T2 taskTotalNum = " << taskTotalNum << ", availableRank = " << availableRank << ", I use total parallel time " << paraTimerT2.elapsed() << ", the SLEEPTIME is "<< SLEEPTIME << ", the tempResult2 is "<< tempResult2 << ", the leftTaskNum is " << leftTaskNum << ", the aveTaskNum is " << aveTaskNum << std::endl; } #ifdef PRINTFOUT fprintf(paraT1File, "In node %d , T1 taskTotalNum = %d, availableRank = %d, master use time %d , the SLEEPTIME is %d . \n\n", world.rank(), taskTotalNum, availableRank, paraTimerT1.elapsed(), SLEEPTIME ); fprintf(paraT2File, "In node %d , T2 taskTotalNum = %d, availableRank = %d, master use time %d, the SLEEPTIME is %d . \n\n", world.rank(), taskTotalNum, availableRank, paraTimerT2.elapsed(), SLEEPTIME ); #endif fclose(pFile); fclose(paraFile); fclose(paraT2File); fclose(paraT1File); return 0; } else // worker nodes recv task from manager node and then run it , after that return the result to manager { { // (const Task& taskRecv, mpi::communicator world.rank, int myTaskTag) int rank, taskTag, sourceRank, managerRank =0, destRank = 0, resultTask, workerUpStreamTaskTag; //mpi::request recvReqsW; TaskPackage resultTaskPackageW(world); mpi::request sendReqsW[1]; mpi::timer workerTimer; TaskSleep paraTasksleep; TaskPackage* resultTaskPackageWPr ; // worker node receives task from managwer node while (1) { world.recv(managerRank, downStreamTaskTag, resultTaskPackageW); mpi::timer workerTimer; if (resultTaskPackageW.getEndRun() == ISFALSE) { paraTasksleep.setSleepTime(SLEEPTIME); paraTasksleep.doSleep(); std::cout << "I am rank " << world.rank() << ", I recived task from node " << managerRank << resultTaskPackageW.getMyRank() << ", the EndRun = " << resultTaskPackageW.getEndRun() << ", the recved task generation is "<< resultTaskPackageW.getMyGeneration() << ". This is the task " << resultTaskPackageW.getTaskNum() << " out of total task " << resultTaskPackageW.getTotalTaskNum() << ". \n\n" << std::endl; std::cout << "Now, I run my task \n" << std::endl; std::cout << resultTaskPackageW; //if (taskTypeNumber == ) // perform task operations according to the taskpackage if (resultTaskPackageW.getTaskOperationType() == PLUS) { resultTaskPackageW.setResult(resultTaskPackageW.getOperandA() + resultTaskPackageW.getOperandB()); std::cout << "my rank is " << world.rank() << "my receved OperandA is " << resultTaskPackageW.getOperandA() << " and OperandB is " << resultTaskPackageW.getOperandB() << ", their operation result is " << resultTaskPackageW.getResult() << ". This is the task " << resultTaskPackageW.getTaskNum() << "out of total task " << resultTaskPackageW.getTotalTaskNum() << " .\n\n" << std::endl; } else if(resultTaskPackageW.getTaskOperationType() == MINUS) { resultTaskPackageW.setResult(resultTaskPackageW.getOperandA() - resultTaskPackageW.getOperandB()); } // return result to manager node //taskTag = upStreamTaskTag; sourceRank = world.rank(); workerUpStreamTaskTag = resultTaskPackageW.getTaskTag(); world.send(destRank, upStreamTaskTag, resultTaskPackageW); // post recv to get the result for the std::cout << "my rank is " << world.rank() << ", after sending a result to master for generation " << resultTaskPackageW.getMyGeneration() << ". This is the task " << resultTaskPackageW.getTaskNum() << " out of total task " << resultTaskPackageW.getTotalTaskNum() << ", the reuslt is " << resultTaskPackageW.getResult() << ", I use time %d seconds " << workerTimer.elapsed() << ", the sizeof resultTaskPackageW is " << sizeof(resultTaskPackageW) << ", the sizeof resultTaskPackageWPr is "<< sizeof(resultTaskPackageWPr) << ", the workerUpStreamTaskTag is " << workerUpStreamTaskTag << " . \n\n" << std::endl; std::cout << "---------------------------------------------------------" << std::endl; } else if (resultTaskPackageW.getEndRun() == ISTRUE) // if end taks is true, donot wait for new tasks { std::cout << "I am rank " << world.rank() << ", I recived task from node " << resultTaskPackageW.getMyRank() << ". I am told that all generations are done. So, I do not wait for new tasks. \n\n" << std::endl; break; } else { std::cout << " I am rank " << world.rank() << ", ther is error, do not wait. \n\n " << std::endl; break; } } return 0; } } // EOF