From: Arash Partow (arash_at_[hidden])
Date: 2004-05-22 05:20:07


Hi all,

I was wondering if there would be any interest in a template
library based around producers and consumers.

I've already developed something and have a series of working examples.
I propose a library that consists of 3 basic entities: a producer, a
consumer, and a hybrid entity called an interchange.

The entities themselves will be threaded and will be linked together
via a linking mechanism. The link is itself a base template, which can
be extended to provide data transfer between processes and threads on
the same system, and further extended to provide data transfer over
pipes, sockets, serial lines, etc.

The idea of the producer-consumer is very simple: the producer produces
a type of data (e.g. a string, a class, a struct, etc.), and a consumer
instantiated for that type consumes the data provided by the producer
and does something with it.

The interchange entity is a transformation entity. It acts as both a
consumer and a producer: it consumes from a producer or another
interchange, invokes the user-implemented transmute method to
convert/munge/process the data from one type to another, and then sends
the result to its consumer.

Simple class syntax:

template <
           typename OutputType
           // class IdleProductionPolicy = Policy class for implementing producer's
           //behavior during idle production periods.
>
class Producer : public Thread
{
};

template <typename InputType>
class Consumer : public Thread
{
};

template <
            typename InputType,
            typename OutputType
            // template < typename > class InternalProducer possible future policy
>
class Interchange : public Thread
{
};

The link mechanism is basically a type that carries the type common to
a producer and its consumer. It is a one-way flow of data. The consumer
is signaled when data is ready for it to collect: if it is asleep, it
will wake and fetch the data; if it is doing something else, then when
it comes back to wait for the producer's condition to be signaled it
will be woken immediately and go fetch the data off the link. The data
on the link, or more precisely in the container that resides in the
link, is made thread safe via mutexes around the add and get methods.
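
As a rough sketch of the signaling described above (this assumes C++11
std::mutex and std::condition_variable rather than any particular thread
library, and the member names are illustrative, not the actual
implementation):

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical link: a container guarded by a mutex, plus a condition
// that the producer side signals whenever data has been added.
template <typename T>
class Link
{
public:
    // Producer side: store one item, then wake a waiting consumer.
    void add(const T& item)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push_back(item);
        }
        ready_.notify_one();
    }

    // Consumer side: wait until data is present, then take all of it.
    // The predicate is checked before sleeping, so a consumer that was
    // busy when the signal fired returns at once instead of blocking.
    void get(std::vector<T>& out)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        out.insert(out.end(), items_.begin(), items_.end());
        items_.clear();
    }

    // Number of items currently waiting on the link.
    std::size_t pending() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return items_.size();
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable ready_;
    std::vector<T> items_;
};
```

With this shape, get() hands the consumer everything that has
accumulated since its last visit, which matches the vector-based get
in the class syntax.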

A link has basic get and add functionality, but each entity sees the
link only through an interface and hence can invoke only one kind of
method: a producer can only invoke "add", whereas a consumer can only
invoke "get". An interchange can invoke both because it is instantiated
with both a consumer and a producer link interface. The links are
separate for an interchange, though: it requires 2 links, one for the
incoming data and one for the outgoing data. The two links may be of
different types, e.g.:

Producer <string> ----> Interchange <string, AClass> ----> Consumer <AClass>
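
One way the two restricted views could look is sketched here. The
wrapper constructors taking a Link pointer follow the usage in main(),
but the bodies are assumptions, and the Link is a single-threaded
stand-in with no locking, just to keep the sketch short:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Single-threaded stand-in for the real link (no mutex or signaling).
template <typename T>
class Link
{
public:
    void add(const T& item) { items_.push_back(item); }

    void get(std::vector<T>& out)
    {
        out.insert(out.end(), items_.begin(), items_.end());
        items_.clear();
    }

private:
    std::vector<T> items_;
};

// View handed to a producer: exposes add() only.
template <typename T>
class ProducerLink
{
public:
    explicit ProducerLink(Link<T>* link) : link_(link) {}
    void add(const T& item) { link_->add(item); }

private:
    Link<T>* link_;
};

// View handed to a consumer: exposes get() only.
template <typename T>
class ConsumerLink
{
public:
    explicit ConsumerLink(Link<T>* link) : link_(link) {}
    void get(std::vector<T>& out) { link_->get(out); }

private:
    Link<T>* link_;
};
```

A producer holding only a ProducerLink<string> has no get() to call, so
the one-way flow is enforced at compile time rather than by convention.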

I need some opinions about whether people would find this library
useful enough to have it as part of Boost. If so, could people give me
their opinions about my design? It's just a simple first trial run at
designing it. I intend for the producer and interchange entities to be
policy driven as far as production of data is concerned.
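
To make the policy idea a little more concrete, here is a minimal,
non-threaded sketch. Everything in it is an assumption for illustration:
the onIdle() hook, the step() function standing in for one pass of the
thread loop, and the reading of a false return from manufacture() as
"nothing was produced this time".

```cpp
#include <cassert>
#include <vector>

// Hypothetical idle policy that ignores idle periods.
struct IgnoreIdlePolicy
{
    void onIdle() {}
};

// Hypothetical idle policy that counts idle periods.
struct CountIdlePolicy
{
    CountIdlePolicy() : idleCount(0) {}
    void onIdle() { ++idleCount; }
    unsigned int idleCount;
};

template <typename OutputType, class IdleProductionPolicy = IgnoreIdlePolicy>
class PolicyProducer
{
public:
    virtual ~PolicyProducer() {}

    // One pass of the production loop: try to manufacture, and hand
    // control to the policy when nothing was produced.
    void step()
    {
        if (!manufacture())
            idle_.onIdle();
    }

    const std::vector<OutputType>& produced() const { return produced_; }
    const IdleProductionPolicy& idlePolicy() const { return idle_; }

protected:
    // Stands in for pushing the item onto the producer's link.
    void add(const OutputType& item) { produced_.push_back(item); }

    virtual bool manufacture() = 0;

private:
    std::vector<OutputType> produced_;
    IdleProductionPolicy idle_;
};

// Example producer: makes the values 0 and 1, then has nothing left.
class TwoItemProducer : public PolicyProducer<int, CountIdlePolicy>
{
public:
    TwoItemProducer() : next_(0) {}

protected:
    bool manufacture()
    {
        if (next_ >= 2)
            return false;
        add(next_++);
        return true;
    }

private:
    int next_;
};
```

A sleeping or back-off policy could be swapped in the same way without
touching the producer itself.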

I think this library can be used in distributed computing, in network
packet assembly, and also for interfacing and passing data up and down
threaded software layers.

Any advice or opinions would be very much appreciated.

Kind regards

Arash Partow

__________________________________________________
Be one who knows what they don't know,
Instead of being one who knows not what they don't know,
Thinking they know everything about all things.
http://www.partow.net

A more detailed class syntax and usage is here:

template <
           typename OutputType
           // class IdleProductionPolicy = Policy class for implementing producer's
           //behavior during idle production periods.
>
class Producer : public Thread
{

    private:

      typedef vector < OutputType > TypeVector;
      typedef ProducerLink < OutputType > TypeLink;

    public:

      Producer(TypeLink* _link,
               unsigned int _minpendbufsize = MIN_PEND_BUF_SIZE,
               unsigned int _maxpendbufsize = MAX_PEND_BUF_SIZE,
               unsigned int _id = 0)
      {
      };

     ~Producer(){};

      unsigned int getMinPenBufferSize() { return minpendbufsize; };
      unsigned int getMaxPenBufferSize() { return maxpendbufsize; };

      void setMinPenBufferSize(unsigned int _minpenbufsize = MIN_PEND_BUF_SIZE) { minpendbufsize = _minpenbufsize; };
      void setMaxPenBufferSize(unsigned int _maxpenbufsize = MAX_PEND_BUF_SIZE) { maxpendbufsize = _maxpenbufsize; };

      virtual void setActiveMode(bool state = true)
      {
      };

    protected:

      void add(OutputType t) {};
      void add(const TypeVector& tlist) {};
      virtual bool manufacture()=0;

    private:

      void execute() {};

      // pending-buffer limits read and written by the accessors above
      unsigned int minpendbufsize;
      unsigned int maxpendbufsize;

};

template <typename InputType>
class Consumer : public Thread
{

    private:

      typedef vector < InputType > TypeVector;
      typedef ConsumerLink < InputType > TypeLink;

    public:

     Consumer(TypeLink* _link){};

    ~Consumer(){};

    protected:

      void get(TypeVector& tlist) { };
      void setLink(TypeLink& _link) { };

      virtual bool consume()=0;

    private:

      void execute(){};

};

template <
            typename InputType,
            typename OutputType
            // template < typename > class InternalProducer possible future policy
>
class Interchange : public Thread
{

    public:

      typedef vector < InputType > ConsDataList;
      typedef vector < OutputType > ProdDataList;

      typedef ConsumerLink < InputType > ConsLink;
      typedef ProducerLink < OutputType > ProdLink;

    public:

      Interchange(
                  ConsLink* _consLink, // Link from the producer entity
                  ProdLink* _prodLink, // Link to the consumer entity
                  unsigned int _minpendbufsize = MIN_PEND_BUF_SIZE,
                  unsigned int _maxpendbufsize = MAX_PEND_BUF_SIZE,
                  unsigned int _id = 0 // for debugging purposes
                 )
      {

      };

     ~Interchange() {};

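      unsigned int getMinPenBufferSize() { return minpendbufsize; };
      unsigned int getMaxPenBufferSize() { return maxpendbufsize; };

      void setMinPenBufferSize(unsigned int _minpenbufsize = MIN_PEND_BUF_SIZE) { minpendbufsize = _minpenbufsize; };
      void setMaxPenBufferSize(unsigned int _maxpenbufsize = MAX_PEND_BUF_SIZE) { maxpendbufsize = _maxpenbufsize; };

      void setActiveMode(bool state = true){};

    protected:

      void add(OutputType t) { prodLink->add(t); };
      void add(const ProdDataList& tlist) { prodLink->add(tlist); };
      void get(ConsDataList& tlist) { consLink->get(tlist); };
      void setConsumerLink(ConsLink& _link) { consLink = &_link; }; // links are held by pointer
      void setProducerLink(ProdLink& _link) { prodLink = &_link; };

      virtual bool transmute()=0;

    private:

      void execute(){};

      ConsLink* consLink; // incoming link, from the producer entity
      ProdLink* prodLink; // outgoing link, to the consumer entity
      unsigned int minpendbufsize;
      unsigned int maxpendbufsize;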

};

Usage would be as follows:

class AClass
{
    public:

      AClass(string str):val(str){}
     ~AClass(){};

      void display(){cout << val << endl;};

    private:
      string val;
};

class MyProducer : public Producer<string>
{

    public:

     typedef ProducerLink < string > LinkType;

     MyProducer(LinkType* _link, int minpenbs, int maxpenpbs):Producer<string>(_link, minpenbs, maxpenpbs){};

     bool manufacture()
     {

        add("abc");
        return true;

     };

};

class MyInterchange : public Interchange<string,AClass>
{

    public:

     typedef ConsumerLink < string > ConsLinkType; // incoming strings
     typedef ProducerLink < AClass > ProdLinkType; // outgoing AClass objects

     MyInterchange(
                   ConsLinkType* _conslink,
                   ProdLinkType* _prodlink,
                   unsigned int minpenbs,
                   unsigned int maxpenbs
                  ):Interchange<string,AClass>(
                                               _conslink,
                                               _prodlink,
                                               minpenbs,
                                               maxpenbs
                                              ){};

     bool transmute()
     {

        vector <string> slist;

        get(slist);

        for(unsigned int i = 0; i < slist.size(); i++)
        {
           // convert each incoming string to an AClass and pass it on
           add(AClass(slist[i]));
        }

        return true;

     };

    private:

};

class MyConsumer: public Consumer<AClass>
{

    public:

     typedef ConsumerLink < AClass > LinkType;

     MyConsumer(LinkType* _link, int mpID = 0):Consumer<AClass>(_link){};

     bool consume()
     {

        vector <AClass> alist;

        get(alist);

        for(unsigned int i = 0; i < alist.size(); i++)
        {
           alist[i].display();
        }

        return true;

     };

};

int main ()
{

    Link < string > link1;
    Link < AClass > link2;

    MyProducer myproducer(new ProducerLink <string>(&link1),10,50);
    MyInterchange myinterchange(new ConsumerLink <string>(&link1),new ProducerLink <AClass>(&link2),1,10);
    MyConsumer myconsumer(new ConsumerLink <AClass>(&link2));

    myproducer.start();
    myinterchange.start();
    myconsumer.start();

    while(true)
    {
     .
     .
     .
    }

   return 0;

}

