2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.45 2008/03/05 13:03:29 sephe Exp $
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
46 #include <sys/rtprio.h>
47 #include <sys/queue.h>
48 #include <sys/sysctl.h>
49 #include <sys/kthread.h>
50 #include <sys/signalvar.h>
51 #include <sys/signal2.h>
52 #include <machine/cpu.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_kern.h>
58 #include <vm/vm_object.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_pager.h>
62 #include <vm/vm_extern.h>
63 #include <vm/vm_zone.h>
65 #include <sys/thread2.h>
66 #include <sys/msgport2.h>
67 #include <sys/spinlock2.h>
68 #include <sys/serialize.h>
70 #include <machine/stdarg.h>
71 #include <machine/cpufunc.h>
73 #include <machine/smp.h>
76 #include <sys/malloc.h>
77 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
81 #include <sys/stdint.h>
82 #include <libcaps/thread.h>
83 #include <sys/thread.h>
84 #include <sys/msgport.h>
85 #include <sys/errno.h>
86 #include <libcaps/globaldata.h>
87 #include <machine/cpufunc.h>
88 #include <sys/thread2.h>
89 #include <sys/msgport2.h>
/************************************************************************
 *				MESSAGE FUNCTIONS			*
 ************************************************************************/
102 * Request asynchronous completion and call lwkt_beginmsg(). The
103 * target port can opt to execute the message synchronously or
104 * asynchronously and this function will automatically queue the
105 * response if the target executes the message synchronously.
107 * NOTE: The message is in an indeterminant state until this call
108 * returns. The caller should not mess with it (e.g. try to abort it)
112 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
116 KKASSERT(msg->ms_reply_port != NULL &&
117 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
118 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
119 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
120 lwkt_replymsg(msg, error);
127 * Request asynchronous completion and call lwkt_beginmsg(). The
128 * target port can opt to execute the message synchronously or
129 * asynchronously and this function will automatically queue the
130 * response if the target executes the message synchronously.
133 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
137 KKASSERT(msg->ms_reply_port != NULL &&
138 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
139 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
140 msg->ms_flags |= MSGF_SYNC;
141 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
142 error = lwkt_waitmsg(msg, flags);
144 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
152 * Forward a message received on one port to another port.
155 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
160 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
161 if ((error = port->mp_putport(port, msg)) != EASYNC)
162 lwkt_replymsg(msg, error);
170 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
171 * The caller must ensure that the message will not be both replied AND
172 * destroyed while the abort is in progress.
174 * This function issues a callback which might block!
177 lwkt_abortmsg(lwkt_msg_t msg)
180 * A critical section protects us from reply IPIs on this cpu.
185 * Shortcut the operation if the message has already been returned.
186 * The callback typically constructs a lwkt_msg with the abort request,
187 * issues it synchronously, and waits for completion. The callback
188 * is not required to actually abort the message and the target port,
189 * upon receiving an abort request message generated by the callback
190 * should check whether the original message has already completed or
193 if (msg->ms_flags & MSGF_ABORTABLE) {
194 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
195 msg->ms_abortfn(msg);
200 /************************************************************************
201 * PORT INITIALIZATION API *
202 ************************************************************************/
204 static void *lwkt_thread_getport(lwkt_port_t port);
205 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
206 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
207 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
208 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
210 static void *lwkt_spin_getport(lwkt_port_t port);
211 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
212 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
213 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
214 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
216 static void *lwkt_serialize_getport(lwkt_port_t port);
217 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
218 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
219 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
220 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
222 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
223 static void *lwkt_panic_getport(lwkt_port_t port);
224 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
225 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
226 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
227 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
230 * Core port initialization (internal)
234 _lwkt_initport(lwkt_port_t port,
235 void *(*gportfn)(lwkt_port_t),
236 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
237 int (*wmsgfn)(lwkt_msg_t, int),
238 void *(*wportfn)(lwkt_port_t, int),
239 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
241 bzero(port, sizeof(*port));
242 TAILQ_INIT(&port->mp_msgq);
243 port->mp_getport = gportfn;
244 port->mp_putport = pportfn;
245 port->mp_waitmsg = wmsgfn;
246 port->mp_waitport = wportfn;
247 port->mp_replyport = rportfn;
251 * lwkt_initport_thread()
253 * Initialize a port for use by a particular thread. The port may
254 * only be used by <td>.
257 lwkt_initport_thread(lwkt_port_t port, thread_t td)
263 lwkt_thread_waitport,
264 lwkt_thread_replyport);
269 * lwkt_initport_spin()
271 * Initialize a port for use with descriptors that might be accessed
272 * via multiple LWPs, processes, or threads. Has somewhat more
273 * overhead then thread ports.
276 lwkt_initport_spin(lwkt_port_t port)
283 lwkt_spin_replyport);
284 spin_init(&port->mpu_spin);
288 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
291 lwkt_serialize_getport,
292 lwkt_serialize_putport,
293 lwkt_serialize_waitmsg,
294 lwkt_serialize_waitport,
295 lwkt_serialize_replyport);
296 port->mpu_serialize = slz;
300 * Similar to the standard initport, this function simply marks the message
301 * as being done and does not attempt to return it to an originating port.
304 lwkt_initport_replyonly_null(lwkt_port_t port)
311 lwkt_null_replyport);
315 * Initialize a reply-only port, typically used as a message sink. Such
316 * ports can only be used as a reply port.
319 lwkt_initport_replyonly(lwkt_port_t port,
320 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
322 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
323 lwkt_panic_waitmsg, lwkt_panic_waitport,
328 lwkt_initport_putonly(lwkt_port_t port,
329 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
331 _lwkt_initport(port, lwkt_panic_getport, pportfn,
332 lwkt_panic_waitmsg, lwkt_panic_waitport,
333 lwkt_panic_replyport);
337 lwkt_initport_panic(lwkt_port_t port)
340 lwkt_panic_getport, lwkt_panic_putport,
341 lwkt_panic_waitmsg, lwkt_panic_waitport,
342 lwkt_panic_replyport);
348 * Retrieve the next message from the port's message queue, return NULL
349 * if no messages are pending. The retrieved message will either be a
350 * request or a reply based on the MSGF_REPLY bit.
352 * The calling thread MUST own the port.
357 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
360 * normal case, remove and return the message.
362 TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
363 msg->ms_flags &= ~MSGF_QUEUED;
/************************************************************************
 *			THREAD PORT BACKEND				*
 ************************************************************************
 *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
 * correct cpu before being enqueued to a port.  Note that this is fairly
 * optimal since scheduling would have had to do an IPI anyway if the
 * message were headed to a different cpu.
 */
#ifdef SMP

/*
 * This function completes reply processing for the default case in the
 * context of the originating cpu.  It is run via IPI on the cpu that owns
 * the reply port's thread.
 *
 * NOTE(review): the SMP and INVARIANTS preprocessor guards and the early
 * return were missing from the damaged listing this block was recovered
 * from and are restored from the line gaps; confirm against the pristine
 * file.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        return;
    }

    /*
     * Cleanup
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    if (msg->ms_flags & MSGF_SYNC) {
        /* Synchronous waiter: just mark it done, no queueing. */
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
        TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
    }
    if (port->mp_flags & MSGPORTF_WAITING)
        lwkt_schedule(port->mpu_td);
}

#endif
418 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
420 * Called with the reply port as an argument but in the context of the
421 * original target port. Completion must occur on the target port's
424 * The critical section protects us from IPIs on the this CPU.
427 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
429 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
431 if (msg->ms_flags & MSGF_SYNC) {
433 * If a synchronous completion has been requested, just wakeup
434 * the message without bothering to queue it to the target port.
436 * Assume the target thread is non-preemptive, so no critical
437 * section is required.
440 if (port->mpu_td->td_gd == mycpu) {
442 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
443 if (port->mp_flags & MSGPORTF_WAITING)
444 lwkt_schedule(port->mpu_td);
448 msg->ms_flags |= MSGF_INTRANSIT;
450 msg->ms_flags |= MSGF_REPLY;
451 lwkt_send_ipiq(port->mpu_td->td_gd,
452 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
457 * If an asynchronous completion has been requested the message
458 * must be queued to the reply port.
460 * A critical section is required to interlock the port queue.
463 if (port->mpu_td->td_gd == mycpu) {
466 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
467 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
468 if (port->mp_flags & MSGPORTF_WAITING)
469 lwkt_schedule(port->mpu_td);
474 msg->ms_flags |= MSGF_INTRANSIT;
476 msg->ms_flags |= MSGF_REPLY;
477 lwkt_send_ipiq(port->mpu_td->td_gd,
478 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
/*
 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
 *
 * Called with the target port as an argument but in the context of the
 * reply port.  This function always implements an asynchronous put to
 * the target message port, and thus returns EASYNC.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY
 */

#ifdef SMP

/*
 * IPI half of lwkt_thread_putport(): queues the message on its target
 * port once we are running on the cpu that owns the port's thread.
 *
 * NOTE(review): the SMP and INVARIANTS preprocessor guards and the early
 * return were missing from the damaged listing this block was recovered
 * from and are restored from the line gaps; confirm against the pristine
 * file.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
        return;
    }

    /*
     * Cleanup
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
    msg->ms_flags |= MSGF_QUEUED;
    if (port->mp_flags & MSGPORTF_WAITING)
        lwkt_schedule(port->mpu_td);
}

#endif
528 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
530 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
532 msg->ms_target_port = port;
534 if (port->mpu_td->td_gd == mycpu) {
537 msg->ms_flags |= MSGF_QUEUED;
538 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
539 if (port->mp_flags & MSGPORTF_WAITING)
540 lwkt_schedule(port->mpu_td);
545 msg->ms_flags |= MSGF_INTRANSIT;
547 lwkt_send_ipiq(port->mpu_td->td_gd,
548 (ipifunc1_t)lwkt_thread_putport_remote, msg);
555 * lwkt_thread_getport()
557 * Retrieve the next message from the port or NULL if no messages
561 lwkt_thread_getport(lwkt_port_t port)
565 KKASSERT(port->mpu_td == curthread);
567 crit_enter_quick(port->mpu_td);
568 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
569 _lwkt_pullmsg(port, msg);
570 crit_exit_quick(port->mpu_td);
575 * lwkt_thread_waitmsg()
577 * Wait for a particular message to be replied. We must be the only
578 * thread waiting on the message. The port must be owned by the
582 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
584 if ((msg->ms_flags & MSGF_DONE) == 0) {
586 * If the done bit was not set we have to block until it is.
588 lwkt_port_t port = msg->ms_reply_port;
589 thread_t td = curthread;
592 KKASSERT(port->mpu_td == td);
593 crit_enter_quick(td);
596 while ((msg->ms_flags & MSGF_DONE) == 0) {
597 port->mp_flags |= MSGPORTF_WAITING;
598 if (sentabort == 0) {
599 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
603 lwkt_sleep("waitabt", 0);
605 port->mp_flags &= ~MSGPORTF_WAITING;
607 if (msg->ms_flags & MSGF_QUEUED)
608 _lwkt_pullmsg(port, msg);
612 * If the done bit was set we only have to mess around with the
613 * message if it is queued on the reply port.
615 if (msg->ms_flags & MSGF_QUEUED) {
616 lwkt_port_t port = msg->ms_reply_port;
617 thread_t td = curthread;
619 KKASSERT(port->mpu_td == td);
620 crit_enter_quick(td);
621 _lwkt_pullmsg(port, msg);
625 return(msg->ms_error);
629 lwkt_thread_waitport(lwkt_port_t port, int flags)
631 thread_t td = curthread;
635 KKASSERT(port->mpu_td == td);
636 crit_enter_quick(td);
637 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
638 port->mp_flags |= MSGPORTF_WAITING;
639 error = lwkt_sleep("waitport", flags);
640 port->mp_flags &= ~MSGPORTF_WAITING;
644 _lwkt_pullmsg(port, msg);
/************************************************************************
 *			   SPIN PORT BACKEND				*
 ************************************************************************
 *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
 */
669 lwkt_spin_getport(lwkt_port_t port)
673 spin_lock_wr(&port->mpu_spin);
674 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
675 _lwkt_pullmsg(port, msg);
676 spin_unlock_wr(&port->mpu_spin);
682 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
686 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
688 msg->ms_target_port = port;
689 spin_lock_wr(&port->mpu_spin);
690 msg->ms_flags |= MSGF_QUEUED;
691 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
693 if (port->mp_flags & MSGPORTF_WAITING) {
694 port->mp_flags &= ~MSGPORTF_WAITING;
697 spin_unlock_wr(&port->mpu_spin);
705 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
711 if ((msg->ms_flags & MSGF_DONE) == 0) {
712 port = msg->ms_reply_port;
714 spin_lock_wr(&port->mpu_spin);
715 while ((msg->ms_flags & MSGF_DONE) == 0) {
719 * If message was sent synchronously from the beginning
720 * the wakeup will be on the message structure, else it
721 * will be on the port structure.
723 if (msg->ms_flags & MSGF_SYNC) {
727 port->mp_flags |= MSGPORTF_WAITING;
731 * Only messages which support abort can be interrupted.
732 * We must still wait for message completion regardless.
734 if ((flags & PCATCH) && sentabort == 0) {
735 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
738 spin_unlock_wr(&port->mpu_spin);
740 spin_lock_wr(&port->mpu_spin);
743 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
745 /* see note at the top on the MSGPORTF_WAITING flag */
748 * Turn EINTR into ERESTART if the signal indicates.
750 if (sentabort && msg->ms_error == EINTR)
751 msg->ms_error = sentabort;
752 if (msg->ms_flags & MSGF_QUEUED)
753 _lwkt_pullmsg(port, msg);
754 spin_unlock_wr(&port->mpu_spin);
756 if (msg->ms_flags & MSGF_QUEUED) {
757 port = msg->ms_reply_port;
758 spin_lock_wr(&port->mpu_spin);
759 _lwkt_pullmsg(port, msg);
760 spin_unlock_wr(&port->mpu_spin);
763 return(msg->ms_error);
768 lwkt_spin_waitport(lwkt_port_t port, int flags)
773 spin_lock_wr(&port->mpu_spin);
774 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
775 port->mp_flags |= MSGPORTF_WAITING;
776 error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
777 /* see note at the top on the MSGPORTF_WAITING flag */
779 spin_unlock_wr(&port->mpu_spin);
783 _lwkt_pullmsg(port, msg);
784 spin_unlock_wr(&port->mpu_spin);
790 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
794 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
796 if (msg->ms_flags & MSGF_SYNC) {
798 * If a synchronous completion has been requested, just wakeup
799 * the message without bothering to queue it to the target port.
801 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
805 * If an asynchronous completion has been requested the message
806 * must be queued to the reply port.
808 spin_lock_wr(&port->mpu_spin);
809 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
810 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
812 if (port->mp_flags & MSGPORTF_WAITING) {
813 port->mp_flags &= ~MSGPORTF_WAITING;
816 spin_unlock_wr(&port->mpu_spin);
/************************************************************************
 *			SERIALIZER PORT BACKEND				*
 ************************************************************************
 *
 * This backend uses a serializer to protect access to the port.  Callers
 * are assumed to hold the serializer.  This kind of port is usually
 * created by a network device driver along with _one_ lwkt thread to
 * pipeline operations which may temporarily release the serializer.
 *
 * The implementation is based on the SPIN PORT BACKEND.
 */
836 lwkt_serialize_getport(lwkt_port_t port)
840 ASSERT_SERIALIZED(port->mpu_serialize);
842 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
843 _lwkt_pullmsg(port, msg);
849 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
851 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
852 ASSERT_SERIALIZED(port->mpu_serialize);
854 msg->ms_target_port = port;
855 msg->ms_flags |= MSGF_QUEUED;
856 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
857 if (port->mp_flags & MSGPORTF_WAITING) {
858 port->mp_flags &= ~MSGPORTF_WAITING;
866 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
872 if ((msg->ms_flags & MSGF_DONE) == 0) {
873 port = msg->ms_reply_port;
875 ASSERT_SERIALIZED(port->mpu_serialize);
878 while ((msg->ms_flags & MSGF_DONE) == 0) {
882 * If message was sent synchronously from the beginning
883 * the wakeup will be on the message structure, else it
884 * will be on the port structure.
886 if (msg->ms_flags & MSGF_SYNC) {
890 port->mp_flags |= MSGPORTF_WAITING;
894 * Only messages which support abort can be interrupted.
895 * We must still wait for message completion regardless.
897 if ((flags & PCATCH) && sentabort == 0) {
898 error = serialize_sleep(won, port->mpu_serialize, PCATCH,
902 lwkt_serialize_exit(port->mpu_serialize);
904 lwkt_serialize_enter(port->mpu_serialize);
907 error = serialize_sleep(won, port->mpu_serialize, 0,
910 /* see note at the top on the MSGPORTF_WAITING flag */
913 * Turn EINTR into ERESTART if the signal indicates.
915 if (sentabort && msg->ms_error == EINTR)
916 msg->ms_error = sentabort;
917 if (msg->ms_flags & MSGF_QUEUED)
918 _lwkt_pullmsg(port, msg);
920 if (msg->ms_flags & MSGF_QUEUED) {
921 port = msg->ms_reply_port;
923 ASSERT_SERIALIZED(port->mpu_serialize);
924 _lwkt_pullmsg(port, msg);
927 return(msg->ms_error);
932 lwkt_serialize_waitport(lwkt_port_t port, int flags)
937 ASSERT_SERIALIZED(port->mpu_serialize);
939 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
940 port->mp_flags |= MSGPORTF_WAITING;
941 error = serialize_sleep(port, port->mpu_serialize, flags,
943 /* see note at the top on the MSGPORTF_WAITING flag */
947 _lwkt_pullmsg(port, msg);
953 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
955 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
956 ASSERT_SERIALIZED(port->mpu_serialize);
958 if (msg->ms_flags & MSGF_SYNC) {
960 * If a synchronous completion has been requested, just wakeup
961 * the message without bothering to queue it to the target port.
963 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
967 * If an asynchronous completion has been requested the message
968 * must be queued to the reply port.
970 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
971 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
972 if (port->mp_flags & MSGPORTF_WAITING) {
973 port->mp_flags &= ~MSGPORTF_WAITING;
/************************************************************************
 *		     PANIC AND SPECIAL PORT FUNCTIONS			*
 ************************************************************************/
984 * You can point a port's reply vector at this function if you just want
985 * the message marked done, without any queueing or signaling. This is
986 * often used for structure-embedded messages.
990 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
992 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
997 lwkt_panic_getport(lwkt_port_t port)
999 panic("lwkt_getport() illegal on port %p", port);
1004 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1006 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1011 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1013 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1018 lwkt_panic_waitport(lwkt_port_t port, int flags)
1020 panic("port %p cannot be waited on", port);
1025 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1027 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);