Boost Users :

Subject: Re: [Boost-users] boost::asio::io_service and posix message queues
From: Sergio Martinez (sergiom_at_[hidden])
Date: 2009-08-28 17:34:03


I didn't get a response to my email, but I ended up figuring out how
to use POSIX message queues with boost::asio::io_service. Under
Linux, message queue descriptors are file descriptors, so they work
with select/epoll. I used boost::asio::posix::stream_descriptor to
wrap the message queue descriptor. You can pass
boost::asio::null_buffers() to the async_read_some() and
async_write_some() calls. These calls normally read or write data,
but with null_buffers() they transfer nothing. Instead, you get a
callback when select/epoll indicates that the message queue can be
read from or written to. In the callback, you do the actual
mq_send() or mq_receive() call.
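
A minimal sketch of the select/epoll point, assuming Linux, where an
mqd_t is a file descriptor (POSIX itself does not guarantee this). It
uses plain poll() instead of asio so that it stands alone. The queue
name /poll_demo_queue and the helpers queueReadable and pollDemo are
made up for illustration.

```cpp
#include <cstddef>
#include <fcntl.h>
#include <mqueue.h>
#include <poll.h>
#include <sys/stat.h>

// Returns true if poll() reports the queue readable within timeoutMs.
// Linux-specific: passes the mqd_t to poll() as a file descriptor.
bool queueReadable(mqd_t mq, int timeoutMs)
{
    struct pollfd pfd;
    pfd.fd = mq;
    pfd.events = POLLIN;
    pfd.revents = 0;
    return poll(&pfd, 1, timeoutMs) == 1 && (pfd.revents & POLLIN) != 0;
}

// Returns 0 if readiness tracked the queue contents as expected,
// 1 if it did not, and -1 if the queue could not be created.
int pollDemo()
{
    const char* name = "/poll_demo_queue";  // hypothetical queue name
    mq_unlink(name);

    struct mq_attr attr = {};
    attr.mq_maxmsg = 4;
    attr.mq_msgsize = 16;
    mqd_t mq = mq_open(name, O_CREAT | O_RDWR | O_NONBLOCK,
                       S_IRUSR | S_IWUSR, &attr);
    if (mq == (mqd_t)-1)
        return -1;

    // An empty queue should not be reported as readable.
    bool readableWhenEmpty = queueReadable(mq, 0);

    // After one send, poll() should report the queue readable.
    const char payload[] = "ping";
    int sendRet = mq_send(mq, payload, sizeof(payload), 0);
    bool readableAfterSend = queueReadable(mq, 100);

    // The receive buffer must be at least mq_msgsize bytes.
    char buf[16];
    ssize_t receiveRet = mq_receive(mq, buf, sizeof(buf), NULL);

    mq_close(mq);
    mq_unlink(name);

    bool ok = !readableWhenEmpty && sendRet == 0 && readableAfterSend
              && receiveRet == (ssize_t)sizeof(payload);
    return ok ? 0 : 1;
}
```

Calling pollDemo() from main() and printing the result is enough to
try it. Older glibc versions need -lrt at link time.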

I've written some test code and included it below. There is one
message queue, a reader thread, and a writer thread. The writer
thread uses a timer to send a message through the message queue every
100 ms. The reader thread wakes up immediately to process the
messages.

-- Sergio

#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>    // boost::bind is used for the handlers
#include <boost/thread.hpp>

#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>
using namespace std;

const std::string MSG_QUEUE_NAME = "/test_msg_queue";

struct message
{
    unsigned char buffer[128];
};

class Writer
{
public:
    Writer(boost::asio::io_service& ioService)
        : ioService(ioService),
          streamDescriptor(ioService),
          timer(ioService)
    {
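        // mq_open() only reads mq_maxmsg and mq_msgsize; zero the rest.
        struct mq_attr mattr = {};
        mattr.mq_maxmsg = 10;
        mattr.mq_msgsize = sizeof(struct message);
        mqid = mq_open(MSG_QUEUE_NAME.c_str(),
                       O_CREAT | O_WRONLY | O_NONBLOCK,
                       S_IRUSR | S_IWUSR, &mattr);
        cout << "writer mqid = " << mqid << endl;
        // On Linux the mqd_t is a file descriptor, so asio can watch it.
        streamDescriptor.assign(mqid);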

        streamDescriptor.async_write_some(
            boost::asio::null_buffers(),
            boost::bind(&Writer::handleWrite,
              this,
              boost::asio::placeholders::error));
    }

    // Called when the queue has room for a message; waits 100 ms
    // before sending.
    void handleWrite(const boost::system::error_code& ec)
    {
        timer.expires_from_now(boost::posix_time::milliseconds(100));
        timer.async_wait(boost::bind(&Writer::handleTimer, this,
                         boost::asio::placeholders::error));
    }

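    // Called when the timer expires; sends one message and waits for
    // the queue to become writable again.
    void handleTimer(const boost::system::error_code& ec)
    {
        message msg = {};  // zero-filled payload
        int sendRet = mq_send(mqid, (const char*)&msg, sizeof(message), 0);
        cout << "sendRet = " << sendRet << endl;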

        streamDescriptor.async_write_some(
            boost::asio::null_buffers(),
            boost::bind(&Writer::handleWrite,
              this,
              boost::asio::placeholders::error));
    }

private:
    boost::asio::io_service& ioService;
    boost::asio::posix::stream_descriptor streamDescriptor;
    boost::asio::deadline_timer timer;
    mqd_t mqid;
};

void writerThread()
{
    try
    {
        boost::asio::io_service ioService;
        Writer writer(ioService);
        ioService.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
}

class Reader
{
public:
    Reader(boost::asio::io_service& ioService)
        : ioService(ioService),
          streamDescriptor(ioService)
    {
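        // mq_open() only reads mq_maxmsg and mq_msgsize; zero the rest.
        struct mq_attr mattr = {};
        mattr.mq_maxmsg = 10;
        mattr.mq_msgsize = sizeof(struct message);
        mqid = mq_open(MSG_QUEUE_NAME.c_str(),
                       O_CREAT | O_RDONLY | O_NONBLOCK,
                       S_IRUSR | S_IWUSR, &mattr);
        cout << "reader mqid = " << mqid << endl;
        // On Linux the mqd_t is a file descriptor, so asio can watch it.
        streamDescriptor.assign(mqid);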

        streamDescriptor.async_read_some(
            boost::asio::null_buffers(),
            boost::bind(&Reader::handleRead,
              this,
              boost::asio::placeholders::error));
    }

    // Called when the queue has at least one message to receive.
    void handleRead(const boost::system::error_code& ec)
    {
        unsigned int pri;
        message msg;
        ssize_t receiveRet = mq_receive(mqid, (char *) &msg, sizeof(msg), &pri);
        cout << "receiveRet = " << receiveRet << endl;

        streamDescriptor.async_read_some(
            boost::asio::null_buffers(),
            boost::bind(&Reader::handleRead,
              this,
              boost::asio::placeholders::error));
    }

private:
    boost::asio::io_service& ioService;
    boost::asio::posix::stream_descriptor streamDescriptor;
    mqd_t mqid;
};

void readerThread()
{
    try
    {
        boost::asio::io_service ioService;
        Reader reader(ioService);
        ioService.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
}

int main(int argc, char* argv[])
{
    mq_unlink(MSG_QUEUE_NAME.c_str());
    boost::thread writer(writerThread);
    boost::thread reader(readerThread);
    writer.join();
    reader.join();

    return 0;
}
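
The Reader above takes one message per callback and then re-arms the
wait. One possible variation, shown here as a sketch and not as part
of the tested program, is to drain the queue inside the callback
until mq_receive() fails with EAGAIN. drainQueue is a hypothetical
helper. It assumes the queue was opened with O_NONBLOCK; on a
blocking queue the last mq_receive() would block instead of
returning.

```cpp
#include <cerrno>
#include <cstddef>
#include <string>
#include <vector>
#include <mqueue.h>

// Receives every pending message from a non-blocking queue and appends
// each one to out. Returns the number of messages received, or -1 on
// an error other than "queue empty".
int drainQueue(mqd_t mq, std::vector<std::string>& out)
{
    // The receive buffer must be at least mq_msgsize bytes long.
    struct mq_attr attr;
    if (mq_getattr(mq, &attr) == -1)
        return -1;
    std::vector<char> buf(attr.mq_msgsize);

    int count = 0;
    for (;;)
    {
        ssize_t n = mq_receive(mq, &buf[0], buf.size(), NULL);
        if (n >= 0)
        {
            out.push_back(std::string(&buf[0], n));
            ++count;
        }
        else if (errno == EAGAIN)
        {
            return count;   // queue is empty
        }
        else if (errno != EINTR)
        {
            return -1;      // unexpected error
        }
    }
}
```

In handleRead() this would replace the single mq_receive() call,
with the async_read_some() re-arm left as it is.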

On Tue, Aug 25, 2009 at 8:52 AM, Sergio Martinez<sergiom_at_[hidden]> wrote:
> I'm working on a project where we want a single thread to
> asynchronously interact with posix message queues, TCP sockets and
> drivers.  From what I've read, boost::asio::io_service provides a
> common framework dealing with asynchronous IO.  It looks like there's
> a asio library for TCP and posix::stream_descriptor could be used for
> interacting with a driver file descriptor.  What I don't see is
> support for posix message queues.
>
> The io_service documentation states the following:  "The io_service
> class also includes facilities intended for developers of custom
> asynchronous services.".
>
> I was thinking of adding a custom service for posix message queues.
> Has anyone already done this?  Is there any fundamental reason why it
> could not be done?  (i.e. message queues are not byte streams, etc.)
>
> Thanks,
> Sergio
>

