Subject: [boost] Interested in bounded buffer class for threaded producer/consumer communication?
From: André Offringa (offringa_at_[hidden])
Date: 2011-12-12 18:47:13
Hi all,
I seem to have come across a producer/consumer situation a few times
now, and every time I do, I seem to want slightly different
functionality from a bounded-buffer kind of class than what is generally
provided. I am looking for such a class that is also efficient for large
amounts of data, so I wonder whether a class already exists in
Boost to efficiently push work between producers and consumers when they
are running in different threads, i.e., one that is thread safe. Asio seems
to have a lot of communication mechanisms, but I have not yet come across
such a mechanism (pipe/channel/lane/unidirectional stream/bounded
buffer... I'll call it a 'lane' for now) between two threads.
If it does not exist yet in Boost, I wonder whether there is general
interest in such a class.
The situation is that:
- one or more threads produce data 'packages' of some type T, until the
buffer is full;
- one or more other threads have to wait for the availability of these
packages and, once one is available, take it and process it.
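For comparison, the basic situation above (producers block while the buffer is full, consumers block while it is empty) can be sketched with the C++11 standard library alone. This is a minimal illustration under stated assumptions, not the Boost bounded_buffer example: the names bounded_queue and run_bounded_queue_demo are hypothetical, and it uses std::deque instead of a circular buffer for brevity. Note that every element costs one lock acquisition and one notification.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>

// Hypothetical bounded_queue: push() blocks while the queue is full,
// pop() blocks while it is empty. One lock acquisition per element.
template <typename T>
class bounded_queue {
public:
    explicit bounded_queue(std::size_t capacity) : capacity_(capacity) {}

    void push(const T& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_.wait(lock, [this] { return items_.size() < capacity_; });
        items_.push_back(value);
        lock.unlock();
        not_empty_.notify_one();  // wake one waiting consumer
    }

    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_.wait(lock, [this] { return !items_.empty(); });
        T value = items_.front();
        items_.pop_front();
        lock.unlock();
        not_full_.notify_one();  // wake one waiting producer
        return value;
    }

private:
    std::size_t capacity_;
    std::deque<T> items_;
    std::mutex mutex_;
    std::condition_variable not_full_;   // producers wait here
    std::condition_variable not_empty_;  // consumers wait here
};

// Demo: one producer pushes 1..n through a queue of capacity 4 (so it blocks
// often); one consumer pops n values, checks FIFO order and returns their sum.
long long run_bounded_queue_demo(int n) {
    bounded_queue<int> queue(4);
    long long sum = 0;
    std::thread consumer([&] {
        for (int expected = 1; expected <= n; ++expected) {
            int v = queue.pop();
            assert(v == expected);  // single producer, single consumer: FIFO
            sum += v;
        }
    });
    for (int i = 1; i <= n; ++i) queue.push(i);
    consumer.join();
    return sum;
}
```

Since producers only wait on not_full_ and consumers only on not_empty_, notify_one is sufficient here even with several producers and consumers.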
Googling around, there are numerous suggestions (e.g. blog entries) for
an implementation of thread-safe queues that synchronize writing/reading
and wake readers when data is available. However, I think there are
fewer implementations that both use a circular buffer and also
block the producer when the buffer is 'full', and even fewer that are
efficient for large amounts of data. Typical examples of this situation
occur in signal processing. For example, one thread might
read packets of work from the hard disk or some other device and write them
to the lane object, while one or a few other threads
read packets from the lane object for processing. In one particular case,
I had a consumer that needed a certain number of packets at a time. The
lane holds a (circular) buffer of a given size; if it is full,
producing threads are blocked, and consuming threads are blocked if
the buffer is empty.
The Boost circular_buffer documentation gives a bounded_buffer example that
implements such a thing on top of circular_buffer. It does exactly what I
suggest, but I assume it is not very efficient for pushing large amounts
of data, because each individual object requires a push, and thus a lock
as implemented in bounded_buffer, and involves unnecessary updates of
the counters (involving a decrement() each time) on each push, which is
not necessary if you know you are pushing a number of elements at once.
I assume this could be fixed by wrapping the insert(range) method
instead of the individual push functions.
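A sketch of the batching idea, assuming a hand-written ring buffer over std::vector rather than boost::circular_buffer (batch_ring and run_batch_ring_demo are hypothetical names): the mutex is taken once per write() or read() call and re-acquired only after waiting on a full or empty buffer, and each chunk is copied in at most two contiguous segments. The consumer asks for a fixed number of elements at a time, as in the case described above.

```cpp
#include <algorithm>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <numeric>
#include <thread>
#include <vector>

// Hypothetical batch_ring: fixed-capacity ring buffer whose write()/read()
// move whole ranges instead of single elements.
template <typename T>
class batch_ring {
public:
    explicit batch_ring(std::size_t capacity) : buffer_(capacity) {}

    void write(const T* data, std::size_t n) {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::size_t cap = buffer_.size();
        while (n > 0) {
            not_full_.wait(lock, [&] { return count_ < cap; });
            const std::size_t chunk = std::min(n, cap - count_);
            const std::size_t tail = (head_ + count_) % cap;
            const std::size_t first = std::min(chunk, cap - tail);
            // Free space is at most two contiguous segments: [tail, cap) and [0, ...).
            std::copy(data, data + first, buffer_.data() + tail);
            std::copy(data + first, data + chunk, buffer_.data());
            count_ += chunk;
            data += chunk;
            n -= chunk;
            not_empty_.notify_all();
        }
    }

    void read(T* out, std::size_t n) {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::size_t cap = buffer_.size();
        while (n > 0) {
            not_empty_.wait(lock, [&] { return count_ > 0; });
            const std::size_t chunk = std::min(n, count_);
            const std::size_t first = std::min(chunk, cap - head_);
            // Unread data is at most two contiguous segments starting at head_.
            std::copy(buffer_.data() + head_, buffer_.data() + head_ + first, out);
            std::copy(buffer_.data(), buffer_.data() + (chunk - first), out + first);
            head_ = (head_ + chunk) % cap;
            count_ -= chunk;
            out += chunk;
            n -= chunk;
            not_full_.notify_all();
        }
    }

private:
    std::vector<T> buffer_;
    std::size_t head_ = 0;   // index of the oldest unread element
    std::size_t count_ = 0;  // number of unread elements
    std::mutex mutex_;
    std::condition_variable not_full_, not_empty_;
};

// Demo: the producer writes 0..9999 in batches of 100 through a 256-slot ring;
// the consumer reads 50 elements at a time and checks they arrive in order.
long long run_batch_ring_demo() {
    batch_ring<int> ring(256);
    std::vector<int> sent(10000);
    std::iota(sent.begin(), sent.end(), 0);
    long long sum = 0;
    std::thread consumer([&] {
        std::vector<int> batch(50);
        int next = 0;
        for (std::size_t done = 0; done < sent.size(); done += batch.size()) {
            ring.read(batch.data(), batch.size());
            for (int v : batch) {
                assert(v == next);
                ++next;
                sum += v;
            }
        }
    });
    for (std::size_t i = 0; i < sent.size(); i += 100)
        ring.write(sent.data() + i, 100);
    consumer.join();
    return sum;
}
```

Because the lock is released while waiting, a batch larger than the free space can be interleaved with data from other producers when there are several; whether that matters depends on the application.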
What I have found most useful, however, is a class whose 'main'
interface consists of these methods:
template<typename T> class lane {
public:
  lane(size_t capacity);
  void write(const T& element);             // (or push if you wish)
  void write(const T* elements, size_t n);  // or as a data range
  void write_end();
  bool read(T& element);                    // false if write_end() has been called and the lane is empty()
  size_t read(T* elements, size_t n);       // returns n, unless write_end() has been called and the lane becomes empty()
  size_t size() const;
  bool empty() const;
  // etc.
};
...and that is basically how I have implemented it for now.
The lane is just like the circular buffer in Boost: it owns the
elements, and thus copies the data once on write into its own buffer and
once on read. The write_end() method changes the state of the lane
so that blocked reading threads are no longer blocked, i.e., it
assumes the producer(s) have finished and the consumer(s) should finish as
well after the last data has been read. No element is ever overwritten
before being read. In my case I never needed it to be a "real"
container, so valid iterators and begin()/end() are not necessary.
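The interface and write_end() semantics described above could be implemented roughly as follows. This is a hedged sketch, not the author's implementation: it uses a std::vector ring instead of boost::circular_buffer, assumes capacity > 0, takes the lock once per batch (except when the ring fills or drains), and does not guard against writes after write_end(). run_lane_demo is a hypothetical helper.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Sketch of the lane interface (assumes capacity > 0). After write_end(),
// readers stop blocking once the remaining data has been drained.
template <typename T>
class lane {
public:
    explicit lane(std::size_t capacity) : buffer_(capacity) {}

    void write(const T& element) { write(&element, 1); }

    void write(const T* elements, std::size_t n) {
        std::unique_lock<std::mutex> lock(mutex_);
        for (std::size_t i = 0; i < n; ++i) {
            if (count_ == buffer_.size()) {   // full: wake readers, then wait
                not_empty_.notify_all();
                not_full_.wait(lock, [this] { return count_ < buffer_.size(); });
            }
            buffer_[(head_ + count_) % buffer_.size()] = elements[i];
            ++count_;
        }
        not_empty_.notify_all();
    }

    void write_end() {
        std::lock_guard<std::mutex> lock(mutex_);
        ended_ = true;
        not_empty_.notify_all();  // release readers blocked on an empty lane
    }

    bool read(T& element) { return read(&element, 1) == 1; }

    // Returns n, or fewer only if write_end() was called and the lane ran empty.
    std::size_t read(T* elements, std::size_t n) {
        std::unique_lock<std::mutex> lock(mutex_);
        std::size_t done = 0;
        while (done < n) {
            if (count_ == 0) {                // empty: wake writers, then wait
                not_full_.notify_all();
                not_empty_.wait(lock, [this] { return count_ > 0 || ended_; });
                if (count_ == 0) break;       // ended and fully drained
            }
            elements[done++] = buffer_[head_];
            head_ = (head_ + 1) % buffer_.size();
            --count_;
        }
        not_full_.notify_all();
        return done;
    }

    std::size_t size() const { std::lock_guard<std::mutex> lock(mutex_); return count_; }
    bool empty() const { return size() == 0; }

private:
    std::vector<T> buffer_;
    std::size_t head_ = 0, count_ = 0;
    bool ended_ = false;
    mutable std::mutex mutex_;
    std::condition_variable not_full_, not_empty_;
};

// Demo: one producer, two consumers (single-element and batches of 7).
long long run_lane_demo() {
    lane<int> work(16);
    long long single_sum = 0, batch_sum = 0;
    std::thread single([&] {
        int x;
        while (work.read(x)) single_sum += x;
    });
    std::thread batched([&] {
        int buf[7];
        std::size_t got;
        while ((got = work.read(buf, 7)) > 0)
            for (std::size_t i = 0; i < got; ++i) batch_sum += buf[i];
    });
    std::vector<int> data(1000);
    for (int i = 0; i < 1000; ++i) data[i] = i + 1;
    for (int i = 0; i < 1000; i += 100) work.write(data.data() + i, 100);
    work.write_end();  // both consumers finish once the lane is drained
    single.join();
    batched.join();
    assert(work.empty());
    return single_sum + batch_sum;
}
```

Because write_end() only sets a flag and wakes readers, it could also be called from a thread other than the producer (e.g. a user-interface thread), provided no further writes follow; this sketch does not interrupt a producer that is still writing.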
With such a class, setting up an efficient multi-threaded
producer/consumer scenario that can easily be ended by either the
producer or yet another separate (e.g., user interface) thread is
really easy. As said, especially in signal processing applications (which is why
it is somewhat low-level), this kind of class is very nice to have.
I guess there are many flavours of such a 'lane', e.g. with deques or
circular_buffers, with or without owning the data, with or without bounding
the producer, and with or without efficient methods for pushing large amounts of data.
In my case, the above interface has turned out to be the solution a few
times, but I don't know how common it really is. So I'm curious whether this
problem arises frequently, whether it has already been solved, and/or whether there would
be interest in such a class.
Regards,
André Offringa