2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
71 #include <machine/smp.h>
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
77 /************************************************************************
79 ************************************************************************/
84 * Request asynchronous completion and call lwkt_beginmsg(). The
85 * target port can opt to execute the message synchronously or
86 * asynchronously and this function will automatically queue the
87 * response if the target executes the message synchronously.
89 * NOTE: The message is in an indeterminate state until this call
90 * returns. The caller should not mess with it (e.g. try to abort it)
/*
 * Start a message asynchronously on the given target port.
 *
 * The assertion requires that the message has a reply port and is idle
 * (MSGF_DONE set, MSGF_QUEUED clear).  MSGF_REPLY, MSGF_SYNC and
 * MSGF_DONE are then cleared before the message is handed to
 * lwkt_beginmsg().  Any result other than EASYNC is passed straight to
 * lwkt_replymsg() as the message's completion code.
 */
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
98 KKASSERT(msg->ms_reply_port != NULL &&
99 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102 lwkt_replymsg(msg, error);
109 * Request synchronous completion and call lwkt_beginmsg(). The
110 * target port can opt to execute the message synchronously or
111 * asynchronously and this function will automatically block and wait
112 * for the reply if the target executes the message asynchronously.
/*
 * Run a message synchronously on the given target port.
 *
 * The message must have a reply port and be idle (MSGF_DONE set,
 * MSGF_QUEUED clear).  MSGF_REPLY and MSGF_DONE are cleared and
 * MSGF_SYNC is set before calling lwkt_beginmsg().  When the target
 * returns EASYNC the caller waits in lwkt_waitmsg(), passing 'flags'
 * through, and takes its error code from there.
 *
 * NOTE(review): the final MSGF_DONE | MSGF_REPLY update presumably
 * applies to the case where lwkt_beginmsg() did not return EASYNC --
 * confirm against the branch structure.
 */
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
119 KKASSERT(msg->ms_reply_port != NULL &&
120 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122 msg->ms_flags |= MSGF_SYNC;
123 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124 error = lwkt_waitmsg(msg, flags);
126 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
134 * Forward a message received on one port to another port.
/*
 * Hand an in-progress message to another port.
 *
 * The message must not be queued, done or replied.  The target port's
 * mp_putport vector is invoked directly; any result other than EASYNC
 * is passed to lwkt_replymsg() as the message's completion code.
 */
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
142 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143 if ((error = port->mp_putport(port, msg)) != EASYNC)
144 lwkt_replymsg(msg, error);
152 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
153 * The caller must ensure that the message will not be both replied AND
154 * destroyed while the abort is in progress.
156 * This function issues a callback which might block!
/*
 * Request an abort of a message.
 *
 * Nothing happens unless MSGF_ABORTABLE is set on the message.  When it
 * is set and neither MSGF_DONE nor MSGF_REPLY is present, the message's
 * ms_abortfn callback is invoked with the message as its argument.
 */
159 lwkt_abortmsg(lwkt_msg_t msg)
162 * A critical section protects us from reply IPIs on this cpu.
167 * Shortcut the operation if the message has already been returned.
168 * The callback typically constructs a lwkt_msg with the abort request,
169 * issues it synchronously, and waits for completion. The callback
170 * is not required to actually abort the message and the target port,
171 * upon receiving an abort request message generated by the callback
172 * should check whether the original message has already completed or
175 if (msg->ms_flags & MSGF_ABORTABLE) {
176 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177 msg->ms_abortfn(msg);
182 /************************************************************************
183 * PORT INITIALIZATION API *
184 ************************************************************************/
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
193 static void *lwkt_spin_getport(lwkt_port_t port);
194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
200 static void *lwkt_serialize_getport(lwkt_port_t port);
201 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
202 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
203 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
204 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
206 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
207 static void *lwkt_panic_getport(lwkt_port_t port);
208 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
209 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
210 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
211 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
212 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
215 * Core port initialization (internal)
/*
 * Common port setup shared by the lwkt_initport_*() front ends.
 *
 * The port structure is zeroed, both message queues (normal and
 * priority) are initialized empty, and the six backend vectors are
 * installed:
 *
 *	gportfn - stored in mp_getport
 *	pportfn - stored in mp_putport
 *	wmsgfn  - stored in mp_waitmsg
 *	wportfn - stored in mp_waitport
 *	rportfn - stored in mp_replyport
 *	dmsgfn  - stored in mp_dropmsg
 *
 * Because of the bzero(), backend-specific fields must be set up by
 * the caller after this function returns.
 */
219 _lwkt_initport(lwkt_port_t port,
220 void *(*gportfn)(lwkt_port_t),
221 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
222 int (*wmsgfn)(lwkt_msg_t, int),
223 void *(*wportfn)(lwkt_port_t, int),
224 void (*rportfn)(lwkt_port_t, lwkt_msg_t),
225 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
227 bzero(port, sizeof(*port));
228 TAILQ_INIT(&port->mp_msgq);
229 TAILQ_INIT(&port->mp_msgq_prio);
230 port->mp_getport = gportfn;
231 port->mp_putport = pportfn;
232 port->mp_waitmsg = wmsgfn;
233 port->mp_waitport = wportfn;
234 port->mp_replyport = rportfn;
235 port->mp_dropmsg = dmsgfn;
239 * Schedule the target thread. If the message flags contain MSGF_NORESCHED
240 * we tell the scheduler not to reschedule if td is at a higher priority.
242 * This routine is called even if the thread is already scheduled.
246 _lwkt_schedule_msg(thread_t td, int flags)
252 * lwkt_initport_thread()
254 * Initialize a port for use by a particular thread. The port may
255 * only be used by <td>.
258 lwkt_initport_thread(lwkt_port_t port, thread_t td)
264 lwkt_thread_waitport,
265 lwkt_thread_replyport,
266 lwkt_thread_dropmsg);
271 * lwkt_initport_spin()
273 * Initialize a port for use with descriptors that might be accessed
274 * via multiple LWPs, processes, or threads. Has somewhat more
275 * overhead than thread ports.
278 lwkt_initport_spin(lwkt_port_t port)
287 spin_init(&port->mpu_spin);
291 * lwkt_initport_serialize()
293 * Initialize a port for use with descriptors that might be accessed
294 * via multiple LWPs, processes, or threads. Callers are assumed to
295 * hold the serializer (slz).
298 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
301 lwkt_serialize_getport,
302 lwkt_serialize_putport,
303 lwkt_serialize_waitmsg,
304 lwkt_serialize_waitport,
305 lwkt_serialize_replyport,
307 port->mpu_serialize = slz;
311 * Similar to the standard initport, this function simply marks the message
312 * as being done and does not attempt to return it to an originating port.
315 lwkt_initport_replyonly_null(lwkt_port_t port)
327 * Initialize a reply-only port, typically used as a message sink. Such
328 * ports can only be used as a reply port.
331 lwkt_initport_replyonly(lwkt_port_t port,
332 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
334 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
335 lwkt_panic_waitmsg, lwkt_panic_waitport,
336 rportfn, lwkt_panic_dropmsg);
340 lwkt_initport_putonly(lwkt_port_t port,
341 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
343 _lwkt_initport(port, lwkt_panic_getport, pportfn,
344 lwkt_panic_waitmsg, lwkt_panic_waitport,
345 lwkt_panic_replyport, lwkt_panic_dropmsg);
349 lwkt_initport_panic(lwkt_port_t port)
352 lwkt_panic_getport, lwkt_panic_putport,
353 lwkt_panic_waitmsg, lwkt_panic_waitport,
354 lwkt_panic_replyport, lwkt_panic_dropmsg);
/*
 * Unlink a queued message from its port.
 *
 * The queue is chosen from the message itself: MSGF_PRIORITY messages
 * live on mp_msgq_prio, everything else on mp_msgq.  After the removal
 * MSGF_QUEUED is cleared with an atomic operation.  No locking is done
 * here; callers in this file invoke it inside their own critical
 * section, spinlock or serializer.
 */
359 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
361 lwkt_msg_queue *queue;
364 * normal case, remove and return the message.
366 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
367 queue = &port->mp_msgq_prio;
369 queue = &port->mp_msgq;
370 TAILQ_REMOVE(queue, msg, ms_node);
373 * atomic op needed for spin ports
375 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
/*
 * Append a message to a port.
 *
 * MSGF_QUEUED is set atomically first, then the message is inserted at
 * the tail of mp_msgq_prio when it carries MSGF_PRIORITY, otherwise at
 * the tail of mp_msgq.  This is the mirror image of _lwkt_pullmsg() and
 * must pick the same queue for a given message.
 */
380 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
382 lwkt_msg_queue *queue;
385 * atomic op needed for spin ports
387 atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
388 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
389 queue = &port->mp_msgq_prio;
391 queue = &port->mp_msgq;
392 TAILQ_INSERT_TAIL(queue, msg, ms_node);
/*
 * Peek at the next message on a port without removing it.
 *
 * The head of the priority queue is examined first; the head of the
 * normal queue is the fallback.  TAILQ_FIRST() yields NULL for an
 * empty queue, so NULL is the result when both queues are empty.
 */
397 _lwkt_pollmsg(lwkt_port_t port)
401 msg = TAILQ_FIRST(&port->mp_msgq_prio);
402 if (__predict_false(msg != NULL))
406 * Priority queue has no message, fallback to non-priority queue.
408 return TAILQ_FIRST(&port->mp_msgq);
/*
 * Queue a completed message on its reply port.
 *
 * The message is pushed first (which sets MSGF_QUEUED) and only then
 * flagged MSGF_REPLY | MSGF_DONE, so MSGF_DONE never becomes visible
 * before the message is actually linked on the port.
 */
413 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
416 * atomic op needed for spin ports
418 _lwkt_pushmsg(port, msg);
419 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
422 /************************************************************************
423 * THREAD PORT BACKEND *
424 ************************************************************************
426 * This backend is used when the port a message is retrieved from is owned
427 * by a single thread (the calling thread). Messages are IPId to the
428 * correct cpu before being enqueued to a port. Note that this is fairly
429 * optimal since scheduling would have had to do an IPI anyway if the
430 * message were headed to a different cpu.
436 * This function completes reply processing for the default case in the
437 * context of the originating cpu.
/*
 * IPI handler that finishes a reply for a thread-owned reply port.
 *
 * If the owning thread's cpu (mpu_td->td_gd) is not the current cpu,
 * the IPI is re-sent to that cpu.  Otherwise MSGF_INTRANSIT, which
 * must be set on entry, is cleared.  Messages with MSGF_SYNC are
 * marked MSGF_REPLY | MSGF_DONE; _lwkt_enqueue_reply() queues a message
 * on the port (presumably the non-MSGF_SYNC case -- the branch
 * structure should be confirmed).  The owner is scheduled when the
 * port has MSGPORTF_WAITING set.
 *
 * 'flags' is a snapshot of ms_flags taken before the completion bits
 * are added; that snapshot is what gets passed to _lwkt_schedule_msg().
 */
441 lwkt_thread_replyport_remote(lwkt_msg_t msg)
443 lwkt_port_t port = msg->ms_reply_port;
447 * Chase any thread migration that occurs
449 if (port->mpu_td->td_gd != mycpu) {
450 lwkt_send_ipiq(port->mpu_td->td_gd,
451 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
459 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
460 msg->ms_flags &= ~MSGF_INTRANSIT;
462 flags = msg->ms_flags;
463 if (msg->ms_flags & MSGF_SYNC) {
465 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
467 _lwkt_enqueue_reply(port, msg);
469 if (port->mp_flags & MSGPORTF_WAITING)
470 _lwkt_schedule_msg(port->mpu_td, flags);
476 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
478 * Called with the reply port as an argument but in the context of the
479 * original target port. Completion must occur on the target port's
482 * The critical section protects us from IPIs on this CPU.
486 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
490 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
492 if (msg->ms_flags & MSGF_SYNC) {
494 * If a synchronous completion has been requested, just wakeup
495 * the message without bothering to queue it to the target port.
497 * Assume the target thread is non-preemptive, so no critical
498 * section is required.
501 if (port->mpu_td->td_gd == mycpu) {
503 flags = msg->ms_flags;
505 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
506 if (port->mp_flags & MSGPORTF_WAITING)
507 _lwkt_schedule_msg(port->mpu_td, flags);
511 msg->ms_flags |= MSGF_INTRANSIT;
513 msg->ms_flags |= MSGF_REPLY;
514 lwkt_send_ipiq(port->mpu_td->td_gd,
515 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
520 * If an asynchronous completion has been requested the message
521 * must be queued to the reply port.
523 * A critical section is required to interlock the port queue.
526 if (port->mpu_td->td_gd == mycpu) {
529 _lwkt_enqueue_reply(port, msg);
530 if (port->mp_flags & MSGPORTF_WAITING)
531 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
536 msg->ms_flags |= MSGF_INTRANSIT;
538 msg->ms_flags |= MSGF_REPLY;
539 lwkt_send_ipiq(port->mpu_td->td_gd,
540 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
547 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
549 * This function can _only_ be used when the caller is the thread
550 * that owns the message's target port.
/*
 * Remove a still-queued message from a thread-owned port and mark it
 * MSGF_DONE.  No MSGF_REPLY is set and no reply vector is invoked.
 *
 * The caller must be the port's owner thread (asserted).  The unlink
 * and flag update are bracketed by a critical section on that thread.
 */
553 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
555 KASSERT(port->mpu_td == curthread,
556 ("message could only be dropped in the same thread "
557 "as the message target port thread\n"));
558 crit_enter_quick(port->mpu_td);
559 _lwkt_pullmsg(port, msg);
560 msg->ms_flags |= MSGF_DONE;
561 crit_exit_quick(port->mpu_td);
565 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
567 * Called with the target port as an argument but in the context of the
568 * reply port. This function always implements an asynchronous put to
569 * the target message port, and thus returns EASYNC.
571 * The message must already have cleared MSGF_DONE and MSGF_REPLY
/*
 * IPI handler that finishes a put to a thread-owned target port.
 *
 * The port is taken from msg->ms_target_port.  If the owning thread's
 * cpu is not the current cpu, the IPI is re-sent to that cpu.
 * Otherwise MSGF_INTRANSIT (which must be set on entry) is cleared,
 * the message is queued on the port, and the owner is scheduled when
 * the port has MSGPORTF_WAITING set.
 */
578 lwkt_thread_putport_remote(lwkt_msg_t msg)
580 lwkt_port_t port = msg->ms_target_port;
583 * Chase any thread migration that occurs
585 if (port->mpu_td->td_gd != mycpu) {
586 lwkt_send_ipiq(port->mpu_td->td_gd,
587 (ipifunc1_t)lwkt_thread_putport_remote, msg);
595 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
596 msg->ms_flags &= ~MSGF_INTRANSIT;
598 _lwkt_pushmsg(port, msg);
599 if (port->mp_flags & MSGPORTF_WAITING)
600 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
/*
 * Put a message on a thread-owned target port.
 *
 * The message must have MSGF_DONE and MSGF_REPLY clear.  The target
 * port is recorded in ms_target_port.  When the owner thread's cpu is
 * the current cpu the message is queued directly and the owner is
 * scheduled if the port has MSGPORTF_WAITING set.  For an owner on
 * another cpu the message is marked MSGF_INTRANSIT and handed to
 * lwkt_thread_putport_remote() through an IPI, which reads the port
 * back from ms_target_port.
 */
607 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
609 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
611 msg->ms_target_port = port;
613 if (port->mpu_td->td_gd == mycpu) {
616 _lwkt_pushmsg(port, msg);
617 if (port->mp_flags & MSGPORTF_WAITING)
618 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
623 msg->ms_flags |= MSGF_INTRANSIT;
625 lwkt_send_ipiq(port->mpu_td->td_gd,
626 (ipifunc1_t)lwkt_thread_putport_remote, msg);
633 * lwkt_thread_getport()
635 * Retrieve the next message from the port or NULL if no messages
/*
 * Non-blocking dequeue for a thread-owned port.
 *
 * Only the owner thread may call this (asserted).  Inside a critical
 * section the next message, priority queue first, is looked up and,
 * when one exists, unlinked from the port.
 */
640 lwkt_thread_getport(lwkt_port_t port)
644 KKASSERT(port->mpu_td == curthread);
646 crit_enter_quick(port->mpu_td);
647 if ((msg = _lwkt_pollmsg(port)) != NULL)
648 _lwkt_pullmsg(port, msg);
649 crit_exit_quick(port->mpu_td);
654 * lwkt_thread_waitmsg()
656 * Wait for a particular message to be replied. We must be the only
657 * thread waiting on the message. The port must be owned by the
662 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
664 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
665 ("can't wait dropable message\n"));
667 if ((msg->ms_flags & MSGF_DONE) == 0) {
669 * If the done bit was not set we have to block until it is.
671 lwkt_port_t port = msg->ms_reply_port;
672 thread_t td = curthread;
675 KKASSERT(port->mpu_td == td);
676 crit_enter_quick(td);
679 while ((msg->ms_flags & MSGF_DONE) == 0) {
680 port->mp_flags |= MSGPORTF_WAITING;
681 if (sentabort == 0) {
682 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
686 lwkt_sleep("waitabt", 0);
688 port->mp_flags &= ~MSGPORTF_WAITING;
690 if (msg->ms_flags & MSGF_QUEUED)
691 _lwkt_pullmsg(port, msg);
695 * If the done bit was set we only have to mess around with the
696 * message if it is queued on the reply port.
698 if (msg->ms_flags & MSGF_QUEUED) {
699 lwkt_port_t port = msg->ms_reply_port;
700 thread_t td = curthread;
702 KKASSERT(port->mpu_td == td);
703 crit_enter_quick(td);
704 _lwkt_pullmsg(port, msg);
708 return(msg->ms_error);
713 lwkt_thread_waitport(lwkt_port_t port, int flags)
715 thread_t td = curthread;
719 KKASSERT(port->mpu_td == td);
720 crit_enter_quick(td);
721 while ((msg = _lwkt_pollmsg(port)) == NULL) {
722 port->mp_flags |= MSGPORTF_WAITING;
723 error = lwkt_sleep("waitport", flags);
724 port->mp_flags &= ~MSGPORTF_WAITING;
728 _lwkt_pullmsg(port, msg);
734 /************************************************************************
735 * SPIN PORT BACKEND *
736 ************************************************************************
738 * This backend uses spinlocks instead of making assumptions about which
739 * thread is accessing the port. It must be used when a port is not owned
740 * by a particular thread. This is less optimal than thread ports but
741 * you don't have a choice if there are multiple threads accessing the port.
743 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
744 * on the message port, it is the responsibility of the code doing the
745 * wakeup to clear this flag rather than the blocked threads. Some
746 * superfluous wakeups may occur, which is ok.
748 * XXX synchronous message wakeups are not currently optimized.
/*
 * Non-blocking dequeue for a spinlock-protected port.
 *
 * With the port spinlock held, the next message (priority queue
 * first) is looked up and, when one exists, unlinked from the port.
 * No ownership check is made, so any thread may call this.
 */
753 lwkt_spin_getport(lwkt_port_t port)
757 spin_lock(&port->mpu_spin);
758 if ((msg = _lwkt_pollmsg(port)) != NULL)
759 _lwkt_pullmsg(port, msg);
760 spin_unlock(&port->mpu_spin);
766 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
770 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
772 msg->ms_target_port = port;
773 spin_lock(&port->mpu_spin);
774 _lwkt_pushmsg(port, msg);
776 if (port->mp_flags & MSGPORTF_WAITING) {
777 port->mp_flags &= ~MSGPORTF_WAITING;
780 spin_unlock(&port->mpu_spin);
788 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
794 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
795 ("can't wait dropable message\n"));
797 if ((msg->ms_flags & MSGF_DONE) == 0) {
798 port = msg->ms_reply_port;
800 spin_lock(&port->mpu_spin);
801 while ((msg->ms_flags & MSGF_DONE) == 0) {
805 * If message was sent synchronously from the beginning
806 * the wakeup will be on the message structure, else it
807 * will be on the port structure.
809 if (msg->ms_flags & MSGF_SYNC) {
811 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
814 port->mp_flags |= MSGPORTF_WAITING;
818 * Only messages which support abort can be interrupted.
819 * We must still wait for message completion regardless.
821 if ((flags & PCATCH) && sentabort == 0) {
822 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
825 spin_unlock(&port->mpu_spin);
827 spin_lock(&port->mpu_spin);
830 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
832 /* see note at the top on the MSGPORTF_WAITING flag */
835 * Turn EINTR into ERESTART if the signal indicates.
837 if (sentabort && msg->ms_error == EINTR)
838 msg->ms_error = sentabort;
839 if (msg->ms_flags & MSGF_QUEUED)
840 _lwkt_pullmsg(port, msg);
841 spin_unlock(&port->mpu_spin);
843 if (msg->ms_flags & MSGF_QUEUED) {
844 port = msg->ms_reply_port;
845 spin_lock(&port->mpu_spin);
846 _lwkt_pullmsg(port, msg);
847 spin_unlock(&port->mpu_spin);
850 return(msg->ms_error);
855 lwkt_spin_waitport(lwkt_port_t port, int flags)
860 spin_lock(&port->mpu_spin);
861 while ((msg = _lwkt_pollmsg(port)) == NULL) {
862 port->mp_flags |= MSGPORTF_WAITING;
863 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
864 /* see note at the top on the MSGPORTF_WAITING flag */
866 spin_unlock(&port->mpu_spin);
870 _lwkt_pullmsg(port, msg);
871 spin_unlock(&port->mpu_spin);
877 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
881 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
883 if (msg->ms_flags & MSGF_SYNC) {
885 * If a synchronous completion has been requested, just wakeup
886 * the message without bothering to queue it to the target port.
888 spin_lock(&port->mpu_spin);
889 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
891 if (msg->ms_flags & MSGF_WAITING) {
892 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
895 spin_unlock(&port->mpu_spin);
900 * If an asynchronous completion has been requested the message
901 * must be queued to the reply port.
903 spin_lock(&port->mpu_spin);
904 _lwkt_enqueue_reply(port, msg);
906 if (port->mp_flags & MSGPORTF_WAITING) {
907 port->mp_flags &= ~MSGPORTF_WAITING;
910 spin_unlock(&port->mpu_spin);
917 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
919 * This function could _only_ be used when caller is in the same thread
920 * as the message's target port owner thread.
/*
 * Remove a still-queued message from a spinlock-protected port and
 * mark it MSGF_DONE.  The unlink and flag update happen with the port
 * spinlock held.
 *
 * NOTE(review): spin ports are set up via spin_init(&port->mpu_spin)
 * in lwkt_initport_spin(), yet the assertion below reads port->mpu_td.
 * If mpu_spin and mpu_td share storage (the naming suggests they are
 * members of one union -- confirm against the port structure), this
 * assertion compares spinlock state with curthread and is not a
 * meaningful ownership check for this port type.
 */
923 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
925 KASSERT(port->mpu_td == curthread,
926 ("message could only be dropped in the same thread "
927 "as the message target port thread\n"));
928 spin_lock(&port->mpu_spin);
929 _lwkt_pullmsg(port, msg);
930 msg->ms_flags |= MSGF_DONE;
931 spin_unlock(&port->mpu_spin);
934 /************************************************************************
935 * SERIALIZER PORT BACKEND *
936 ************************************************************************
938 * This backend uses a serializer to protect port access. Callers are
939 * assumed to hold the serializer. This kind of port is usually created
940 * by a network device driver along with _one_ lwkt thread to pipeline
941 * operations which may temporarily release the serializer.
943 * Implementation is based on SPIN PORT BACKEND.
/*
 * Non-blocking dequeue for a serializer-protected port.
 *
 * The caller must already hold the port's serializer (asserted); no
 * other locking is done here.  The next message, priority queue first,
 * is looked up and, when one exists, unlinked from the port.
 */
948 lwkt_serialize_getport(lwkt_port_t port)
952 ASSERT_SERIALIZED(port->mpu_serialize);
954 if ((msg = _lwkt_pollmsg(port)) != NULL)
955 _lwkt_pullmsg(port, msg);
961 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
963 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
964 ASSERT_SERIALIZED(port->mpu_serialize);
966 msg->ms_target_port = port;
967 _lwkt_pushmsg(port, msg);
968 if (port->mp_flags & MSGPORTF_WAITING) {
969 port->mp_flags &= ~MSGPORTF_WAITING;
977 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
983 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
984 ("can't wait dropable message\n"));
986 if ((msg->ms_flags & MSGF_DONE) == 0) {
987 port = msg->ms_reply_port;
989 ASSERT_SERIALIZED(port->mpu_serialize);
992 while ((msg->ms_flags & MSGF_DONE) == 0) {
996 * If message was sent synchronously from the beginning
997 * the wakeup will be on the message structure, else it
998 * will be on the port structure.
1000 if (msg->ms_flags & MSGF_SYNC) {
1004 port->mp_flags |= MSGPORTF_WAITING;
1008 * Only messages which support abort can be interrupted.
1009 * We must still wait for message completion regardless.
1011 if ((flags & PCATCH) && sentabort == 0) {
1012 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1015 lwkt_serialize_exit(port->mpu_serialize);
1017 lwkt_serialize_enter(port->mpu_serialize);
1020 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1022 /* see note at the top on the MSGPORTF_WAITING flag */
1025 * Turn EINTR into ERESTART if the signal indicates.
1027 if (sentabort && msg->ms_error == EINTR)
1028 msg->ms_error = sentabort;
1029 if (msg->ms_flags & MSGF_QUEUED)
1030 _lwkt_pullmsg(port, msg);
1032 if (msg->ms_flags & MSGF_QUEUED) {
1033 port = msg->ms_reply_port;
1035 ASSERT_SERIALIZED(port->mpu_serialize);
1036 _lwkt_pullmsg(port, msg);
1039 return(msg->ms_error);
1044 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1049 ASSERT_SERIALIZED(port->mpu_serialize);
1051 while ((msg = _lwkt_pollmsg(port)) == NULL) {
1052 port->mp_flags |= MSGPORTF_WAITING;
1053 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1054 /* see note at the top on the MSGPORTF_WAITING flag */
1058 _lwkt_pullmsg(port, msg);
1064 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1066 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1067 ASSERT_SERIALIZED(port->mpu_serialize);
1069 if (msg->ms_flags & MSGF_SYNC) {
1071 * If a synchronous completion has been requested, just wakeup
1072 * the message without bothering to queue it to the target port.
1074 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1078 * If an asynchronous completion has been requested the message
1079 * must be queued to the reply port.
1081 _lwkt_enqueue_reply(port, msg);
1082 if (port->mp_flags & MSGPORTF_WAITING) {
1083 port->mp_flags &= ~MSGPORTF_WAITING;
1089 /************************************************************************
1090 * PANIC AND SPECIAL PORT FUNCTIONS *
1091 ************************************************************************/
1094 * You can point a port's reply vector at this function if you just want
1095 * the message marked done, without any queueing or signaling. This is
1096 * often used for structure-embedded messages.
/*
 * Reply vector that only flags the message MSGF_DONE | MSGF_REPLY.
 * The port argument is unused: nothing is queued and no thread is
 * woken.  The flag update is a plain (non-atomic) read-modify-write.
 */
1100 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1102 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
/*
 * getport vector for ports that do not support message retrieval:
 * panics unconditionally, reporting the offending port.
 */
1107 lwkt_panic_getport(lwkt_port_t port)
1109 panic("lwkt_getport() illegal on port %p", port);
/*
 * putport vector for ports that cannot be a message target: panics
 * unconditionally, reporting the port and the message.
 */
1114 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1116 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
/*
 * waitmsg vector for ports that do not support waiting on a message:
 * panics unconditionally, reporting the message and its reply port.
 * The flags argument is unused.
 */
1121 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1123 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
/*
 * waitport vector for ports that do not support blocking retrieval:
 * panics unconditionally, reporting the port.  The flags argument is
 * unused.
 */
1128 lwkt_panic_waitport(lwkt_port_t port, int flags)
1130 panic("port %p cannot be waited on", port);
/*
 * replyport vector for ports that must never be used as a reply port:
 * panics unconditionally, reporting the port and the message.
 */
1135 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1137 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
/*
 * dropmsg vector for ports that do not support dropping messages:
 * panics unconditionally, reporting the port and the message.
 */
1142 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1144 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);