Boost logo

Boost Users :

Subject: [Boost-users] [asio] suitable chunked circular buffer implementation?
From: Steve Lorimer (steve.lorimer_at_[hidden])
Date: 2010-07-20 12:29:48


Hi folks

I'm trying (in vain so far) to find a suitable circular_buffer that allows
chunked reads/writes which I can use in boost asio.

This is my use case:
I have 2 threads, Thread 1 reads data from a
socket (boost::asio::tp::tcp::socket) and writes it into a buffer
(boost::asio::mutable_buffer)
Thread 2 reads the buffer and parses out individual messages.
Thread 2 shouldn't care about what Thread 1 is doing, specifically whether
or not it has completed (or is in the process of) more writes than what
Thread 2 has read.
All Thread 2 should receive is a small packet of information (the interface
being perhaps only iterator begin() and size_t size() ) relating to the
chunk of data that was written into the buffer. Reading from the iterator
should be safe (ie: it should wrap when it reaches the end of the buffer)
Writing into the buffer shouldn't invalidate the iterators that were created
previously (which were passed to the 2nd thread)

I have an implementation from before I started using boost::asio, which is
below. It's quite clearly not standard compliant, but it worked for me.

Is there anything out there that works like I've described?
Is there an alternative method which I should pursue rather than "chunked
circular buffers"?
Will I be able to create something like I've pasted below successfully to
work with boost asio, or are there issues I'm unaware of? (The reason I ask
this question is because I find it surprising that something like this
doesn't exist already)

Any and all help greatly appreciated.

TIA
Steve

#ifndef _ring_buf_h_
#define _ring_buf_h_

#include <errno.h>
#include <boost/thread/mutex.hpp>

namespace ring {

typedef unsigned char byte;
static const uint32_t RING_SIZE = 2 * 1024 * 1024; // 2 meg

class iterator
{
private:
    const byte* buf;
    int cur;
    size_t buf_size;

public:
    iterator() : buf(), cur(), buf_size() {}
    iterator(const byte* _buf, int _cur, size_t _buf_size) : buf(_buf),
cur(_cur), buf_size(_buf_size) {}

    const byte& operator*() const { return buf[cur % RING_SIZE]; }
    const byte& operator[](int n) const { return buf[(cur + n) % RING_SIZE];
}

    iterator& operator+=(int n) { cur += n; buf_size -= n; return *this; }
    iterator& operator++() { ++cur; --buf_size; return *this; }
             // post-inc
    iterator operator++(int) { iterator tmp(*this); ++cur; --buf_size;
return tmp; } // pre-inc

    bool operator==(iterator b) const { return cur == b.cur; }
    bool operator!=(iterator b) const { return cur != b.cur; }

    size_t size() const { return buf_size; }
    int offset() const { return cur; }
};
//----------------------------------------------------------------

inline iterator operator+(iterator a, int b) { return a += b; }
inline iterator operator+(int b, iterator a) { return a += b; }

class buffer
{
    mutable boost::mutex mtx;

    byte buf[RING_SIZE];
    unsigned beg;
    unsigned end; // may wrap, wrapping is blessed for unsigned types only
public:
    buffer() : beg(), end()
    {
        memset(buf, 0, RING_SIZE);
    }
    // create an iterator object whose size = unread data in buf
    iterator data() const
    {
        boost::mutex::scoped_lock l(mtx);
        return iterator(buf, beg % RING_SIZE, end - beg);
    }
    // free space in ring
    unsigned size() const
    {
        boost::mutex::scoped_lock l(mtx);
        return end - beg;
    }
    // frees up space in the ring (called once data has been read)
    void adv(unsigned n)
    {
        boost::mutex::scoped_lock l(mtx);
        beg += n;
    }
    // read data from a file descriptor into the buffer
    // returns:
    // -errno for error
    // 0 for EOF
    // > 0 for successful reads
    int read(int fd)
    {
        mtx.lock();
        unsigned free = RING_SIZE - (end - beg);
        if (!free)
        {
            mtx.unlock();
            return -ENOMEM; // must never get here
        }

        unsigned free_end = std::min(free, RING_SIZE - end % RING_SIZE);
        unsigned free_beg = free - free_end;
        iovec io[] = { { (char*)buf + end % RING_SIZE, free_end }, {
(char*)buf, free_beg } };

        mtx.unlock();

        ssize_t n = readv(fd, io, sizeof io / sizeof *io);
        if (n == -1)
            return -errno;

        mtx.lock();
        end += n;

        mtx.unlock();
        return n;
    }
};
//----------------------------------------------------------------

} // namespace ring

#endif



Boost-users list run by williamkempf at hotmail.com, kalb at libertysoft.com, bjorn.karlsson at readsoft.com, gregod at cs.rpi.edu, wekempf at cox.net