Boost logo

Boost :

From: Ion Gaztañaga (igaztanaga_at_[hidden])
Date: 2007-09-28 07:41:13


Peter Dimov wrote:
> Ion Gaztañaga wrote:
>
>> Does this support building those objects in mapped files, rebooting
>> the system, mapping the file again again and continue working?
>
> Is this guaranteed by POSIX? Which specific cases need to be supported? How
> would a condition that is rebooted in the middle of four threads waiting and
> one thread somewhere inside notify_one be able to resume operation?

At least in the classic "UNIX Network Programming, Volume 2, Second
Edition: Interprocess Communications" (W. Richard Stevens) there is a
chapter explaining POSIX message queues and an implementation using
Memory mapped I/O and posix mutexes and condition variables.

Whether this is officially supported by POSIX, I thought it was, but I
can't find an explicit comment on this. Is there any POSIX expert out
there that can solve this? It seems that Solaris + Linux and other
UNIXes support it without problems.

Book TOC excerpt taken from http://www.kohala.com/start/unpv22e/unpv22e.html

Chapter 5.Posix Message Queues 75
     5.1 Introduction 75
     5.2 mq_open, mq_close, and mq_unlink Functions 76
     5.3 mq_getattr and mq_setattr Functions 79
     5.4 mq_send and mq_receive Functions 82
     5.5 Message Queue Limits 86
     5.6 mq_notify Function 87
     5.7 Posix Realtime Signals 98
     5.8 Implementation Using Memory-Mapped I/O 106
     5.9 Summary 126

You can download the source code of the book (including the example of
building mq_xxx using mmapped files and posix cond/mutexes) in that page.

I've attached the code from that book for "mq_open" using pthread
mutexes/conditions + memory mapped files.

Regards,

Ion


/* include mq_open1 */
#include "unpipc.h"
#include "mqueue.h"

#include <stdarg.h>
#define MAX_TRIES 10 /* for waiting for initialization */

struct mymq_attr defattr = { 0, 128, 1024, 0 };

mymqd_t
mymq_open(const char *pathname, int oflag, ...)
{
        int i, fd, nonblock, created, save_errno;
        long msgsize, filesize, index;
        va_list ap;
        mode_t mode;
        int8_t *mptr;
        struct stat statbuff;
        struct mymq_hdr *mqhdr;
        struct mymsg_hdr *msghdr;
        struct mymq_attr *attr;
        struct mymq_info *mqinfo;
        pthread_mutexattr_t mattr;
        pthread_condattr_t cattr;

        created = 0;
        nonblock = oflag & O_NONBLOCK;
        oflag &= ~O_NONBLOCK;
        mptr = (int8_t *) MAP_FAILED;
        mqinfo = NULL;
again:
        if (oflag & O_CREAT) {
                va_start(ap, oflag); /* init ap to final named argument */
                mode = va_arg(ap, va_mode_t) & ~S_IXUSR;
                attr = va_arg(ap, struct mymq_attr *);
                va_end(ap);

                        /* 4open and specify O_EXCL and user-execute */
                fd = open(pathname, oflag | O_EXCL | O_RDWR, mode | S_IXUSR);
                if (fd < 0) {
                        if (errno == EEXIST && (oflag & O_EXCL) == 0)
                                goto exists; /* already exists, OK */
                        else
                                return((mymqd_t) -1);
                }
                created = 1;
                        /* 4first one to create the file initializes it */
                if (attr == NULL)
                        attr = &defattr;
                else {
                        if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
                                errno = EINVAL;
                                goto err;
                        }
                }
/* end mq_open1 */
/* include mq_open2 */
                        /* 4calculate and set the file size */
                msgsize = MSGSIZE(attr->mq_msgsize);
                filesize = sizeof(struct mymq_hdr) + (attr->mq_maxmsg *
                                   (sizeof(struct mymsg_hdr) + msgsize));
                if (lseek(fd, filesize - 1, SEEK_SET) == -1)
                        goto err;
                if (write(fd, "", 1) == -1)
                        goto err;

                        /* 4memory map the file */
                mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE,
                                        MAP_SHARED, fd, 0);
                if (mptr == MAP_FAILED)
                        goto err;

                        /* 4allocate one mymq_info{} for the queue */
/* *INDENT-OFF* */
                if ( (mqinfo = malloc(sizeof(struct mymq_info))) == NULL)
                        goto err;
/* *INDENT-ON* */
                mqinfo->mqi_hdr = mqhdr = (struct mymq_hdr *) mptr;
                mqinfo->mqi_magic = MQI_MAGIC;
                mqinfo->mqi_flags = nonblock;

                        /* 4initialize header at beginning of file */
                        /* 4create free list with all messages on it */
                mqhdr->mqh_attr.mq_flags = 0;
                mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
                mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;
                mqhdr->mqh_attr.mq_curmsgs = 0;
                mqhdr->mqh_nwait = 0;
                mqhdr->mqh_pid = 0;
                mqhdr->mqh_head = 0;
                index = sizeof(struct mymq_hdr);
                mqhdr->mqh_free = index;
                for (i = 0; i < attr->mq_maxmsg - 1; i++) {
                        msghdr = (struct mymsg_hdr *) &mptr[index];
                        index += sizeof(struct mymsg_hdr) + msgsize;
                        msghdr->msg_next = index;
                }
                msghdr = (struct mymsg_hdr *) &mptr[index];
                msghdr->msg_next = 0; /* end of free list */

                        /* 4initialize mutex & condition variable */
                if ( (i = pthread_mutexattr_init(&mattr)) != 0)
                        goto pthreaderr;
                pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
                i = pthread_mutex_init(&mqhdr->mqh_lock, &mattr);
                pthread_mutexattr_destroy(&mattr); /* be sure to destroy */
                if (i != 0)
                        goto pthreaderr;

                if ( (i = pthread_condattr_init(&cattr)) != 0)
                        goto pthreaderr;
                pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
                i = pthread_cond_init(&mqhdr->mqh_wait, &cattr);
                pthread_condattr_destroy(&cattr); /* be sure to destroy */
                if (i != 0)
                        goto pthreaderr;

                        /* 4initialization complete, turn off user-execute bit */
                if (fchmod(fd, mode) == -1)
                        goto err;
                close(fd);
                return((mymqd_t) mqinfo);
        }
/* end mq_open2 */
/* include mq_open3 */
exists:
                /* 4open the file then memory map */
        if ( (fd = open(pathname, O_RDWR)) < 0) {
                if (errno == ENOENT && (oflag & O_CREAT))
                        goto again;
                goto err;
        }

                /* 4make certain initialization is complete */
        for (i = 0; i < MAX_TRIES; i++) {
                if (stat(pathname, &statbuff) == -1) {
                        if (errno == ENOENT && (oflag & O_CREAT)) {
                                close(fd);
                                goto again;
                        }
                        goto err;
                }
                if ((statbuff.st_mode & S_IXUSR) == 0)
                        break;
                sleep(1);
        }
        if (i == MAX_TRIES) {
                errno = ETIMEDOUT;
                goto err;
        }

        filesize = statbuff.st_size;
        mptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
        if (mptr == MAP_FAILED)
                goto err;
        close(fd);

                /* 4allocate one mymq_info{} for each open */
/* *INDENT-OFF* */
        if ( (mqinfo = malloc(sizeof(struct mymq_info))) == NULL)
                goto err;
/* *INDENT-ON* */
        mqinfo->mqi_hdr = (struct mymq_hdr *) mptr;
        mqinfo->mqi_magic = MQI_MAGIC;
        mqinfo->mqi_flags = nonblock;
        return((mymqd_t) mqinfo);
/* $$.bp$$ */
pthreaderr:
        errno = i;
err:
                /* 4don't let following function calls change errno */
        save_errno = errno;
        if (created)
                unlink(pathname);
        if (mptr != MAP_FAILED)
                munmap(mptr, filesize);
        if (mqinfo != NULL)
                free(mqinfo);
        close(fd);
        errno = save_errno;
        return((mymqd_t) -1);
}
/* end mq_open3 */

mymqd_t
Mymq_open(const char *pathname, int oflag, ...)
{
        mymqd_t mqd;
        va_list ap;
        mode_t mode;
        struct mymq_attr *attr;

        if (oflag & O_CREAT) {
                va_start(ap, oflag); /* init ap to final named argument */
                mode = va_arg(ap, va_mode_t);
                attr = va_arg(ap, struct mymq_attr *);
                if ( (mqd = mymq_open(pathname, oflag, mode, attr)) == (mymqd_t) -1)
                        err_sys("mymq_open error for %s", pathname);
                va_end(ap);
        } else {
                if ( (mqd = mymq_open(pathname, oflag)) == (mymqd_t) -1)
                        err_sys("mymq_open error for %s", pathname);
        }
        return(mqd);
}


Boost list run by bdawes at acm.org, gregod at cs.rpi.edu, cpdaniel at pacbell.net, john at johnmaddock.co.uk