Subject: Re: [Boost-users] boost::asio::io_service and posix message queues
From: Sergio Martinez (sergiom_at_[hidden])
Date: 2009-08-28 17:34:03
I didn't get a response to my email, but I ended up figuring out how
to use POSIX message queues with boost::asio::io_service. Under
Linux, a message queue descriptor is a file descriptor, so it can be
monitored with select/epoll. I used
boost::asio::posix::stream_descriptor to wrap the message queue
descriptor. You can pass boost::asio::null_buffers() to the
async_read_some() and async_write_some() calls. Normally these calls
transfer data, but with null_buffers() they transfer nothing. You
just get a callback when select/epoll indicates that the message
queue can be read from or written to. In the callbacks, you make the
actual mq_receive() and mq_send() calls yourself.
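The same "wait for readiness, then receive" idea can be sketched without Asio at all, which may help when checking that a queue is pollable in the first place. This is only a minimal sketch, assuming Linux (where an mqd_t is an ordinary pollable file descriptor) and a queue opened with O_NONBLOCK. The helper names waitReadable and drainQueue are hypothetical, not part of Boost or POSIX.

```cpp
#include <cerrno>
#include <cstddef>
#include <vector>
#include <fcntl.h>
#include <mqueue.h>
#include <poll.h>
#include <sys/stat.h>

// Hypothetical helper: wait until the queue is readable or timeoutMs elapses.
// Linux-specific assumption: mqd_t is a file descriptor that poll() accepts.
bool waitReadable(mqd_t mq, int timeoutMs)
{
    pollfd pfd = {};
    pfd.fd = mq;
    pfd.events = POLLIN;
    return poll(&pfd, 1, timeoutMs) == 1 && (pfd.revents & POLLIN) != 0;
}

// Hypothetical helper: receive until the non-blocking queue is empty.
// msgSize must be at least the queue's mq_msgsize, otherwise mq_receive()
// fails with EMSGSIZE. Returns the number of messages received.
int drainQueue(mqd_t mq, std::size_t msgSize)
{
    std::vector<char> buf(msgSize);
    int count = 0;
    for (;;) {
        unsigned int priority = 0;
        ssize_t n = mq_receive(mq, buf.data(), buf.size(), &priority);
        if (n >= 0) {
            ++count;          // one whole message is now in buf[0..n)
            continue;
        }
        if (errno == EINTR) {
            continue;         // interrupted by a signal, retry
        }
        break;                // EAGAIN means empty; stop on any other error too
    }
    return count;
}
```

Draining in a loop, rather than doing a single mq_receive() per wakeup, is a design choice: it avoids one trip through the event loop per message when several are already queued. On older glibc versions this needs to be linked with -lrt.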
I've written some test code and included it below. There is one
message queue, a reader thread, and a writer thread. The writer
thread uses a timer to send a message through the message queue every
100 ms. The reader thread wakes up as soon as a message arrives and
processes it.
-- Sergio
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include <fcntl.h>
#include <sys/stat.h>
#include <mqueue.h>

using namespace std;

const std::string MSG_QUEUE_NAME = "/test_msg_queue";

// Fixed-size payload carried by each queue message.
struct message
{
    unsigned char buffer[128];
};
class Writer
{
public:
    Writer(boost::asio::io_service& ioService)
        : ioService(ioService),
          streamDescriptor(ioService),
          timer(ioService)
    {
        // mq_open() only uses mq_maxmsg and mq_msgsize; zero the rest.
        struct mq_attr mattr = {};
        mattr.mq_maxmsg = 10;
        mattr.mq_msgsize = sizeof(struct message);
        mqid = mq_open(MSG_QUEUE_NAME.c_str(), O_CREAT | O_WRONLY | O_NONBLOCK,
                       S_IREAD | S_IWRITE, &mattr);
        cout << "writer mqid = " << mqid << endl;
        // assign() throws if mq_open() failed and mqid is -1.
        streamDescriptor.assign(mqid);
        // null_buffers(): no data is written, we only wait for writability.
        streamDescriptor.async_write_some(
            boost::asio::null_buffers(),
            boost::bind(&Writer::handleWrite,
                        this,
                        boost::asio::placeholders::error));
    }

    // Called when the queue is writable; waits 100 ms before sending.
    void handleWrite(const boost::system::error_code& ec)
    {
        if (ec) return;
        timer.expires_from_now(boost::posix_time::milliseconds(100));
        timer.async_wait(boost::bind(&Writer::handleTimer, this,
                                     boost::asio::placeholders::error));
    }

    // Called when the timer expires; sends one message and re-arms the wait.
    void handleTimer(const boost::system::error_code& ec)
    {
        if (ec) return;
        message msg = {};  // zero-filled payload
        int sendRet = mq_send(mqid, (const char*)&msg, sizeof(message), 0);
        cout << "sendRet = " << sendRet << endl;
        streamDescriptor.async_write_some(
            boost::asio::null_buffers(),
            boost::bind(&Writer::handleWrite,
                        this,
                        boost::asio::placeholders::error));
    }

private:
    boost::asio::io_service& ioService;
    boost::asio::posix::stream_descriptor streamDescriptor;
    boost::asio::deadline_timer timer;
    mqd_t mqid;
};

void writerThread()
{
    try
    {
        boost::asio::io_service ioService;
        Writer writer(ioService);
        ioService.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
}
class Reader
{
public:
    Reader(boost::asio::io_service& ioService)
        : ioService(ioService),
          streamDescriptor(ioService)
    {
        // mq_open() only uses mq_maxmsg and mq_msgsize; zero the rest.
        struct mq_attr mattr = {};
        mattr.mq_maxmsg = 10;
        mattr.mq_msgsize = sizeof(struct message);
        mqid = mq_open(MSG_QUEUE_NAME.c_str(), O_CREAT | O_RDONLY | O_NONBLOCK,
                       S_IREAD | S_IWRITE, &mattr);
        cout << "reader mqid = " << mqid << endl;
        // assign() throws if mq_open() failed and mqid is -1.
        streamDescriptor.assign(mqid);
        // null_buffers(): no data is read, we only wait for readability.
        streamDescriptor.async_read_some(
            boost::asio::null_buffers(),
            boost::bind(&Reader::handleRead,
                        this,
                        boost::asio::placeholders::error));
    }

    // Called when the queue is readable; receives one message and re-arms.
    void handleRead(const boost::system::error_code& ec)
    {
        if (ec) return;
        unsigned int pri;
        message msg;
        ssize_t receiveRet = mq_receive(mqid, (char*)&msg, sizeof(msg), &pri);
        cout << "receiveRet = " << receiveRet << endl;
        streamDescriptor.async_read_some(
            boost::asio::null_buffers(),
            boost::bind(&Reader::handleRead,
                        this,
                        boost::asio::placeholders::error));
    }

private:
    boost::asio::io_service& ioService;
    boost::asio::posix::stream_descriptor streamDescriptor;
    mqd_t mqid;
};

void readerThread()
{
    try
    {
        boost::asio::io_service ioService;
        Reader reader(ioService);
        ioService.run();
    }
    catch (std::exception& e)
    {
        std::cerr << "Exception: " << e.what() << "\n";
    }
}

int main(int argc, char* argv[])
{
    // Remove any queue left over from a previous run.
    mq_unlink(MSG_QUEUE_NAME.c_str());
    boost::thread writer(writerThread);
    boost::thread reader(readerThread);
    writer.join();
    reader.join();
    return 0;
}

On Tue, Aug 25, 2009 at 8:52 AM, Sergio Martinez<sergiom_at_[hidden]> wrote:
> I'm working on a project where we want a single thread to
> asynchronously interact with posix message queues, TCP sockets and
> drivers. From what I've read, boost::asio::io_service provides a
> common framework for dealing with asynchronous IO. It looks like
> there's an asio library for TCP, and posix::stream_descriptor could be used for
> interacting with a driver file descriptor. What I don't see is
> support for posix message queues.
>
> The io_service documentation states the following: "The io_service
> class also includes facilities intended for developers of custom
> asynchronous services.".
>
> I was thinking of adding a custom service for posix message queues.
> Has anyone already done this? Is there any fundamental reason why it
> could not be done? (i.e. message queues are not byte streams, etc.)
>
> Thanks,
> Sergio
>