mqueues: Port POSIX message queues from NetBSD.
[dragonfly.git] / sys / kern / sys_mqueue.c
1 /*      $NetBSD: sys_mqueue.c,v 1.16 2009/04/11 23:05:26 christos Exp $ */
2
3 /*
4  * Copyright (c) 2007, 2008 Mindaugas Rasiukevicius <rmind at NetBSD org>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28
29 /*
30  * Implementation of POSIX message queues.
31  * Defined in the Base Definitions volume of IEEE Std 1003.1-2001.
32  *
33  * Locking
34  *
35  * Global list of message queues (mqueue_head) and proc_t::p_mqueue_cnt
36  * counter are protected by mqlist_mtx lock.  The very message queue and
37  * its members are protected by mqueue::mq_mtx.
38  *
39  * Lock order:
40  *      mqlist_mtx
41  *        -> mqueue::mq_mtx
42  */
43
44 #include <sys/cdefs.h>
45 /*__KERNEL_RCSID(0, "$NetBSD: sys_mqueue.c,v 1.16 2009/04/11 23:05:26 christos Exp $");*/
46
47 #include <stdbool.h>
48 #include <sys/param.h>
49 #include <sys/types.h>
50 #include <sys/errno.h>
51 #include <sys/fcntl.h>
52 #include <sys/file.h>
53 #include <sys/filedesc.h>
54 #include <sys/ucred.h>
55 #include <sys/priv.h>
56 #include <sys/kernel.h>
57 #include <sys/malloc.h>
58 #include <sys/mqueue.h>
59 #include <sys/objcache.h>
60 #include <sys/poll.h>
61 #include <sys/proc.h>
62 #include <sys/queue.h>
63 #include <sys/select.h>
64 #include <sys/serialize.h>
65 #include <sys/signal.h>
66 #include <sys/signalvar.h>
67 #include <sys/spinlock.h>
68 #include <sys/spinlock2.h>
69 #include <sys/stat.h>
70 #include <sys/sysctl.h>
71 #include <sys/sysproto.h>
72 #include <sys/systm.h>
73 #include <sys/lock.h>
74 #include <sys/unistd.h>
75 #include <sys/vnode.h>
76
77 /* System-wide limits. */
78 static u_int                    mq_open_max = MQ_OPEN_MAX;
79 static u_int                    mq_prio_max = MQ_PRIO_MAX;
80 static u_int                    mq_max_msgsize = 16 * MQ_DEF_MSGSIZE;
81 static u_int                    mq_def_maxmsg = 32;
82
83 struct lock                     mqlist_mtx;
84 static struct objcache *        mqmsg_cache;
85 static LIST_HEAD(, mqueue)      mqueue_head =
86         LIST_HEAD_INITIALIZER(mqueue_head);
87
88 typedef struct  file file_t;    /* XXX: Should we put this in sys/types.h ? */
89
90 /* Function prototypes */
91 static int      mq_poll_fop(file_t *, int, struct ucred *cred);
92 static int      mq_stat_fop(file_t *, struct stat *, struct ucred *cred);
93 static int      mq_close_fop(file_t *);
94
95 /* Some time-related utility functions */
96 static int      itimespecfix(struct timespec *ts);
97 static int      tstohz(const struct timespec *ts);
98
99 /* File operations vector */
100 static struct fileops mqops = {
101         .fo_read = badfo_readwrite,
102         .fo_write = badfo_readwrite,
103         .fo_ioctl = badfo_ioctl,
104         .fo_poll = mq_poll_fop,
105         .fo_stat = mq_stat_fop,
106         .fo_close = mq_close_fop,
107         .fo_kqfilter = badfo_kqfilter,
108         .fo_shutdown = badfo_shutdown
109 };
110
111 /* Define a new malloc type for message queues */
112 MALLOC_DECLARE(M_MQBUF);
113 MALLOC_DEFINE(M_MQBUF, "mqueues", "Buffers to message queues");
114
115 /* Malloc arguments for object cache */
116 struct objcache_malloc_args mqueue_malloc_args = {
117         sizeof(struct mqueue), M_MQBUF };
118
119 /* Spinlock around the process list */
120 extern struct spinlock allproc_spin;
121
122 /*
123  * Initialize POSIX message queue subsystem.
124  */
125 void
126 mqueue_sysinit(void)
127 {
128         mqmsg_cache = objcache_create("mqmsg_cache",
129             0,          /* infinite depot's capacity */
130             0,          /* default magazine's capacity */
131             NULL,       /* constructor */
132             NULL,       /* deconstructor */
133             NULL,
134             objcache_malloc_alloc,
135             objcache_malloc_free,
136             &mqueue_malloc_args);
137
138         lockinit(&mqlist_mtx, "mqlist_mtx", 0, LK_CANRECURSE);
139 }
140
141 /*
142  * Free the message.
143  */
144 static void
145 mqueue_freemsg(struct mq_msg *msg, const size_t size)
146 {
147
148         if (size > MQ_DEF_MSGSIZE)
149                 kfree(msg, M_MQBUF);
150         else
151                 objcache_put(mqmsg_cache, msg);
152 }
153
154 /*
155  * Destroy the message queue.
156  */
157 static void
158 mqueue_destroy(struct mqueue *mq)
159 {
160         struct mq_msg *msg;
161         size_t msz;
162         u_int i;
163
164         /* Note MQ_PQSIZE + 1. */
165         for (i = 0; i < MQ_PQSIZE + 1; i++) {
166                 while ((msg = TAILQ_FIRST(&mq->mq_head[i])) != NULL) {
167                         TAILQ_REMOVE(&mq->mq_head[i], msg, msg_queue);
168                         msz = sizeof(struct mq_msg) + msg->msg_len;
169                         mqueue_freemsg(msg, msz);
170                 }
171         }
172         lockuninit(&mq->mq_mtx);
173         kfree(mq, M_MQBUF);
174 }
175
176 /*
177  * Lookup for file name in general list of message queues.
178  *  => locks the message queue
179  */
180 static void *
181 mqueue_lookup(char *name)
182 {
183         struct mqueue *mq;
184
185         KKASSERT(lockstatus(&mqlist_mtx, curthread));
186
187         LIST_FOREACH(mq, &mqueue_head, mq_list) {
188                 if (strncmp(mq->mq_name, name, MQ_NAMELEN) == 0) {
189                         lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);
190                         return mq;
191                 }
192         }
193
194         return NULL;
195 }
196
197 /*
198  * mqueue_get: get the mqueue from the descriptor.
199  *  => locks the message queue, if found.
200  *  => hold a reference on the file descriptor.
201  */
202 static int
203 mqueue_get(struct lwp *l, mqd_t mqd, file_t **fpr)
204 {
205         struct mqueue *mq;
206         file_t *fp;
207
208         fp = holdfp(curproc->p_fd, (int)mqd, -1);       /* XXX: Why -1 ? */
209         if (__predict_false(fp == NULL))
210                 return EBADF;
211
212         if (__predict_false(fp->f_type != DTYPE_MQUEUE)) {
213                 fdrop(fp);
214                 return EBADF;
215         }
216         mq = fp->f_data;
217         lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);
218
219         *fpr = fp;
220         return 0;
221 }
222
223 /*
224  * mqueue_linear_insert: perform linear insert according to the message
225  * priority into the reserved queue (MQ_PQRESQ).  Reserved queue is a
226  * sorted list used only when mq_prio_max is increased via sysctl.
227  */
228 static inline void
229 mqueue_linear_insert(struct mqueue *mq, struct mq_msg *msg)
230 {
231         struct mq_msg *mit;
232
233         TAILQ_FOREACH(mit, &mq->mq_head[MQ_PQRESQ], msg_queue) {
234                 if (msg->msg_prio > mit->msg_prio)
235                         break;
236         }
237         if (mit == NULL) {
238                 TAILQ_INSERT_TAIL(&mq->mq_head[MQ_PQRESQ], msg, msg_queue);
239         } else {
240                 TAILQ_INSERT_BEFORE(mit, msg, msg_queue);
241         }
242 }
243
244 /*
245  * Validate input.
246  */
247 int
248 itimespecfix(struct timespec *ts)
249 {
250         if (ts->tv_sec < 0 || ts->tv_nsec < 0 || ts->tv_nsec >= 1000000000)
251                 return (EINVAL);
252         if (ts->tv_sec == 0 && ts->tv_nsec != 0 && ts->tv_nsec < tick * 1000)
253                 ts->tv_nsec = tick * 1000;
254         return (0);
255 }
256
257 /*
258  * Compute number of ticks in the specified amount of time.
259  */
260 int
261 tstohz(const struct timespec *ts)
262 {
263         struct timeval tv;
264
265         /*
266          * usec has great enough resolution for hz, so convert to a
267          * timeval and use tvtohz() above.
268          */
269         TIMESPEC_TO_TIMEVAL(&tv, ts);
270         return tvtohz_high(&tv);        /* XXX Why _high() and not _low() ? */
271 }
272
273 /*
274  * Converter from struct timespec to the ticks.
275  * Used by mq_timedreceive(), mq_timedsend().
276  */
277 int
278 abstimeout2timo(struct timespec *ts, int *timo)
279 {
280         struct timespec tsd;
281         int error;
282
283         getnanotime(&tsd);
284         timespecsub(ts, &tsd);
285         if (ts->tv_sec < 0 || (ts->tv_sec == 0 && ts->tv_nsec <= 0)) {
286                 return ETIMEDOUT;
287         }
288         error = itimespecfix(ts);
289         if (error) {
290                 return error;
291         }
292         *timo = tstohz(ts);
293         KKASSERT(*timo != 0);
294
295         return 0;
296 }
297
298 static int
299 mq_stat_fop(file_t *fp, struct stat *st, struct ucred *cred)
300 {
301         struct mqueue *mq = fp->f_data;
302
303         (void)memset(st, 0, sizeof(*st));
304
305         lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);
306         st->st_mode = mq->mq_mode;
307         st->st_uid = mq->mq_euid;
308         st->st_gid = mq->mq_egid;
309         st->st_atimespec = mq->mq_atime;
310         st->st_mtimespec = mq->mq_mtime;
311         /*st->st_ctimespec = st->st_birthtimespec = mq->mq_btime;*/
312         st->st_uid = fp->f_cred->cr_uid;
313         st->st_gid = fp->f_cred->cr_svgid;
314         lockmgr(&mq->mq_mtx, LK_RELEASE);
315
316         return 0;
317 }
318
319 static int
320 mq_poll_fop(file_t *fp, int events, struct ucred *cred)
321 {
322         struct mqueue *mq = fp->f_data;
323         int revents = 0;
324
325         lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);
326         if (events & (POLLIN | POLLRDNORM)) {
327                 /* Ready for receiving, if there are messages in the queue */
328                 if (mq->mq_attrib.mq_curmsgs)
329                         revents |= (POLLIN | POLLRDNORM);
330                 else
331                         selrecord(curthread, &mq->mq_rsel);
332         }
333         if (events & (POLLOUT | POLLWRNORM)) {
334                 /* Ready for sending, if the message queue is not full */
335                 if (mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg)
336                         revents |= (POLLOUT | POLLWRNORM);
337                 else
338                         selrecord(curthread, &mq->mq_wsel);
339         }
340         lockmgr(&mq->mq_mtx, LK_RELEASE);
341
342         return revents;
343 }
344
345 static int
346 mq_close_fop(file_t *fp)
347 {
348         struct proc *p = curproc;
349         struct mqueue *mq = fp->f_data;
350         bool destroy;
351
352         lockmgr(&mqlist_mtx, LK_EXCLUSIVE);
353         lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);
354
355         /* Decrease the counters */
356         p->p_mqueue_cnt--;
357         mq->mq_refcnt--;
358
359         /* Remove notification if registered for this process */
360         if (mq->mq_notify_proc == p)
361                 mq->mq_notify_proc = NULL;
362
363         /*
364          * If this is the last reference and mqueue is marked for unlink,
365          * remove and later destroy the message queue.
366          */
367         if (mq->mq_refcnt == 0 && (mq->mq_attrib.mq_flags & MQ_UNLINK)) {
368                 LIST_REMOVE(mq, mq_list);
369                 destroy = true;
370         } else
371                 destroy = false;
372
373         lockmgr(&mq->mq_mtx, LK_RELEASE);
374         lockmgr(&mqlist_mtx, LK_RELEASE);
375
376         if (destroy)
377                 mqueue_destroy(mq);
378
379         return 0;
380 }
381
382 /*
383  * General mqueue system calls.
384  */
385
386 int
387 sys_mq_open(struct mq_open_args *uap)
388 {
389         /* {
390                 syscallarg(const char *) name;
391                 syscallarg(int) oflag;
392                 syscallarg(mode_t) mode;
393                 syscallarg(struct mq_attr) attr;
394         } */
395         struct proc *p = curproc;
396         struct mqueue *mq, *mq_new = NULL;
397         file_t *fp;
398         char *name;
399         int mqd, error, oflag;
400
401         /* Check access mode flags */
402         oflag = SCARG(uap, oflag);
403         if ((oflag & O_ACCMODE) == (O_WRONLY | O_RDWR)) {
404                 return EINVAL;
405         }
406
407         /* Get the name from the user-space */
408         name = kmalloc(MQ_NAMELEN, M_MQBUF, M_WAITOK | M_ZERO);
409         error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN - 1, NULL);
410         if (error) {
411                 kfree(name, M_MQBUF);
412                 return error;
413         }
414
415         if (oflag & O_CREAT) {
416                 struct mq_attr attr;
417                 u_int i;
418
419                 /* Check the limit */
420                 if (p->p_mqueue_cnt == mq_open_max) {
421                         kfree(name, M_MQBUF);
422                         return EMFILE;
423                 }
424
425                 /* Empty name is invalid */
426                 if (name[0] == '\0') {
427                         kfree(name, M_MQBUF);
428                         return EINVAL;
429                 }
430
431                 /* Check for mqueue attributes */
432                 if (SCARG(uap, attr)) {
433                         error = copyin(SCARG(uap, attr), &attr,
434                                 sizeof(struct mq_attr));
435                         if (error) {
436                                 kfree(name, M_MQBUF);
437                                 return error;
438                         }
439                         if (attr.mq_maxmsg <= 0 || attr.mq_msgsize <= 0 ||
440                             attr.mq_msgsize > mq_max_msgsize) {
441                                 kfree(name, M_MQBUF);
442                                 return EINVAL;
443                         }
444                         attr.mq_curmsgs = 0;
445                 } else {
446                         memset(&attr, 0, sizeof(struct mq_attr));
447                         attr.mq_maxmsg = mq_def_maxmsg;
448                         attr.mq_msgsize =
449                             MQ_DEF_MSGSIZE - sizeof(struct mq_msg);
450                 }
451
452                 /*
453                  * Allocate new mqueue, initialize data structures,
454                  * copy the name, attributes and set the flag.
455                  */
456                 mq_new = kmalloc(sizeof(struct mqueue), M_MQBUF, M_WAITOK | M_ZERO);
457
458                 lockinit(&mq_new->mq_mtx, "mq_new->mq_mtx", 0, LK_CANRECURSE);
459                 for (i = 0; i < (MQ_PQSIZE + 1); i++) {
460                         TAILQ_INIT(&mq_new->mq_head[i]);
461                 }
462
463                 strlcpy(mq_new->mq_name, name, MQ_NAMELEN);
464                 memcpy(&mq_new->mq_attrib, &attr, sizeof(struct mq_attr));
465
466                 /*CTASSERT((O_MASK & (MQ_UNLINK | MQ_RECEIVE)) == 0);*/
467                 /* mq_new->mq_attrib.mq_flags = (O_MASK & oflag); */
468                 mq_new->mq_attrib.mq_flags = oflag;
469
470                 /* Store mode and effective UID with GID */
471                 mq_new->mq_mode = ((SCARG(uap, mode) &
472                     ~p->p_fd->fd_cmask) & ALLPERMS) & ~S_ISTXT;
473                 mq_new->mq_euid = curproc->p_ucred->cr_uid;
474                 mq_new->mq_egid = curproc->p_ucred->cr_svgid;
475         }
476
477         /* Allocate file structure and descriptor */
478         error = falloc(curproc, &fp, &mqd);
479         if (error) {
480                 if (mq_new)
481                         mqueue_destroy(mq_new);
482                 kfree(name, M_MQBUF);
483                 return error;
484         }
485         fp->f_type = DTYPE_MQUEUE;
486         fp->f_flag = FFLAGS(oflag) & (FREAD | FWRITE);
487         fp->f_ops = &mqops;
488
489         /* Look up for mqueue with such name */
490         lockmgr(&mqlist_mtx, LK_EXCLUSIVE);
491         mq = mqueue_lookup(name);
492         if (mq) {
493                 int acc_mode;
494
495                 KKASSERT(lockstatus(&mq->mq_mtx, curthread));
496
497                 /* Check if mqueue is not marked as unlinking */
498                 if (mq->mq_attrib.mq_flags & MQ_UNLINK) {
499                         error = EACCES;
500                         goto exit;
501                 }
502                 /* Fail if O_EXCL is set, and mqueue already exists */
503                 if ((oflag & O_CREAT) && (oflag & O_EXCL)) {
504                         error = EEXIST;
505                         goto exit;
506                 }
507
508                 /*
509                  * Check the permissions. Note the difference between
510                  * VREAD/VWRITE and FREAD/FWRITE.
511                  */
512                 acc_mode = 0;
513                 if (fp->f_flag & FREAD) {
514                         acc_mode |= VREAD;
515                 }
516                 if (fp->f_flag & FWRITE) {
517                         acc_mode |= VWRITE;
518                 }
519                 if (vaccess(VNON, mq->mq_mode, mq->mq_euid, mq->mq_egid,
520                         acc_mode, curproc->p_ucred)) {
521
522                         error = EACCES;
523                         goto exit;
524                 }
525         } else {
526                 /* Fail if mqueue neither exists, nor we create it */
527                 if ((oflag & O_CREAT) == 0) {
528                         lockmgr(&mqlist_mtx, LK_RELEASE);
529                         KKASSERT(mq_new == NULL);
530                         fsetfd(curproc, NULL, mqd);
531                         fp->f_ops = &badfileops;
532                         fdrop(fp);
533                         kfree(name, M_MQBUF);
534                         return ENOENT;
535                 }
536
537                 /* Check the limit */
538                 if (p->p_mqueue_cnt == mq_open_max) {
539                         error = EMFILE;
540                         goto exit;
541                 }
542
543                 /* Insert the queue to the list */
544                 mq = mq_new;
545                 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);
546                 LIST_INSERT_HEAD(&mqueue_head, mq, mq_list);
547                 mq_new = NULL;
548                 getnanotime(&mq->mq_btime);
549                 mq->mq_atime = mq->mq_mtime = mq->mq_btime;
550         }
551
552         /* Increase the counters, and make descriptor ready */
553         p->p_mqueue_cnt++;
554         mq->mq_refcnt++;
555         fp->f_data = mq;
556 exit:
557         lockmgr(&mq->mq_mtx, LK_RELEASE);
558         lockmgr(&mqlist_mtx, LK_RELEASE);
559
560         if (mq_new)
561                 mqueue_destroy(mq_new);
562         if (error) {
563                 fsetfd(curproc, NULL, mqd);
564                 fp->f_ops = &badfileops;
565         } else {
566                 fsetfd(p, fp, mqd);
567                 uap->sysmsg_result = mqd;
568         }
569         fdrop(fp);
570         kfree(name, M_MQBUF);
571
572         return error;
573 }
574
575 int
576 sys_mq_close(struct mq_close_args *uap)
577 {
578         return sys_close((void *)uap);
579 }
580
581 /*
582  * Primary mq_receive1() function.
583  */
584 int
585 mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
586     unsigned *msg_prio, struct timespec *ts, ssize_t *mlen)
587 {
588         file_t *fp = NULL;
589         struct mqueue *mq;
590         struct mq_msg *msg = NULL;
591         struct mq_attr *mqattr;
592         u_int idx;
593         int error;
594
595         /* Get the message queue */
596         error = mqueue_get(l, mqdes, &fp);
597         if (error) {
598                 return error;
599         }
600         mq = fp->f_data;
601         if ((fp->f_flag & FREAD) == 0) {
602                 error = EBADF;
603                 goto error;
604         }
605         getnanotime(&mq->mq_atime);
606         mqattr = &mq->mq_attrib;
607
608         /* Check the message size limits */
609         if (msg_len < mqattr->mq_msgsize) {
610                 error = EMSGSIZE;
611                 goto error;
612         }
613
614         /* Check if queue is empty */
615         while (mqattr->mq_curmsgs == 0) {
616                 int t;
617
618                 if (mqattr->mq_flags & O_NONBLOCK) {
619                         error = EAGAIN;
620                         goto error;
621                 }
622                 error = abstimeout2timo(ts, &t);
623                 if (error) {
624                         goto error;
625                 }
626                 /*
627                  * Block until someone sends the message.
628                  * While doing this, notification should not be sent.
629                  */
630                 mqattr->mq_flags |= MQ_RECEIVE;
631                 error = tsleep(&mq->mq_send_cv, PCATCH, "mqsend", t);
632                 mqattr->mq_flags &= ~MQ_RECEIVE;
633                 if (error || (mqattr->mq_flags & MQ_UNLINK)) {
634                         error = (error == EWOULDBLOCK) ? ETIMEDOUT : EINTR;
635                         goto error;
636                 }
637         }
638
639
640         /*
641          * Find the highest priority message, and remove it from the queue.
642          * At first, reserved queue is checked, bitmap is next.
643          */
644         msg = TAILQ_FIRST(&mq->mq_head[MQ_PQRESQ]);
645         if (__predict_true(msg == NULL)) {
646                 idx = ffs(mq->mq_bitmap);
647                 msg = TAILQ_FIRST(&mq->mq_head[idx]);
648                 KKASSERT(msg != NULL);
649         } else {
650                 idx = MQ_PQRESQ;
651         }
652         TAILQ_REMOVE(&mq->mq_head[idx], msg, msg_queue);
653
654         /* Unmark the bit, if last message. */
655         if (__predict_true(idx) && TAILQ_EMPTY(&mq->mq_head[idx])) {
656                 KKASSERT((MQ_PQSIZE - idx) == msg->msg_prio);
657                 mq->mq_bitmap &= ~(1 << --idx);
658         }
659
660         /* Decrement the counter and signal waiter, if any */
661         mqattr->mq_curmsgs--;
662         wakeup_one(&mq->mq_recv_cv);
663
664         /* Ready for sending now */
665         selwakeup(&mq->mq_wsel);
666 error:
667         lockmgr(&mq->mq_mtx, LK_RELEASE);
668         fdrop(fp);
669         if (error)
670                 return error;
671
672         /*
673          * Copy the data to the user-space.
674          * Note: According to POSIX, no message should be removed from the
675          * queue in case of fail - this would be violated.
676          */
677         *mlen = msg->msg_len;
678         error = copyout(msg->msg_ptr, msg_ptr, msg->msg_len);
679         if (error == 0 && msg_prio)
680                 error = copyout(&msg->msg_prio, msg_prio, sizeof(unsigned));
681         mqueue_freemsg(msg, sizeof(struct mq_msg) + msg->msg_len);
682
683         return error;
684 }
685
686 int
687 sys_mq_receive(struct mq_receive_args *uap)
688 {
689         /* {
690                 syscallarg(mqd_t) mqdes;
691                 syscallarg(char *) msg_ptr;
692                 syscallarg(size_t) msg_len;
693                 syscallarg(unsigned *) msg_prio;
694         } */
695         ssize_t mlen;
696         int error;
697
698         error = mq_receive1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
699             SCARG(uap, msg_len), SCARG(uap, msg_prio), 0, &mlen);
700         if (error == 0)
701                 uap->sysmsg_result = mlen;
702
703         return error;
704 }
705
706 int
707 sys_mq_timedreceive(struct mq_timedreceive_args *uap)
708 {
709         /* {
710                 syscallarg(mqd_t) mqdes;
711                 syscallarg(char *) msg_ptr;
712                 syscallarg(size_t) msg_len;
713                 syscallarg(unsigned *) msg_prio;
714                 syscallarg(const struct timespec *) abs_timeout;
715         } */
716         int error;
717         ssize_t mlen;
718         struct timespec ts, *tsp;
719
720         /* Get and convert time value */
721         if (SCARG(uap, abs_timeout)) {
722                 error = copyin(SCARG(uap, abs_timeout), &ts, sizeof(ts));
723                 if (error)
724                         return error;
725                 tsp = &ts;
726         } else {
727                 tsp = NULL;
728         }
729
730         error = mq_receive1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
731             SCARG(uap, msg_len), SCARG(uap, msg_prio), tsp, &mlen);
732         if (error == 0)
733                 uap->sysmsg_result = mlen;
734
735         return error;
736 }
737
738 /*
739  * Primary mq_send1() function.
740  */
741 int
742 mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len,
743     unsigned msg_prio, struct timespec *ts)
744 {
745         file_t *fp = NULL;
746         struct mqueue *mq;
747         struct mq_msg *msg;
748         struct mq_attr *mqattr;
749         struct proc *notify = NULL;
750         /*ksiginfo_t ksi;*/
751         size_t size;
752         int error;
753
754         /* Check the priority range */
755         if (msg_prio >= mq_prio_max)
756                 return EINVAL;
757
758         /* Allocate a new message */
759         size = sizeof(struct mq_msg) + msg_len;
760         if (size > mq_max_msgsize)
761                 return EMSGSIZE;
762
763         if (size > MQ_DEF_MSGSIZE) {
764                 msg = kmalloc(size, M_MQBUF, M_WAITOK);
765         } else {
766                 msg = objcache_get(mqmsg_cache, M_WAITOK);
767         }
768
769         /* Get the data from user-space */
770         error = copyin(msg_ptr, msg->msg_ptr, msg_len);
771         if (error) {
772                 mqueue_freemsg(msg, size);
773                 return error;
774         }
775         msg->msg_len = msg_len;
776         msg->msg_prio = msg_prio;
777
778         /* Get the mqueue */
779         error = mqueue_get(l, mqdes, &fp);
780         if (error) {
781                 mqueue_freemsg(msg, size);
782                 return error;
783         }
784         mq = fp->f_data;
785         if ((fp->f_flag & FWRITE) == 0) {
786                 error = EBADF;
787                 goto error;
788         }
789         getnanotime(&mq->mq_mtime);
790         mqattr = &mq->mq_attrib;
791
792         /* Check the message size limit */
793         if (msg_len <= 0 || msg_len > mqattr->mq_msgsize) {
794                 error = EMSGSIZE;
795                 goto error;
796         }
797
798         /* Check if queue is full */
799         while (mqattr->mq_curmsgs >= mqattr->mq_maxmsg) {
800                 int t;
801
802                 if (mqattr->mq_flags & O_NONBLOCK) {
803                         error = EAGAIN;
804                         goto error;
805                 }
806                 error = abstimeout2timo(ts, &t);
807                 if (error) {
808                         goto error;
809                 }
810                 /* Block until queue becomes available */
811                 error = tsleep(&mq->mq_recv_cv, PCATCH, "mqrecv", t);
812                 if (error || (mqattr->mq_flags & MQ_UNLINK)) {
813                         error = (error == EWOULDBLOCK) ? ETIMEDOUT : error;
814                         goto error;
815                 }
816         }
817         KKASSERT(mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg);
818
819         /*
820          * Insert message into the queue, according to the priority.
821          * Note the difference between index and priority.
822          */
823         if (__predict_true(msg_prio < MQ_PQSIZE)) {
824                 u_int idx = MQ_PQSIZE - msg_prio;
825
826                 KKASSERT(idx != MQ_PQRESQ);
827                 TAILQ_INSERT_TAIL(&mq->mq_head[idx], msg, msg_queue);
828                 mq->mq_bitmap |= (1 << --idx);
829         } else {
830                 mqueue_linear_insert(mq, msg);
831         }
832
833         /* Check for the notify */
834         if (mqattr->mq_curmsgs == 0 && mq->mq_notify_proc &&
835             (mqattr->mq_flags & MQ_RECEIVE) == 0) {
836                 /* Initialize the signal */
837                 /*KSI_INIT(&ksi);*/
838                 /*ksi.ksi_signo = mq->mq_sig_notify.sigev_signo;*/
839                 /*ksi.ksi_code = SI_MESGQ;*/
840                 /*ksi.ksi_value = mq->mq_sig_notify.sigev_value;*/
841                 /* Unregister the process */
842                 notify = mq->mq_notify_proc;
843                 mq->mq_notify_proc = NULL;
844         }
845
846         /* Increment the counter and signal waiter, if any */
847         mqattr->mq_curmsgs++;
848         wakeup_one(&mq->mq_send_cv);
849
850         /* Ready for receiving now */
851         selwakeup(&mq->mq_rsel);
852 error:
853         /*mutex_exit(&mq->mq_mtx);*/
854         lockmgr(&mq->mq_mtx, LK_RELEASE);
855         fdrop(fp);
856
857         if (error) {
858                 mqueue_freemsg(msg, size);
859         } else if (notify) {
860                 /* Send the notify, if needed */
861                 spin_lock_wr(&allproc_spin);
862                 /*kpsignal(notify, &ksi, NULL);*/
863                 ksignal(notify, mq->mq_sig_notify.sigev_signo);
864                 spin_unlock_wr(&allproc_spin);
865         }
866
867         return error;
868 }
869
870 int
871 sys_mq_send(struct mq_send_args *uap)
872 {
873         /* {
874                 syscallarg(mqd_t) mqdes;
875                 syscallarg(const char *) msg_ptr;
876                 syscallarg(size_t) msg_len;
877                 syscallarg(unsigned) msg_prio;
878         } */
879
880         return mq_send1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
881             SCARG(uap, msg_len), SCARG(uap, msg_prio), 0);
882 }
883
884 int
885 sys_mq_timedsend(struct mq_timedsend_args *uap)
886 {
887         /* {
888                 syscallarg(mqd_t) mqdes;
889                 syscallarg(const char *) msg_ptr;
890                 syscallarg(size_t) msg_len;
891                 syscallarg(unsigned) msg_prio;
892                 syscallarg(const struct timespec *) abs_timeout;
893         } */
894         struct timespec ts, *tsp;
895         int error;
896
897         /* Get and convert time value */
898         if (SCARG(uap, abs_timeout)) {
899                 error = copyin(SCARG(uap, abs_timeout), &ts, sizeof(ts));
900                 if (error)
901                         return error;
902                 tsp = &ts;
903         } else {
904                 tsp = NULL;
905         }
906
907         return mq_send1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr),
908             SCARG(uap, msg_len), SCARG(uap, msg_prio), tsp);
909 }
910
911 int
912 sys_mq_notify(struct mq_notify_args *uap)
913 {
914         /* {
915                 syscallarg(mqd_t) mqdes;
916                 syscallarg(const struct sigevent *) notification;
917         } */
918         file_t *fp = NULL;
919         struct mqueue *mq;
920         struct sigevent sig;
921         int error;
922
923         if (SCARG(uap, notification)) {
924                 /* Get the signal from user-space */
925                 error = copyin(SCARG(uap, notification), &sig,
926                     sizeof(struct sigevent));
927                 if (error)
928                         return error;
929         }
930
931         error = mqueue_get(curthread->td_lwp, SCARG(uap, mqdes), &fp);
932         if (error)
933                 return error;
934         mq = fp->f_data;
935
936         if (SCARG(uap, notification)) {
937                 /* Register notification: set the signal and target process */
938                 if (mq->mq_notify_proc == NULL) {
939                         memcpy(&mq->mq_sig_notify, &sig,
940                             sizeof(struct sigevent));
941                         mq->mq_notify_proc = curproc;
942                 } else {
943                         /* Fail if someone else already registered */
944                         error = EBUSY;
945                 }
946         } else {
947                 /* Unregister the notification */
948                 mq->mq_notify_proc = NULL;
949         }
950         lockmgr(&mq->mq_mtx, LK_RELEASE);
951         fdrop(fp);
952
953         return error;
954 }
955
956 int
957 sys_mq_getattr(struct mq_getattr_args *uap)
958 {
959         /* {
960                 syscallarg(mqd_t) mqdes;
961                 syscallarg(struct mq_attr *) mqstat;
962         } */
963         file_t *fp = NULL;
964         struct mqueue *mq;
965         struct mq_attr attr;
966         int error;
967
968         /* Get the message queue */
969         error = mqueue_get(/*l*/curthread->td_lwp, SCARG(uap, mqdes), &fp);
970         if (error)
971                 return error;
972         mq = fp->f_data;
973         memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr));
974         lockmgr(&mq->mq_mtx, LK_RELEASE);
975         fdrop(fp);
976
977         return copyout(&attr, SCARG(uap, mqstat), sizeof(struct mq_attr));
978 }
979
980 int
981 sys_mq_setattr(struct mq_setattr_args *uap)
982 {
983         /* {
984                 syscallarg(mqd_t) mqdes;
985                 syscallarg(const struct mq_attr *) mqstat;
986                 syscallarg(struct mq_attr *) omqstat;
987         } */
988         file_t *fp = NULL;
989         struct mqueue *mq;
990         struct mq_attr attr;
991         int error, nonblock;
992
993         error = copyin(SCARG(uap, mqstat), &attr, sizeof(struct mq_attr));
994         if (error)
995                 return error;
996         nonblock = (attr.mq_flags & O_NONBLOCK);
997
998         /* Get the message queue */
999         error = mqueue_get(/*l*/curthread->td_lwp, SCARG(uap, mqdes), &fp);
1000         if (error)
1001                 return error;
1002         mq = fp->f_data;
1003
1004         /* Copy the old attributes, if needed */
1005         if (SCARG(uap, omqstat))
1006                 memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr));
1007
1008         /* Ignore everything, except O_NONBLOCK */
1009         if (nonblock)
1010                 mq->mq_attrib.mq_flags |= O_NONBLOCK;
1011         else
1012                 mq->mq_attrib.mq_flags &= ~O_NONBLOCK;
1013
1014         lockmgr(&mq->mq_mtx, LK_RELEASE);
1015         fdrop(fp);
1016
1017         /*
1018          * Copy the data to the user-space.
1019          * Note: According to POSIX, the new attributes should not be set in
1020          * case of fail - this would be violated.
1021          */
1022         if (SCARG(uap, omqstat))
1023                 error = copyout(&attr, SCARG(uap, omqstat),
1024                     sizeof(struct mq_attr));
1025
1026         return error;
1027 }
1028
1029 int
1030 sys_mq_unlink(struct mq_unlink_args *uap)
1031 {
1032         /* {
1033                 syscallarg(const char *) name;
1034         } */
1035         struct mqueue *mq;
1036         char *name;
1037         int error, refcnt = 0;
1038
1039         /* Get the name from the user-space */
1040         name = kmalloc(MQ_NAMELEN, M_MQBUF, M_WAITOK | M_ZERO);
1041         error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN - 1, NULL);
1042         if (error) {
1043                 kfree(name, M_MQBUF);
1044                 return error;
1045         }
1046
1047         /* Lookup for this file */
1048         lockmgr(&mqlist_mtx, LK_EXCLUSIVE);
1049         mq = mqueue_lookup(name);
1050         if (mq == NULL) {
1051                 error = ENOENT;
1052                 goto error;
1053         }
1054
1055         /* Check the permissions */
1056         if (curproc->p_ucred->cr_uid != mq->mq_euid &&
1057             priv_check(curthread, PRIV_ROOT) != 0) {
1058                 lockmgr(&mq->mq_mtx, LK_RELEASE);
1059                 error = EACCES;
1060                 goto error;
1061         }
1062
1063         /* Mark message queue as unlinking, before leaving the window */
1064         mq->mq_attrib.mq_flags |= MQ_UNLINK;
1065
1066         /* Wake up all waiters, if there are such */
1067         wakeup(&mq->mq_send_cv);
1068         wakeup(&mq->mq_recv_cv);
1069
1070         selwakeup(&mq->mq_rsel);
1071         selwakeup(&mq->mq_wsel);
1072
1073         refcnt = mq->mq_refcnt;
1074         if (refcnt == 0)
1075                 LIST_REMOVE(mq, mq_list);
1076
1077         lockmgr(&mq->mq_mtx, LK_RELEASE);
1078 error:
1079         lockmgr(&mqlist_mtx, LK_RELEASE);
1080
1081         /*
1082          * If there are no references - destroy the message
1083          * queue, otherwise, the last mq_close() will do that.
1084          */
1085         if (error == 0 && refcnt == 0)
1086                 mqueue_destroy(mq);
1087
1088         kfree(name, M_MQBUF);
1089         return error;
1090 }
1091
1092 /*
1093  * SysCtl.
1094  */
1095 SYSCTL_NODE(_kern, OID_AUTO, mqueue,
1096     CTLFLAG_RW, 0, "Message queue options");
1097
1098 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_open_max,
1099     CTLFLAG_RW, &mq_open_max, 0,
1100     "Maximal number of message queue descriptors per process");
1101
1102 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_prio_max,
1103     CTLFLAG_RW, &mq_prio_max, 0,
1104     "Maximal priority of the message");
1105
1106 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_max_msgsize,
1107     CTLFLAG_RW, &mq_max_msgsize, 0,
1108     "Maximal allowed size of the message");
1109
1110 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_def_maxmsg,
1111     CTLFLAG_RW, &mq_def_maxmsg, 0,
1112     "Default maximal message count");
1113
1114 SYSINIT(sys_mqueue_init, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, mqueue_sysinit, NULL);