|
Boost : |
From: Ion Gaztañaga (igaztanaga_at_[hidden])
Date: 2007-09-28 07:41:13
Peter Dimov wrote:
> Ion Gaztañaga wrote:
>
>> Does this support building those objects in mapped files, rebooting
>> the system, mapping the file again again and continue working?
>
> Is this guaranteed by POSIX? Which specific cases need to be supported? How
> would a condition that is rebooted in the middle of four threads waiting and
> one thread somewhere inside notify_one be able to resume operation?
At least in the classic "UNIX Network Programming, Volume 2, Second
Edition: Interprocess Communications" (W. Richard Stevens) there is a
chapter explaining POSIX message queues and an implementation using
Memory mapped I/O and posix mutexes and condition variables.
Whether this is officially supported by POSIX, I thought it was, but I
can't find an explicit comment on this. Is there any POSIX expert out
there that can solve this? It seems that Solaris + Linux and other
UNIXes support it without problems.
Book TOC excerpt taken from http://www.kohala.com/start/unpv22e/unpv22e.html
Chapter 5.Posix Message Queues 75
5.1 Introduction 75
5.2 mq_open, mq_close, and mq_unlink Functions 76
5.3 mq_getattr and mq_setattr Functions 79
5.4 mq_send and mq_receive Functions 82
5.5 Message Queue Limits 86
5.6 mq_notify Function 87
5.7 Posix Realtime Signals 98
5.8 Implementation Using Memory-Mapped I/O 106
5.9 Summary 126
You can download the source code of the book (including the example of
building mq_xxx using mmapped files and posix cond/mutexes) in that page.
I've attached the code from that book for "mq_open" using pthread
mutexes/conditions + memory mapped files.
Regards,
Ion
/* include mq_open1 */
#include "unpipc.h"
#include "mqueue.h"
#include <stdarg.h>
#define MAX_TRIES 10 /* for waiting for initialization */
struct mymq_attr defattr = { 0, 128, 1024, 0 };
mymqd_t
mymq_open(const char *pathname, int oflag, ...)
{
int i, fd, nonblock, created, save_errno;
long msgsize, filesize, index;
va_list ap;
mode_t mode;
int8_t *mptr;
struct stat statbuff;
struct mymq_hdr *mqhdr;
struct mymsg_hdr *msghdr;
struct mymq_attr *attr;
struct mymq_info *mqinfo;
pthread_mutexattr_t mattr;
pthread_condattr_t cattr;
created = 0;
nonblock = oflag & O_NONBLOCK;
oflag &= ~O_NONBLOCK;
mptr = (int8_t *) MAP_FAILED;
mqinfo = NULL;
again:
if (oflag & O_CREAT) {
va_start(ap, oflag); /* init ap to final named argument */
mode = va_arg(ap, va_mode_t) & ~S_IXUSR;
attr = va_arg(ap, struct mymq_attr *);
va_end(ap);
/* 4open and specify O_EXCL and user-execute */
fd = open(pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
if (fd < 0) {
if (errno == EEXIST && (oflag & O_EXCL) == 0)
goto exists; /* already exists, OK */
else
return((mymqd_t) -1);
}
created = 1;
/* 4first one to create the file initializes it */
if (attr == NULL)
attr = &defattr;
else {
if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
errno = EINVAL;
goto err;
}
}
/* end mq_open1 */
/* include mq_open2 */
/* 4calculate and set the file size */
msgsize = MSGSIZE(attr->mq_msgsize);
filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *
(sizeof(struct mymsg_hdr) + msgsize));
if (lseek(fd, filesize - 1, SEEK_SET) == -1)
goto err;
if (write(fd, "", 1) == -1)
goto err;
/* 4memory map the file */
mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (mptr == MAP_FAILED)
goto err;
/* 4allocate one mymq_info{} for the queue */
/* *INDENT-OFF* */
if ( (mqinfo = malloc(sizeof(struct mymq_info))) == NULL)
goto err;
/* *INDENT-ON* */
mqinfo->mqi_hdr = mqhdr = (struct mymq_hdr *) mptr;
mqinfo->mqi_magic = MQI_MAGIC;
mqinfo->mqi_flags = nonblock;
/* 4initialize header at beginning of file */
/* 4create free list with all messages on it */
mqhdr->mqh_attr.mq_flags = 0;
mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
mqhdr->mqh_attr.mq_curmsgs = 0;
mqhdr->mqh_nwait = 0;
mqhdr->mqh_pid = 0;
mqhdr->mqh_head = 0;
index = sizeof(struct mymq_hdr);
mqhdr->mqh_free = index;
for (i = 0; i < attr->mq_maxmsg - 1; i++) {
msghdr = (struct mymsg_hdr *) &mptr[index];
index += sizeof(struct mymsg_hdr) + msgsize;
msghdr->msg_next = index;
}
msghdr = (struct mymsg_hdr *) &mptr[index];
msghdr->msg_next = 0; /* end of free list */
/* 4initialize mutex & condition variable */
if ( (i = pthread_mutexattr_init(&mattr)) != 0)
goto pthreaderr;
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
pthread_mutexattr_destroy(&mattr); /* be sure to destroy */
if (i != 0)
goto pthreaderr;
if ( (i = pthread_condattr_init(&cattr)) != 0)
goto pthreaderr;
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
pthread_condattr_destroy(&cattr); /* be sure to destroy */
if (i != 0)
goto pthreaderr;
/* 4initialization complete, turn off user-execute bit */
if (fchmod(fd, mode) == -1)
goto err;
close(fd);
return((mymqd_t) mqinfo);
}
/* end mq_open2 */
/* include mq_open3 */
exists:
/* 4open the file then memory map */
if ( (fd = open(pathname, O_RDWR)) < 0) {
if (errno == ENOENT && (oflag & O_CREAT))
goto again;
goto err;
}
/* 4make certain initialization is complete */
for (i = 0; i < MAX_TRIES; i++) {
if (stat(pathname, &statbuff) == -1) {
if (errno == ENOENT && (oflag & O_CREAT)) {
close(fd);
goto again;
}
goto err;
}
if ((statbuff.st_mode & S_IXUSR) == 0)
break;
sleep(1);
}
if (i == MAX_TRIES) {
errno = ETIMEDOUT;
goto err;
}
filesize = statbuff.st_size;
mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (mptr == MAP_FAILED)
goto err;
close(fd);
/* 4allocate one mymq_info{} for each open */
/* *INDENT-OFF* */
if ( (mqinfo = malloc(sizeof(struct mymq_info))) == NULL)
goto err;
/* *INDENT-ON* */
mqinfo->mqi_hdr = (struct mymq_hdr *) mptr;
mqinfo->mqi_magic = MQI_MAGIC;
mqinfo->mqi_flags = nonblock;
return((mymqd_t) mqinfo);
/* $$.bp$$ */
pthreaderr:
errno = i;
err:
/* 4don't let following function calls change errno */
save_errno = errno;
if (created)
unlink(pathname);
if (mptr != MAP_FAILED)
munmap(mptr, filesize);
if (mqinfo != NULL)
free(mqinfo);
close(fd);
errno = save_errno;
return((mymqd_t) -1);
}
/* end mq_open3 */
mymqd_t
Mymq_open(const char *pathname, int oflag, ...)
{
mymqd_t mqd;
va_list ap;
mode_t mode;
struct mymq_attr *attr;
if (oflag & O_CREAT) {
va_start(ap, oflag); /* init ap to final named argument */
mode = va_arg(ap, va_mode_t);
attr = va_arg(ap, struct mymq_attr *);
if ( (mqd = mymq_open(pathname, oflag, mode, attr)) == (mymqd_t) -1)
err_sys("mymq_open error for %s", pathname);
va_end(ap);
} else {
if ( (mqd = mymq_open(pathname, oflag)) == (mymqd_t) -1)
err_sys("mymq_open error for %s", pathname);
}
return(mqd);
}
Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk