From f2df0f7ca6de3f944dcbfb9923750d7c88984535 Mon Sep 17 00:00:00 2001 From: Stathis Kamperis Date: Wed, 7 Oct 2009 20:03:34 +0000 Subject: [PATCH] mqueues: Port POSIX message queues from NetBSD. --- sys/kern/sys_mqueue.c | 1114 +++++++++++++++++++++++++++++++++++++++++ sys/sys/file.h | 1 + sys/sys/mqueue.h | 147 ++++-- sys/sys/proc.h | 1 + sys/sys/types.h | 1 + 5 files changed, 1217 insertions(+), 47 deletions(-) create mode 100644 sys/kern/sys_mqueue.c diff --git a/sys/kern/sys_mqueue.c b/sys/kern/sys_mqueue.c new file mode 100644 index 0000000000..38a7439646 --- /dev/null +++ b/sys/kern/sys_mqueue.c @@ -0,0 +1,1114 @@ +/* $NetBSD: sys_mqueue.c,v 1.16 2009/04/11 23:05:26 christos Exp $ */ + +/* + * Copyright (c) 2007, 2008 Mindaugas Rasiukevicius + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +/* + * Implementation of POSIX message queues. + * Defined in the Base Definitions volume of IEEE Std 1003.1-2001. + * + * Locking + * + * Global list of message queues (mqueue_head) and proc_t::p_mqueue_cnt + * counter are protected by mqlist_mtx lock. The very message queue and + * its members are protected by mqueue::mq_mtx. + * + * Lock order: + * mqlist_mtx + * -> mqueue::mq_mtx + */ + +#include +/*__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.16 2009/04/11 23:05:26 christos Exp $");*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/* System-wide limits. */ +static u_int mq_open_max = MQ_OPEN_MAX; +static u_int mq_prio_max = MQ_PRIO_MAX; +static u_int mq_max_msgsize = 16 * MQ_DEF_MSGSIZE; +static u_int mq_def_maxmsg = 32; + +struct lock mqlist_mtx; +static struct objcache * mqmsg_cache; +static LIST_HEAD(, mqueue) mqueue_head = + LIST_HEAD_INITIALIZER(mqueue_head); + +typedef struct file file_t; /* XXX: Should we put this in sys/types.h ? */ + +/* Function prototypes */ +static int mq_poll_fop(file_t *, int, struct ucred *cred); +static int mq_stat_fop(file_t *, struct stat *, struct ucred *cred); +static int mq_close_fop(file_t *); + +/* Some time-related utility functions */ +static int itimespecfix(struct timespec *ts); +static int tstohz(const struct timespec *ts); + +/* File operations vector */ +static struct fileops mqops = { + .fo_read = badfo_readwrite, + .fo_write = badfo_readwrite, + .fo_ioctl = badfo_ioctl, + .fo_poll = mq_poll_fop, + .fo_stat = mq_stat_fop, + .fo_close = mq_close_fop, + .fo_kqfilter = badfo_kqfilter, + .fo_shutdown = badfo_shutdown +}; + +/* Define a new malloc type for message queues */ +MALLOC_DECLARE(M_MQBUF); +MALLOC_DEFINE(M_MQBUF, "mqueues", "Buffers to message queues"); + +/* Malloc arguments for object cache */ +struct objcache_malloc_args mqueue_malloc_args = { + sizeof(struct mqueue), M_MQBUF }; + +/* Spinlock around the process list */ +extern struct spinlock allproc_spin; + +/* + * Initialize POSIX message queue subsystem. + */ +void +mqueue_sysinit(void) +{ + mqmsg_cache = objcache_create("mqmsg_cache", + 0, /* infinite depot's capacity */ + 0, /* default magazine's capacity */ + NULL, /* constructor */ + NULL, /* deconstructor */ + NULL, + objcache_malloc_alloc, + objcache_malloc_free, + &mqueue_malloc_args); + + lockinit(&mqlist_mtx, "mqlist_mtx", 0, LK_CANRECURSE); +} + +/* + * Free the message. + */ +static void +mqueue_freemsg(struct mq_msg *msg, const size_t size) +{ + + if (size > MQ_DEF_MSGSIZE) + kfree(msg, M_MQBUF); + else + objcache_put(mqmsg_cache, msg); +} + +/* + * Destroy the message queue. + */ +static void +mqueue_destroy(struct mqueue *mq) +{ + struct mq_msg *msg; + size_t msz; + u_int i; + + /* Note MQ_PQSIZE + 1. */ + for (i = 0; i < MQ_PQSIZE + 1; i++) { + while ((msg = TAILQ_FIRST(&mq->mq_head[i])) != NULL) { + TAILQ_REMOVE(&mq->mq_head[i], msg, msg_queue); + msz = sizeof(struct mq_msg) + msg->msg_len; + mqueue_freemsg(msg, msz); + } + } + lockuninit(&mq->mq_mtx); + kfree(mq, M_MQBUF); +} + +/* + * Lookup for file name in general list of message queues. + * => locks the message queue + */ +static void * +mqueue_lookup(char *name) +{ + struct mqueue *mq; + + KKASSERT(lockstatus(&mqlist_mtx, curthread)); + + LIST_FOREACH(mq, &mqueue_head, mq_list) { + if (strncmp(mq->mq_name, name, MQ_NAMELEN) == 0) { + lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); + return mq; + } + } + + return NULL; +} + +/* + * mqueue_get: get the mqueue from the descriptor. + * => locks the message queue, if found. + * => hold a reference on the file descriptor. + */ +static int +mqueue_get(struct lwp *l, mqd_t mqd, file_t **fpr) +{ + struct mqueue *mq; + file_t *fp; + + fp = holdfp(curproc->p_fd, (int)mqd, -1); /* XXX: Why -1 ? */ + if (__predict_false(fp == NULL)) + return EBADF; + + if (__predict_false(fp->f_type != DTYPE_MQUEUE)) { + fdrop(fp); + return EBADF; + } + mq = fp->f_data; + lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); + + *fpr = fp; + return 0; +} + +/* + * mqueue_linear_insert: perform linear insert according to the message + * priority into the reserved queue (MQ_PQRESQ). Reserved queue is a + * sorted list used only when mq_prio_max is increased via sysctl. + */ +static inline void +mqueue_linear_insert(struct mqueue *mq, struct mq_msg *msg) +{ + struct mq_msg *mit; + + TAILQ_FOREACH(mit, &mq->mq_head[MQ_PQRESQ], msg_queue) { + if (msg->msg_prio > mit->msg_prio) + break; + } + if (mit == NULL) { + TAILQ_INSERT_TAIL(&mq->mq_head[MQ_PQRESQ], msg, msg_queue); + } else { + TAILQ_INSERT_BEFORE(mit, msg, msg_queue); + } +} + +/* + * Validate input. + */ +int +itimespecfix(struct timespec *ts) +{ + if (ts->tv_sec < 0 || ts->tv_nsec < 0 || ts->tv_nsec >= 1000000000) + return (EINVAL); + if (ts->tv_sec == 0 && ts->tv_nsec != 0 && ts->tv_nsec < tick * 1000) + ts->tv_nsec = tick * 1000; + return (0); +} + +/* + * Compute number of ticks in the specified amount of time. + */ +int +tstohz(const struct timespec *ts) +{ + struct timeval tv; + + /* + * usec has great enough resolution for hz, so convert to a + * timeval and use tvtohz() above. + */ + TIMESPEC_TO_TIMEVAL(&tv, ts); + return tvtohz_high(&tv); /* XXX Why _high() and not _low() ? */ +} + +/* + * Converter from struct timespec to the ticks. + * Used by mq_timedreceive(), mq_timedsend(). + */ +int +abstimeout2timo(struct timespec *ts, int *timo) +{ + struct timespec tsd; + int error; + + getnanotime(&tsd); + timespecsub(ts, &tsd); + if (ts->tv_sec < 0 || (ts->tv_sec == 0 && ts->tv_nsec <= 0)) { + return ETIMEDOUT; + } + error = itimespecfix(ts); + if (error) { + return error; + } + *timo = tstohz(ts); + KKASSERT(*timo != 0); + + return 0; +} + +static int +mq_stat_fop(file_t *fp, struct stat *st, struct ucred *cred) +{ + struct mqueue *mq = fp->f_data; + + (void)memset(st, 0, sizeof(*st)); + + lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); + st->st_mode = mq->mq_mode; + st->st_uid = mq->mq_euid; + st->st_gid = mq->mq_egid; + st->st_atimespec = mq->mq_atime; + st->st_mtimespec = mq->mq_mtime; + /*st->st_ctimespec = st->st_birthtimespec = mq->mq_btime;*/ + st->st_uid = fp->f_cred->cr_uid; + st->st_gid = fp->f_cred->cr_svgid; + lockmgr(&mq->mq_mtx, LK_RELEASE); + + return 0; +} + +static int +mq_poll_fop(file_t *fp, int events, struct ucred *cred) +{ + struct mqueue *mq = fp->f_data; + int revents = 0; + + lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); + if (events & (POLLIN | POLLRDNORM)) { + /* Ready for receiving, if there are messages in the queue */ + if (mq->mq_attrib.mq_curmsgs) + revents |= (POLLIN | POLLRDNORM); + else + selrecord(curthread, &mq->mq_rsel); + } + if (events & (POLLOUT | POLLWRNORM)) { + /* Ready for sending, if the message queue is not full */ + if (mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg) + revents |= (POLLOUT | POLLWRNORM); + else + selrecord(curthread, &mq->mq_wsel); + } + lockmgr(&mq->mq_mtx, LK_RELEASE); + + return revents; +} + +static int +mq_close_fop(file_t *fp) +{ + struct proc *p = curproc; + struct mqueue *mq = fp->f_data; + bool destroy; + + lockmgr(&mqlist_mtx, LK_EXCLUSIVE); + lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); + + /* Decrease the counters */ + p->p_mqueue_cnt--; + mq->mq_refcnt--; + + /* Remove notification if registered for this process */ + if (mq->mq_notify_proc == p) + mq->mq_notify_proc = NULL; + + /* + * If this is the last reference and mqueue is marked for unlink, + * remove and later destroy the message queue. + */ + if (mq->mq_refcnt == 0 && (mq->mq_attrib.mq_flags & MQ_UNLINK)) { + LIST_REMOVE(mq, mq_list); + destroy = true; + } else + destroy = false; + + lockmgr(&mq->mq_mtx, LK_RELEASE); + lockmgr(&mqlist_mtx, LK_RELEASE); + + if (destroy) + mqueue_destroy(mq); + + return 0; +} + +/* + * General mqueue system calls. + */ + +int +sys_mq_open(struct mq_open_args *uap) +{ + /* { + syscallarg(const char *) name; + syscallarg(int) oflag; + syscallarg(mode_t) mode; + syscallarg(struct mq_attr) attr; + } */ + struct proc *p = curproc; + struct mqueue *mq, *mq_new = NULL; + file_t *fp; + char *name; + int mqd, error, oflag; + + /* Check access mode flags */ + oflag = SCARG(uap, oflag); + if ((oflag & O_ACCMODE) == (O_WRONLY | O_RDWR)) { + return EINVAL; + } + + /* Get the name from the user-space */ + name = kmalloc(MQ_NAMELEN, M_MQBUF, M_WAITOK | M_ZERO); + error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN - 1, NULL); + if (error) { + kfree(name, M_MQBUF); + return error; + } + + if (oflag & O_CREAT) { + struct mq_attr attr; + u_int i; + + /* Check the limit */ + if (p->p_mqueue_cnt == mq_open_max) { + kfree(name, M_MQBUF); + return EMFILE; + } + + /* Empty name is invalid */ + if (name[0] == '\0') { + kfree(name, M_MQBUF); + return EINVAL; + } + + /* Check for mqueue attributes */ + if (SCARG(uap, attr)) { + error = copyin(SCARG(uap, attr), &attr, + sizeof(struct mq_attr)); + if (error) { + kfree(name, M_MQBUF); + return error; + } + if (attr.mq_maxmsg <= 0 || attr.mq_msgsize <= 0 || + attr.mq_msgsize > mq_max_msgsize) { + kfree(name, M_MQBUF); + return EINVAL; + } + attr.mq_curmsgs = 0; + } else { + memset(&attr, 0, sizeof(struct mq_attr)); + attr.mq_maxmsg = mq_def_maxmsg; + attr.mq_msgsize = + MQ_DEF_MSGSIZE - sizeof(struct mq_msg); + } + + /* + * Allocate new mqueue, initialize data structures, + * copy the name, attributes and set the flag. + */ + mq_new = kmalloc(sizeof(struct mqueue), M_MQBUF, M_WAITOK | M_ZERO); + + lockinit(&mq_new->mq_mtx, "mq_new->mq_mtx", 0, LK_CANRECURSE); + for (i = 0; i < (MQ_PQSIZE + 1); i++) { + TAILQ_INIT(&mq_new->mq_head[i]); + } + + strlcpy(mq_new->mq_name, name, MQ_NAMELEN); + memcpy(&mq_new->mq_attrib, &attr, sizeof(struct mq_attr)); + + /*CTASSERT((O_MASK & (MQ_UNLINK | MQ_RECEIVE)) == 0);*/ + /* mq_new->mq_attrib.mq_flags = (O_MASK & oflag); */ + mq_new->mq_attrib.mq_flags = oflag; + + /* Store mode and effective UID with GID */ + mq_new->mq_mode = ((SCARG(uap, mode) & + ~p->p_fd->fd_cmask) & ALLPERMS) & ~S_ISTXT; + mq_new->mq_euid = curproc->p_ucred->cr_uid; + mq_new->mq_egid = curproc->p_ucred->cr_svgid; + } + + /* Allocate file structure and descriptor */ + error = falloc(curproc, &fp, &mqd); + if (error) { + if (mq_new) + mqueue_destroy(mq_new); + kfree(name, M_MQBUF); + return error; + } + fp->f_type = DTYPE_MQUEUE; + fp->f_flag = FFLAGS(oflag) & (FREAD | FWRITE); + fp->f_ops = &mqops; + + /* Look up for mqueue with such name */ + lockmgr(&mqlist_mtx, LK_EXCLUSIVE); + mq = mqueue_lookup(name); + if (mq) { + int acc_mode; + + KKASSERT(lockstatus(&mq->mq_mtx, curthread)); + + /* Check if mqueue is not marked as unlinking */ + if (mq->mq_attrib.mq_flags & MQ_UNLINK) { + error = EACCES; + goto exit; + } + /* Fail if O_EXCL is set, and mqueue already exists */ + if ((oflag & O_CREAT) && (oflag & O_EXCL)) { + error = EEXIST; + goto exit; + } + + /* + * Check the permissions. Note the difference between + * VREAD/VWRITE and FREAD/FWRITE. + */ + acc_mode = 0; + if (fp->f_flag & FREAD) { + acc_mode |= VREAD; + } + if (fp->f_flag & FWRITE) { + acc_mode |= VWRITE; + } + if (vaccess(VNON, mq->mq_mode, mq->mq_euid, mq->mq_egid, + acc_mode, curproc->p_ucred)) { + + error = EACCES; + goto exit; + } + } else { + /* Fail if mqueue neither exists, nor we create it */ + if ((oflag & O_CREAT) == 0) { + lockmgr(&mqlist_mtx, LK_RELEASE); + KKASSERT(mq_new == NULL); + fsetfd(curproc, NULL, mqd); + fp->f_ops = &badfileops; + fdrop(fp); + kfree(name, M_MQBUF); + return ENOENT; + } + + /* Check the limit */ + if (p->p_mqueue_cnt == mq_open_max) { + error = EMFILE; + goto exit; + } + + /* Insert the queue to the list */ + mq = mq_new; + lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); + LIST_INSERT_HEAD(&mqueue_head, mq, mq_list); + mq_new = NULL; + getnanotime(&mq->mq_btime); + mq->mq_atime = mq->mq_mtime = mq->mq_btime; + } + + /* Increase the counters, and make descriptor ready */ + p->p_mqueue_cnt++; + mq->mq_refcnt++; + fp->f_data = mq; +exit: + lockmgr(&mq->mq_mtx, LK_RELEASE); + lockmgr(&mqlist_mtx, LK_RELEASE); + + if (mq_new) + mqueue_destroy(mq_new); + if (error) { + fsetfd(curproc, NULL, mqd); + fp->f_ops = &badfileops; + } else { + fsetfd(p, fp, mqd); + uap->sysmsg_result = mqd; + } + fdrop(fp); + kfree(name, M_MQBUF); + + return error; +} + +int +sys_mq_close(struct mq_close_args *uap) +{ + return sys_close((void *)uap); +} + +/* + * Primary mq_receive1() function. + */ +int +mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len, + unsigned *msg_prio, struct timespec *ts, ssize_t *mlen) +{ + file_t *fp = NULL; + struct mqueue *mq; + struct mq_msg *msg = NULL; + struct mq_attr *mqattr; + u_int idx; + int error; + + /* Get the message queue */ + error = mqueue_get(l, mqdes, &fp); + if (error) { + return error; + } + mq = fp->f_data; + if ((fp->f_flag & FREAD) == 0) { + error = EBADF; + goto error; + } + getnanotime(&mq->mq_atime); + mqattr = &mq->mq_attrib; + + /* Check the message size limits */ + if (msg_len < mqattr->mq_msgsize) { + error = EMSGSIZE; + goto error; + } + + /* Check if queue is empty */ + while (mqattr->mq_curmsgs == 0) { + int t; + + if (mqattr->mq_flags & O_NONBLOCK) { + error = EAGAIN; + goto error; + } + error = abstimeout2timo(ts, &t); + if (error) { + goto error; + } + /* + * Block until someone sends the message. + * While doing this, notification should not be sent. + */ + mqattr->mq_flags |= MQ_RECEIVE; + error = tsleep(&mq->mq_send_cv, PCATCH, "mqsend", t); + mqattr->mq_flags &= ~MQ_RECEIVE; + if (error || (mqattr->mq_flags & MQ_UNLINK)) { + error = (error == EWOULDBLOCK) ? ETIMEDOUT : EINTR; + goto error; + } + } + + + /* + * Find the highest priority message, and remove it from the queue. + * At first, reserved queue is checked, bitmap is next. + */ + msg = TAILQ_FIRST(&mq->mq_head[MQ_PQRESQ]); + if (__predict_true(msg == NULL)) { + idx = ffs(mq->mq_bitmap); + msg = TAILQ_FIRST(&mq->mq_head[idx]); + KKASSERT(msg != NULL); + } else { + idx = MQ_PQRESQ; + } + TAILQ_REMOVE(&mq->mq_head[idx], msg, msg_queue); + + /* Unmark the bit, if last message. */ + if (__predict_true(idx) && TAILQ_EMPTY(&mq->mq_head[idx])) { + KKASSERT((MQ_PQSIZE - idx) == msg->msg_prio); + mq->mq_bitmap &= ~(1 << --idx); + } + + /* Decrement the counter and signal waiter, if any */ + mqattr->mq_curmsgs--; + wakeup_one(&mq->mq_recv_cv); + + /* Ready for sending now */ + selwakeup(&mq->mq_wsel); +error: + lockmgr(&mq->mq_mtx, LK_RELEASE); + fdrop(fp); + if (error) + return error; + + /* + * Copy the data to the user-space. + * Note: According to POSIX, no message should be removed from the + * queue in case of fail - this would be violated. + */ + *mlen = msg->msg_len; + error = copyout(msg->msg_ptr, msg_ptr, msg->msg_len); + if (error == 0 && msg_prio) + error = copyout(&msg->msg_prio, msg_prio, sizeof(unsigned)); + mqueue_freemsg(msg, sizeof(struct mq_msg) + msg->msg_len); + + return error; +} + +int +sys_mq_receive(struct mq_receive_args *uap) +{ + /* { + syscallarg(mqd_t) mqdes; + syscallarg(char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned *) msg_prio; + } */ + ssize_t mlen; + int error; + + error = mq_receive1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), 0, &mlen); + if (error == 0) + uap->sysmsg_result = mlen; + + return error; +} + +int +sys_mq_timedreceive(struct mq_timedreceive_args *uap) +{ + /* { + syscallarg(mqd_t) mqdes; + syscallarg(char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned *) msg_prio; + syscallarg(const struct timespec *) abs_timeout; + } */ + int error; + ssize_t mlen; + struct timespec ts, *tsp; + + /* Get and convert time value */ + if (SCARG(uap, abs_timeout)) { + error = copyin(SCARG(uap, abs_timeout), &ts, sizeof(ts)); + if (error) + return error; + tsp = &ts; + } else { + tsp = NULL; + } + + error = mq_receive1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), tsp, &mlen); + if (error == 0) + uap->sysmsg_result = mlen; + + return error; +} + +/* + * Primary mq_send1() function. + */ +int +mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len, + unsigned msg_prio, struct timespec *ts) +{ + file_t *fp = NULL; + struct mqueue *mq; + struct mq_msg *msg; + struct mq_attr *mqattr; + struct proc *notify = NULL; + /*ksiginfo_t ksi;*/ + size_t size; + int error; + + /* Check the priority range */ + if (msg_prio >= mq_prio_max) + return EINVAL; + + /* Allocate a new message */ + size = sizeof(struct mq_msg) + msg_len; + if (size > mq_max_msgsize) + return EMSGSIZE; + + if (size > MQ_DEF_MSGSIZE) { + msg = kmalloc(size, M_MQBUF, M_WAITOK); + } else { + msg = objcache_get(mqmsg_cache, M_WAITOK); + } + + /* Get the data from user-space */ + error = copyin(msg_ptr, msg->msg_ptr, msg_len); + if (error) { + mqueue_freemsg(msg, size); + return error; + } + msg->msg_len = msg_len; + msg->msg_prio = msg_prio; + + /* Get the mqueue */ + error = mqueue_get(l, mqdes, &fp); + if (error) { + mqueue_freemsg(msg, size); + return error; + } + mq = fp->f_data; + if ((fp->f_flag & FWRITE) == 0) { + error = EBADF; + goto error; + } + getnanotime(&mq->mq_mtime); + mqattr = &mq->mq_attrib; + + /* Check the message size limit */ + if (msg_len <= 0 || msg_len > mqattr->mq_msgsize) { + error = EMSGSIZE; + goto error; + } + + /* Check if queue is full */ + while (mqattr->mq_curmsgs >= mqattr->mq_maxmsg) { + int t; + + if (mqattr->mq_flags & O_NONBLOCK) { + error = EAGAIN; + goto error; + } + error = abstimeout2timo(ts, &t); + if (error) { + goto error; + } + /* Block until queue becomes available */ + error = tsleep(&mq->mq_recv_cv, PCATCH, "mqrecv", t); + if (error || (mqattr->mq_flags & MQ_UNLINK)) { + error = (error == EWOULDBLOCK) ? ETIMEDOUT : error; + goto error; + } + } + KKASSERT(mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg); + + /* + * Insert message into the queue, according to the priority. + * Note the difference between index and priority. + */ + if (__predict_true(msg_prio < MQ_PQSIZE)) { + u_int idx = MQ_PQSIZE - msg_prio; + + KKASSERT(idx != MQ_PQRESQ); + TAILQ_INSERT_TAIL(&mq->mq_head[idx], msg, msg_queue); + mq->mq_bitmap |= (1 << --idx); + } else { + mqueue_linear_insert(mq, msg); + } + + /* Check for the notify */ + if (mqattr->mq_curmsgs == 0 && mq->mq_notify_proc && + (mqattr->mq_flags & MQ_RECEIVE) == 0) { + /* Initialize the signal */ + /*KSI_INIT(&ksi);*/ + /*ksi.ksi_signo = mq->mq_sig_notify.sigev_signo;*/ + /*ksi.ksi_code = SI_MESGQ;*/ + /*ksi.ksi_value = mq->mq_sig_notify.sigev_value;*/ + /* Unregister the process */ + notify = mq->mq_notify_proc; + mq->mq_notify_proc = NULL; + } + + /* Increment the counter and signal waiter, if any */ + mqattr->mq_curmsgs++; + wakeup_one(&mq->mq_send_cv); + + /* Ready for receiving now */ + selwakeup(&mq->mq_rsel); +error: + /*mutex_exit(&mq->mq_mtx);*/ + lockmgr(&mq->mq_mtx, LK_RELEASE); + fdrop(fp); + + if (error) { + mqueue_freemsg(msg, size); + } else if (notify) { + /* Send the notify, if needed */ + spin_lock_wr(&allproc_spin); + /*kpsignal(notify, &ksi, NULL);*/ + ksignal(notify, mq->mq_sig_notify.sigev_signo); + spin_unlock_wr(&allproc_spin); + } + + return error; +} + +int +sys_mq_send(struct mq_send_args *uap) +{ + /* { + syscallarg(mqd_t) mqdes; + syscallarg(const char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned) msg_prio; + } */ + + return mq_send1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), 0); +} + +int +sys_mq_timedsend(struct mq_timedsend_args *uap) +{ + /* { + syscallarg(mqd_t) mqdes; + syscallarg(const char *) msg_ptr; + syscallarg(size_t) msg_len; + syscallarg(unsigned) msg_prio; + syscallarg(const struct timespec *) abs_timeout; + } */ + struct timespec ts, *tsp; + int error; + + /* Get and convert time value */ + if (SCARG(uap, abs_timeout)) { + error = copyin(SCARG(uap, abs_timeout), &ts, sizeof(ts)); + if (error) + return error; + tsp = &ts; + } else { + tsp = NULL; + } + + return mq_send1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr), + SCARG(uap, msg_len), SCARG(uap, msg_prio), tsp); +} + +int +sys_mq_notify(struct mq_notify_args *uap) +{ + /* { + syscallarg(mqd_t) mqdes; + syscallarg(const struct sigevent *) notification; + } */ + file_t *fp = NULL; + struct mqueue *mq; + struct sigevent sig; + int error; + + if (SCARG(uap, notification)) { + /* Get the signal from user-space */ + error = copyin(SCARG(uap, notification), &sig, + sizeof(struct sigevent)); + if (error) + return error; + } + + error = mqueue_get(curthread->td_lwp, SCARG(uap, mqdes), &fp); + if (error) + return error; + mq = fp->f_data; + + if (SCARG(uap, notification)) { + /* Register notification: set the signal and target process */ + if (mq->mq_notify_proc == NULL) { + memcpy(&mq->mq_sig_notify, &sig, + sizeof(struct sigevent)); + mq->mq_notify_proc = curproc; + } else { + /* Fail if someone else already registered */ + error = EBUSY; + } + } else { + /* Unregister the notification */ + mq->mq_notify_proc = NULL; + } + lockmgr(&mq->mq_mtx, LK_RELEASE); + fdrop(fp); + + return error; +} + +int +sys_mq_getattr(struct mq_getattr_args *uap) +{ + /* { + syscallarg(mqd_t) mqdes; + syscallarg(struct mq_attr *) mqstat; + } */ + file_t *fp = NULL; + struct mqueue *mq; + struct mq_attr attr; + int error; + + /* Get the message queue */ + error = mqueue_get(/*l*/curthread->td_lwp, SCARG(uap, mqdes), &fp); + if (error) + return error; + mq = fp->f_data; + memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr)); + lockmgr(&mq->mq_mtx, LK_RELEASE); + fdrop(fp); + + return copyout(&attr, SCARG(uap, mqstat), sizeof(struct mq_attr)); +} + +int +sys_mq_setattr(struct mq_setattr_args *uap) +{ + /* { + syscallarg(mqd_t) mqdes; + syscallarg(const struct mq_attr *) mqstat; + syscallarg(struct mq_attr *) omqstat; + } */ + file_t *fp = NULL; + struct mqueue *mq; + struct mq_attr attr; + int error, nonblock; + + error = copyin(SCARG(uap, mqstat), &attr, sizeof(struct mq_attr)); + if (error) + return error; + nonblock = (attr.mq_flags & O_NONBLOCK); + + /* Get the message queue */ + error = mqueue_get(/*l*/curthread->td_lwp, SCARG(uap, mqdes), &fp); + if (error) + return error; + mq = fp->f_data; + + /* Copy the old attributes, if needed */ + if (SCARG(uap, omqstat)) + memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr)); + + /* Ignore everything, except O_NONBLOCK */ + if (nonblock) + mq->mq_attrib.mq_flags |= O_NONBLOCK; + else + mq->mq_attrib.mq_flags &= ~O_NONBLOCK; + + lockmgr(&mq->mq_mtx, LK_RELEASE); + fdrop(fp); + + /* + * Copy the data to the user-space. + * Note: According to POSIX, the new attributes should not be set in + * case of fail - this would be violated. + */ + if (SCARG(uap, omqstat)) + error = copyout(&attr, SCARG(uap, omqstat), + sizeof(struct mq_attr)); + + return error; +} + +int +sys_mq_unlink(struct mq_unlink_args *uap) +{ + /* { + syscallarg(const char *) name; + } */ + struct mqueue *mq; + char *name; + int error, refcnt = 0; + + /* Get the name from the user-space */ + name = kmalloc(MQ_NAMELEN, M_MQBUF, M_WAITOK | M_ZERO); + error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN - 1, NULL); + if (error) { + kfree(name, M_MQBUF); + return error; + } + + /* Lookup for this file */ + lockmgr(&mqlist_mtx, LK_EXCLUSIVE); + mq = mqueue_lookup(name); + if (mq == NULL) { + error = ENOENT; + goto error; + } + + /* Check the permissions */ + if (curproc->p_ucred->cr_uid != mq->mq_euid && + priv_check(curthread, PRIV_ROOT) != 0) { + lockmgr(&mq->mq_mtx, LK_RELEASE); + error = EACCES; + goto error; + } + + /* Mark message queue as unlinking, before leaving the window */ + mq->mq_attrib.mq_flags |= MQ_UNLINK; + + /* Wake up all waiters, if there are such */ + wakeup(&mq->mq_send_cv); + wakeup(&mq->mq_recv_cv); + + selwakeup(&mq->mq_rsel); + selwakeup(&mq->mq_wsel); + + refcnt = mq->mq_refcnt; + if (refcnt == 0) + LIST_REMOVE(mq, mq_list); + + lockmgr(&mq->mq_mtx, LK_RELEASE); +error: + lockmgr(&mqlist_mtx, LK_RELEASE); + + /* + * If there are no references - destroy the message + * queue, otherwise, the last mq_close() will do that. + */ + if (error == 0 && refcnt == 0) + mqueue_destroy(mq); + + kfree(name, M_MQBUF); + return error; +} + +/* + * SysCtl. + */ +SYSCTL_NODE(_kern, OID_AUTO, mqueue, + CTLFLAG_RW, 0, "Message queue options"); + +SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_open_max, + CTLFLAG_RW, &mq_open_max, 0, + "Maximal number of message queue descriptors per process"); + +SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_prio_max, + CTLFLAG_RW, &mq_prio_max, 0, + "Maximal priority of the message"); + +SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_max_msgsize, + CTLFLAG_RW, &mq_max_msgsize, 0, + "Maximal allowed size of the message"); + +SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_def_maxmsg, + CTLFLAG_RW, &mq_def_maxmsg, 0, + "Default maximal message count"); + +SYSINIT(sys_mqueue_init, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, mqueue_sysinit, NULL); diff --git a/sys/sys/file.h b/sys/sys/file.h index 047bb3d4ff..56ad4e20c2 100644 --- a/sys/sys/file.h +++ b/sys/sys/file.h @@ -129,6 +129,7 @@ struct file { #define DTYPE_KQUEUE 5 /* event queue */ #define DTYPE_CRYPTO 6 /* crypto */ #define DTYPE_SYSLINK 7 /* syslink */ +#define DTYPE_MQUEUE 8 /* message queue */ LIST_HEAD(filelist, file); diff --git a/sys/sys/mqueue.h b/sys/sys/mqueue.h index eb737525cd..d62f033638 100644 --- a/sys/sys/mqueue.h +++ b/sys/sys/mqueue.h @@ -1,12 +1,9 @@ -#ifndef _MQUEUE_H_ -#define _MQUEUE_H_ +/* $NetBSD: mqueue.h,v 1.7 2009/04/11 15:47:34 christos Exp $ */ -/* mqueue.h: POSIX 1003.1b Message Queues */ - -/*- - * Copyright (c) 1996, 1997 - * HD Associates, Inc. All rights reserved. - * +/* + * Copyright (c) 2007, Mindaugas Rasiukevicius + * All rights reserved. + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: @@ -15,17 +12,11 @@ * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. - * 3. All advertising materials mentioning features or use of this software - * must display the following acknowledgement: - * This product includes software developed by HD Associates, Inc - * 4. Neither the name of the author nor the names of any co-contributors - * may be used to endorse or promote products derived from this software - * without specific prior written permission. * - * THIS SOFTWARE IS PROVIDED BY HD ASSOCIATES AND CONTRIBUTORS ``AS IS'' AND + * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL HD ASSOCIATES OR CONTRIBUTORS BE LIABLE + * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) @@ -33,46 +24,108 @@ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. - * - * $FreeBSD: src/sys/posix4/mqueue.h,v 1.4 1999/12/29 04:55:02 peter Exp $ - * $DragonFly: src/sys/sys/mqueue.h,v 1.3 2003/08/27 06:30:04 rob Exp $ */ -#include +#ifndef _SYS_MQUEUE_H_ +#define _SYS_MQUEUE_H_ -#ifdef _P1003_1B_INCLUDE_MAYBES -#include -#include -#include -#include -#else -struct sigevent; -#endif +/* Maximal number of mqueue descriptors, that process could open */ +#define MQ_OPEN_MAX 512 -typedef int mqd_t; /* message queue descriptors */ +/* Maximal priority of the message */ +#define MQ_PRIO_MAX 32 struct mq_attr { - long mq_flags; /* message queue flags */ - long mq_maxmsg; /* maximum number of messages */ - long mq_msgsize; /* maximum message size */ - long mq_curmsgs; /* number of messages currently queued */ + long mq_flags; /* Flags of message queue */ + long mq_maxmsg; /* Maximum number of messages */ + long mq_msgsize; /* Maximum size of the message */ + long mq_curmsgs; /* Count of the queued messages */ +}; + +/* Internal kernel data */ +/*#ifdef _KERNEL*/ +#if defined(_KERNEL) || defined(_KERNEL_STRUCTURES) + +/*#include */ +/*#include */ +#include +#include +#include +#include +#include +#include + +/* + * Flags below are used in mq_flags for internal + * purposes, this is appropriate according to POSIX. + */ + +/* Message queue is unlinking */ +#define MQ_UNLINK 0x10000000 +/* There are receive-waiters */ +#define MQ_RECEIVE 0x20000000 + +/* Maximal length of mqueue name */ +#define MQ_NAMELEN (NAME_MAX + 1) + +/* Default size of the message */ +#define MQ_DEF_MSGSIZE 1024 + +/* Size/bits and index of reserved queue */ +#define MQ_PQSIZE 32 +#define MQ_PQRESQ 0 + +/* Structure of the message queue */ +struct mqueue { + char mq_name[MQ_NAMELEN]; + struct lock mq_mtx; + int mq_send_cv; /* Condition variables for tsleep() */ + int mq_recv_cv; + struct mq_attr mq_attrib; + /* Notification */ + struct selinfo mq_rsel; + struct selinfo mq_wsel; + struct sigevent mq_sig_notify; + struct proc * mq_notify_proc; + /* Permissions */ + mode_t mq_mode; + uid_t mq_euid; + gid_t mq_egid; + /* + * Reference counter, heads of the message queue + * + * Every message queue has `MQ_PQSIZE' priority queues that guarantee + * constant-time insertion of messages. If `mq_prio_max' is increased + * via sysctl, then the reserved queue (MQ_PQRESQ) is used which + * performs linear insertion. + */ + u_int mq_refcnt; + TAILQ_HEAD(, mq_msg) mq_head[1 + MQ_PQSIZE]; + uint32_t mq_bitmap; + /* Entry of the global list */ + LIST_ENTRY(mqueue) mq_list; + struct timespec mq_atime; + struct timespec mq_mtime; + struct timespec mq_btime; }; -#ifndef _KERNEL +/* Structure of the message */ +struct mq_msg { + TAILQ_ENTRY(mq_msg) msg_queue; + size_t msg_len; + u_int msg_prio; + int8_t msg_ptr[1]; +}; -#include +/* Prototypes */ +void mqueue_sysinit(void); +void mqueue_print_list(void (*pr)(const char *, ...)); +int abstimeout2timo(struct timespec *, int *); +int mq_send1(struct lwp *, mqd_t, const char *, size_t, unsigned, struct timespec *); +int mq_receive1(struct lwp *, mqd_t, void *, size_t, unsigned *, struct timespec *, + ssize_t *); -__BEGIN_DECLS -mqd_t mq_open (const char *, int oflag, ...); -int mq_close (mqd_t); -int mq_unlink (const char *); -int mq_send (mqd_t, const char *, size_t, unsigned int); -ssize_t mq_receive (mqd_t, char *, size_t, unsigned int *); -int mq_notify (mqd_t, const struct sigevent *); -int mq_setattr (mqd_t, const struct mq_attr *, struct mq_attr *); -int mq_getattr (mqd_t, struct mq_attr *); -__END_DECLS +#endif /* _KERNEL || _KERNEL_STRUCTURES */ -#endif +#endif /* _SYS_MQUEUE_H_ */ -#endif /* _MQUEUE_H_ */ diff --git a/sys/sys/proc.h b/sys/sys/proc.h index 23fe8a85b5..a9634a344d 100644 --- a/sys/sys/proc.h +++ b/sys/sys/proc.h @@ -227,6 +227,7 @@ struct proc { struct filedesc_to_leader *p_fdtol; /* Ptr to tracking node XXX lwp */ struct plimit *p_limit; /* Process limits. */ struct pstats *p_stats; + u_int p_mqueue_cnt; /* Count of open mqueues. */ void *p_pad0; struct sigacts *p_sigacts; #define p_sigignore p_sigacts->ps_sigignore diff --git a/sys/sys/types.h b/sys/sys/types.h index 6f9dfce8c1..400d8367c4 100644 --- a/sys/sys/types.h +++ b/sys/sys/types.h @@ -120,6 +120,7 @@ typedef __uint32_t uid_t; /* user id */ #endif typedef long suseconds_t; /* microseconds (signed) */ typedef __uint32_t useconds_t; /* microseconds (unsigned) */ +typedef int mqd_t; /* message queue descriptor */ #if defined(_KERNEL) || defined(_KERNEL_STRUCTURES) -- 2.41.0