Boost logo

Boost Users :

From: Torsten Mohr (tmohr_at_[hidden])
Date: 2007-10-07 05:18:02


Hi,

I wrote a small test project that makes use of boost::thread
to split jobs into parts that are delegated to threads.

A class "Job" contains a state machine (IDLE, EXECUTING, ABORTED, ...).
That class can be derived from, and its methods "execute" and "report"
can be overridden.

A class JobProvider can be derived from to feed new data to a Job.

A template "Foreman" can then start several threads, hand each Job
its data from a "JobProvider", and let the Jobs run; the Jobs
report when they are done.

In the example, MyJobProvider provides numbers from 3 to 100.

MyJob::execute calculates if these numbers are prime.
MyJob::report prints out the numbers that are prime.

All calls to "report" are protected by a shared mutex, so only one of them
is ever running at a time; the "execute" methods run in parallel.

The performance is poor, can anybody give me a hint why?

Best regards,
Torsten.

######## job.h:
#ifndef JOB_H
#define JOB_H 1

#include <boost/thread.hpp>

extern boost::mutex io_mutex;

// Debug trace: prints the current file and line while holding io_mutex, so
// trace output from threads that use QWE is serialized.
// The expansion calls printf, so the including file must provide <cstdio>.
// (The original definition was split in the middle of the string literal by
// e-mail line wrapping; it is rejoined here with an explicit continuation.)
#define QWE { boost::mutex::scoped_lock lck(io_mutex); \
              printf("file %s line %i\n", __FILE__, __LINE__); }

class Job {
  boost::mutex job_mutex;
  boost::mutex cond_mutex;

  boost::mutex* m_report_mutex;
  boost::condition* m_condition;

  boost::thread* thr;

  bool m_stopped;

  int m_state;

  void setState(int state);

 protected:
  bool is_stopped(void);

 public:

  enum {
    CMD_IDLE = 212,
    CMD_EXEC,
    CMD_KILL
  };

  enum {
    STATE_IDLE = 37,
    STATE_EXEC_PREPARE,
    STATE_EXEC,
    STATE_EXEC_ABORTED,
    STATE_REPORT_PREPARE,
    STATE_REPORT,
    STATE_DEAD
  };

  void set_foreman_data(boost::mutex* report_mutex, boost::condition*
condition);
  void init(void);
  virtual void execute_handler(void);
  void report_handler(void);
  void fork(void);

  int getState(void);
  void doCommand(int command);

  virtual void execute(void);
  virtual void report(void);
};

class JobProvider {
 public:
  bool provide(Job& j);

};

#endif

######## job.cc:
#include <boost/thread.hpp>
#include <boost/bind.hpp>

#include "job.h"

int Job::getState(void)
{
  boost::mutex::scoped_lock lock(job_mutex);

  return m_state;
}

void Job::setState(int state)
{
  boost::mutex::scoped_lock lock(job_mutex);

  m_state = state;
}

void Job::doCommand(int cmd)
{
  boost::mutex::scoped_lock lock(job_mutex);

  // sparse transitions
  if((m_state == STATE_EXEC || m_state == STATE_EXEC_PREPARE) && cmd ==
CMD_KILL)
    {
      m_state = STATE_EXEC_ABORTED;
      m_stopped = true;
    }
  else if(m_state == STATE_IDLE && cmd == CMD_EXEC)
    {
      m_state = STATE_EXEC_PREPARE;
    }
  else if(m_state == STATE_IDLE && cmd == CMD_KILL)
    {
      m_state = STATE_DEAD;
      m_stopped = true;
    }

}

bool Job::is_stopped(void)
{
  boost::mutex::scoped_lock lock(job_mutex);

  return m_stopped;
}

void Job::execute_handler(void)
{
  int state;

  do {
    state = getState();

    switch(state)
      {
      case STATE_EXEC_PREPARE:
        execute();

        state = getState();
        if(state != STATE_EXEC_ABORTED)
        {
          boost::mutex::scoped_lock lock(*m_report_mutex);

          report();
          m_condition->notify_one();

          setState(STATE_IDLE);
        }
        break;

      case STATE_DEAD:
      case STATE_EXEC_ABORTED:
        return;
      }

  } while(1);
}

void Job::init(void)
{
  m_state = STATE_IDLE;
}

void Job::fork(void)
{
  thr = new boost::thread(boost::bind(execute_handler, this));
}

void Job::report_handler(void)
{
  boost::mutex::scoped_lock lock(*m_report_mutex);

  report();
}

// Stores the foreman-owned synchronisation objects: the mutex held around
// report() and the condition notified after each report.
void Job::set_foreman_data(boost::mutex* report_mutex,
                           boost::condition* condition)
{
  this->m_condition = condition;
  this->m_report_mutex = report_mutex;
}

// Default work hook: does nothing.  Derived classes override this; it is
// called from execute_handler() when the job is in STATE_EXEC_PREPARE.
void Job::execute(void)
{
}

// Default report hook: does nothing.  Derived classes override this; the
// callers in this file invoke it while holding the report mutex.
void Job::report(void)
{
}

######## foreman.h:
#ifndef FOREMAN_H
#define FOREMAN_H 1

#include <boost/thread.hpp>
#include <boost/bind.hpp>

#include "job.h"

template<class JOBB, class JOBB_PROVIDER> class Foreman {
  JOBB* jb;
  JOBB_PROVIDER* m_provider;
  int m_threads;
  bool m_finished;

  boost::mutex report_mutex;

  boost::mutex m_cond_mutex;
  boost::condition m_condition;

  boost::thread* me;

  void handler(void);

 public:
  Foreman(int threads, JOBB_PROVIDER* provider);
  ~Foreman();

  void spawn(void);
  void cleanup(void);
  void abort(void);
  bool is_finished(void);
};

template<class JOBB, class JOBB_PROVIDER>
  Foreman<JOBB, JOBB_PROVIDER>::Foreman(int threads, JOBB_PROVIDER* provider)
  : m_threads(threads), m_provider(provider)
{
  jb = new JOBB[m_threads];

}

// Releases the job array allocated in the constructor.
// NOTE(review): nothing in the code shown here sends CMD_KILL to the jobs or
// joins their threads, so threads started by spawn() may still be using
// these objects when they are deleted -- confirm and add a shutdown step.
template<class JOBB, class JOBB_PROVIDER>
Foreman<JOBB, JOBB_PROVIDER>::~Foreman()
{
  delete[] jb;
}

template<class JOBB, class JOBB_PROVIDER>
void Foreman<JOBB, JOBB_PROVIDER>::handler(void)
{
  bool all_done;

  do
    {
      boost::mutex::scoped_lock lock(m_cond_mutex);

      all_done = true;
      for(int i = 0; i < m_threads; ++i)
        {
          int state = jb[i].getState();

          if(state == Job::STATE_IDLE)
            {
              if(m_provider->provide(jb[i]))
                {
                  jb[i].doCommand(Job::CMD_EXEC);
                  all_done = false;
                }
              else
                {
                  m_finished = true;
                }
            }
          else
            {
              all_done = false;
            }

        }

      if(m_finished || all_done)
        {
          return;
        }

      m_condition.wait(lock);

    } while(1); // (!m_finished || !all_done);
}

// Wires up, initialises and starts every job, then runs the dispatch loop
// on the calling thread (running it on a separate thread is left disabled).
template<class JOBB, class JOBB_PROVIDER>
void Foreman<JOBB, JOBB_PROVIDER>::spawn(void)
{
  int idx = 0;
  while(idx < m_threads)
    {
      JOBB& worker = jb[idx];

      worker.set_foreman_data(&report_mutex, &m_condition);
      worker.init();
      worker.fork();

      ++idx;
    }

  m_finished = false;

  // Disabled alternative: me = new boost::thread(boost::bind(handler, this));
  this->handler();
}

// Currently a no-op.  spawn() runs handler() on the calling thread, so there
// is no foreman thread to join; the join below is kept disabled.
template<class JOBB, class JOBB_PROVIDER>
void Foreman<JOBB, JOBB_PROVIDER>::cleanup(void)
{
  // me->join();
}

// Not implemented: the body is empty.
// NOTE(review): presumably meant to send Job::CMD_KILL to every job -- confirm.
template<class JOBB, class JOBB_PROVIDER>
void Foreman<JOBB, JOBB_PROVIDER>::abort(void)
{

}

// Returns m_finished, which handler() sets once the provider has no more
// work.  The flag is read without taking any lock.
template<class JOBB, class JOBB_PROVIDER>
bool Foreman<JOBB, JOBB_PROVIDER>::is_finished(void)
{
  return m_finished;
}

#endif

######## my_job.h:
#ifndef MY_JOB_H
#define MY_JOB_H 1

#include "job.h"

// Job that tests one integer for primality.
class MyJob : public Job
{
 private:
  int m_suspect;   // number under test, set through setJobStuff()
  bool is_prime;   // verdict written by execute(), read by report()

 public:
  MyJob();

  // Trial division on m_suspect.
  virtual void execute();
  // Prints m_suspect if execute() found it to be prime.
  virtual void report();

  // Sets the number to test next.
  void setJobStuff(int suspect);
};

class MyJobProvider : public JobProvider
{
  int m_actual;

 public:
  MyJobProvider();

  bool provide(MyJob& j);
};

#endif

######## my_job.cc:
#include <iostream>
#include <math.h>

#include "my_job.h"
#include "job.h"

// Starts with no number assigned and a defined "not prime" verdict.
// The original left is_prime uninitialised, so report() on a job that had
// not completed execute() read an indeterminate bool.
MyJob::MyJob()
  : m_suspect(0), is_prime(false)
{
}

// Stores the number that the next execute() call will test.
void MyJob::setJobStuff(int suspect)
{
  m_suspect = suspect;
}

void MyJob::execute(void)
{
  int root = (int)sqrt((double)m_suspect);

  for(int j = 2; j <= root; j++)
    {
      if(is_stopped())
        {
          return;
        }

      if((m_suspect % j) == 0)
        {
          is_prime = false;
          return;
        }
    }

  is_prime = true;
}

void MyJob::report(void)
{
  if(is_prime)
    {
      std::cout << m_suspect << " is prime" << std::endl;
    }
}

// The first number handed out by provide() is 3.
MyJobProvider::MyJobProvider() : m_actual(3) {}

#define NN 100

// Hands the next number to `j` and returns true, or returns false without
// touching `j` once every number up to and including NN has been handed out.
//
// The original assigned the number first and then tested the incremented
// counter, so NN itself was stored in the job but reported as "no work" and
// never processed.
bool MyJobProvider::provide(MyJob& j)
{
  if(m_actual > NN)
    {
      return false;
    }

  j.setJobStuff(m_actual);
  ++m_actual;

  return true;
}

######## qwe.cc:
#include <iostream>

#include "job.h"
#include "my_job.h"

#include "foreman.h"

// Definition of the mutex declared extern in job.h; the QWE trace macro
// locks it around its printf call.
boost::mutex io_mutex;

void test1(void)
{
  MyJobProvider p;
  MyJob j;

  while(p.provide(j)) {
    j.execute();
    j.report();
  }

}

void test2(void)
{
  MyJobProvider p;
  Foreman<MyJob, MyJobProvider> fm(7, &p);

  fm.spawn();

  do {
  } while(!fm.is_finished());

  fm.cleanup();
}

int main(int argc, char** argv) {
  test2();

  return 0;
}


Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net