2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
73 /************************************************************************
75 ************************************************************************/
80 * Request asynchronous completion and call lwkt_beginmsg(). The
81 * target port can opt to execute the message synchronously or
82 * asynchronously and this function will automatically queue the
83 * response if the target executes the message synchronously.
85 * NOTE: The message is in an indeterminate state until this call
86 * returns. The caller should not mess with it (e.g. try to abort it)
89 * NOTE: Do not use this function to forward a message as we might
90 * clobber ms_flags in a SMP race.
93 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
97 KKASSERT(msg->ms_reply_port != NULL &&
98 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
99 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
100 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102 * Target port opted to execute the message synchronously so
103 * queue the response.
105 lwkt_replymsg(msg, error);
110 lwkt_sendmsg_stage1(lwkt_port_t port, lwkt_msg_t msg)
112 KKASSERT(msg->ms_reply_port != NULL &&
113 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
114 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
118 lwkt_sendmsg_stage2(lwkt_port_t port, lwkt_msg_t msg)
122 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
124 * Target port opted to execute the message synchronously so
125 * queue the response.
127 lwkt_replymsg(msg, error);
134 * Request synchronous completion and call lwkt_beginmsg(). The
135 * target port can opt to execute the message synchronously or
136 * asynchronously and this function will automatically block and
137 * wait for a response if the target executes the message
140 * NOTE: Do not use this function to forward a message as we might
141 * clobber ms_flags in a SMP race.
144 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
148 KKASSERT(msg->ms_reply_port != NULL &&
149 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
150 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
151 msg->ms_flags |= MSGF_SYNC;
152 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
154 * Target port opted to execute the message asynchronously so
155 * block and wait for a reply.
157 error = lwkt_waitmsg(msg, flags);
159 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
167 * Forward a message received on one port to another port.
170 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
174 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
175 if ((error = port->mp_putport(port, msg)) != EASYNC)
176 lwkt_replymsg(msg, error);
183 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
184 * The caller must ensure that the message will not be both replied AND
185 * destroyed while the abort is in progress.
187 * This function issues a callback which might block!
190 lwkt_abortmsg(lwkt_msg_t msg)
193 * A critical section protects us from reply IPIs on this cpu.
198 * Shortcut the operation if the message has already been returned.
199 * The callback typically constructs a lwkt_msg with the abort request,
200 * issues it synchronously, and waits for completion. The callback
201 * is not required to actually abort the message and the target port,
202 * upon receiving an abort request message generated by the callback
203 * should check whether the original message has already completed or
206 if (msg->ms_flags & MSGF_ABORTABLE) {
207 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
208 msg->ms_abortfn(msg);
213 /************************************************************************
214 * PORT INITIALIZATION API *
215 ************************************************************************/
217 static void *lwkt_thread_getport(lwkt_port_t port);
218 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
219 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
220 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
221 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
222 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
224 static void *lwkt_spin_getport(lwkt_port_t port);
225 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
226 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
227 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
228 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
229 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
231 static void *lwkt_serialize_getport(lwkt_port_t port);
232 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
233 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
234 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
235 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
237 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
238 static void *lwkt_panic_getport(lwkt_port_t port);
239 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
240 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
241 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
242 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
243 static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
246 * Core port initialization (internal)
250 _lwkt_initport(lwkt_port_t port,
251 void *(*gportfn)(lwkt_port_t),
252 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
253 int (*wmsgfn)(lwkt_msg_t, int),
254 void *(*wportfn)(lwkt_port_t, int),
255 void (*rportfn)(lwkt_port_t, lwkt_msg_t),
256 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
258 bzero(port, sizeof(*port));
259 TAILQ_INIT(&port->mp_msgq);
260 TAILQ_INIT(&port->mp_msgq_prio);
261 port->mp_getport = gportfn;
262 port->mp_putport = pportfn;
263 port->mp_waitmsg = wmsgfn;
264 port->mp_waitport = wportfn;
265 port->mp_replyport = rportfn;
266 port->mp_dropmsg = dmsgfn;
270 * Schedule the target thread. If the message flags contain MSGF_NORESCHED
271 * we tell the scheduler not to reschedule if td is at a higher priority.
273 * This routine is called even if the thread is already scheduled.
277 _lwkt_schedule_msg(thread_t td, int flags)
283 * lwkt_initport_thread()
285 * Initialize a port for use by a particular thread. The port may
286 * only be used by <td>.
289 lwkt_initport_thread(lwkt_port_t port, thread_t td)
295 lwkt_thread_waitport,
296 lwkt_thread_replyport,
297 lwkt_thread_dropmsg);
302 * lwkt_initport_spin()
304 * Initialize a port for use with descriptors that might be accessed
305 * via multiple LWPs, processes, or threads. Has somewhat more
306 * overhead than thread ports.
309 lwkt_initport_spin(lwkt_port_t port, thread_t td)
311 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
314 dmsgfn = lwkt_panic_dropmsg;
316 dmsgfn = lwkt_spin_dropmsg;
325 spin_init(&port->mpu_spin);
330 * lwkt_initport_serialize()
332 * Initialize a port for use with descriptors that might be accessed
333 * via multiple LWPs, processes, or threads. Callers are assumed to
334 * have held the serializer (slz).
337 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
340 lwkt_serialize_getport,
341 lwkt_serialize_putport,
342 lwkt_serialize_waitmsg,
343 lwkt_serialize_waitport,
344 lwkt_serialize_replyport,
346 port->mpu_serialize = slz;
350 * Similar to the standard initport, this function simply marks the message
351 * as being done and does not attempt to return it to an originating port.
354 lwkt_initport_replyonly_null(lwkt_port_t port)
366 * Initialize a reply-only port, typically used as a message sink. Such
367 * ports can only be used as a reply port.
370 lwkt_initport_replyonly(lwkt_port_t port,
371 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
373 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
374 lwkt_panic_waitmsg, lwkt_panic_waitport,
375 rportfn, lwkt_panic_dropmsg);
379 lwkt_initport_putonly(lwkt_port_t port,
380 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
382 _lwkt_initport(port, lwkt_panic_getport, pportfn,
383 lwkt_panic_waitmsg, lwkt_panic_waitport,
384 lwkt_panic_replyport, lwkt_panic_dropmsg);
388 lwkt_initport_panic(lwkt_port_t port)
391 lwkt_panic_getport, lwkt_panic_putport,
392 lwkt_panic_waitmsg, lwkt_panic_waitport,
393 lwkt_panic_replyport, lwkt_panic_dropmsg);
398 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
400 lwkt_msg_queue *queue;
403 * normal case, remove and return the message.
405 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
406 queue = &port->mp_msgq_prio;
408 queue = &port->mp_msgq;
409 TAILQ_REMOVE(queue, msg, ms_node);
412 * atomic op needed for spin ports
414 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
419 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
421 lwkt_msg_queue *queue;
424 * atomic op needed for spin ports
426 atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
427 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
428 queue = &port->mp_msgq_prio;
430 queue = &port->mp_msgq;
431 TAILQ_INSERT_TAIL(queue, msg, ms_node);
436 _lwkt_pollmsg(lwkt_port_t port)
440 msg = TAILQ_FIRST(&port->mp_msgq_prio);
441 if (__predict_false(msg != NULL))
445 * Priority queue has no message, fallback to non-priority queue.
447 return TAILQ_FIRST(&port->mp_msgq);
452 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
455 * atomic op needed for spin ports
457 _lwkt_pushmsg(port, msg);
458 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
461 /************************************************************************
462 * THREAD PORT BACKEND *
463 ************************************************************************
465 * This backend is used when the port a message is retrieved from is owned
466 * by a single thread (the calling thread). Messages are IPId to the
467 * correct cpu before being enqueued to a port. Note that this is fairly
468 * optimal since scheduling would have had to do an IPI anyway if the
469 * message were headed to a different cpu.
473 * This function completes reply processing for the default case in the
474 * context of the originating cpu.
478 lwkt_thread_replyport_remote(lwkt_msg_t msg)
480 lwkt_port_t port = msg->ms_reply_port;
484 * Chase any thread migration that occurs
486 if (port->mpu_td->td_gd != mycpu) {
487 lwkt_send_ipiq(port->mpu_td->td_gd,
488 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
493 * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
496 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
497 msg->ms_flags &= ~MSGF_INTRANSIT;
499 flags = msg->ms_flags;
500 if (msg->ms_flags & MSGF_SYNC) {
502 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
504 _lwkt_enqueue_reply(port, msg);
506 if (port->mp_flags & MSGPORTF_WAITING)
507 _lwkt_schedule_msg(port->mpu_td, flags);
511 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
513 * Called with the reply port as an argument but in the context of the
514 * original target port. Completion must occur on the target port's
517 * The critical section protects us from IPIs on this CPU.
521 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
525 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
527 if (msg->ms_flags & MSGF_SYNC) {
529 * If a synchronous completion has been requested, just wakeup
530 * the message without bothering to queue it to the target port.
532 * Assume the target thread is non-preemptive, so no critical
533 * section is required.
535 if (port->mpu_td->td_gd == mycpu) {
537 flags = msg->ms_flags;
539 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
540 if (port->mp_flags & MSGPORTF_WAITING)
541 _lwkt_schedule_msg(port->mpu_td, flags);
545 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
547 atomic_set_int(&msg->ms_flags, MSGF_REPLY);
548 lwkt_send_ipiq(port->mpu_td->td_gd,
549 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
553 * If an asynchronous completion has been requested the message
554 * must be queued to the reply port.
556 * A critical section is required to interlock the port queue.
558 if (port->mpu_td->td_gd == mycpu) {
560 _lwkt_enqueue_reply(port, msg);
561 if (port->mp_flags & MSGPORTF_WAITING)
562 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
566 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
568 atomic_set_int(&msg->ms_flags, MSGF_REPLY);
569 lwkt_send_ipiq(port->mpu_td->td_gd,
570 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
576 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
578 * This function may _only_ be used when the caller is running in the
579 * thread that owns the message's target port.
582 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
586 KASSERT(port->mpu_td == curthread,
587 ("message could only be dropped in the same thread "
588 "as the message target port thread"));
589 crit_enter_quick(port->mpu_td);
590 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
591 _lwkt_pullmsg(port, msg);
592 atomic_set_int(&msg->ms_flags, MSGF_DONE);
597 crit_exit_quick(port->mpu_td);
603 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
605 * Called with the target port as an argument but in the context of the
606 * reply port. This function always implements an asynchronous put to
607 * the target message port, and thus returns EASYNC.
609 * The message must already have cleared MSGF_DONE and MSGF_REPLY
613 lwkt_thread_putport_remote(lwkt_msg_t msg)
615 lwkt_port_t port = msg->ms_target_port;
618 * Chase any thread migration that occurs
620 if (port->mpu_td->td_gd != mycpu) {
621 lwkt_send_ipiq(port->mpu_td->td_gd,
622 (ipifunc1_t)lwkt_thread_putport_remote, msg);
627 * An atomic op is needed on ms_flags vs originator. Also
628 * note that the originator might be using a different type
632 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
633 atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
635 _lwkt_pushmsg(port, msg);
636 if (port->mp_flags & MSGPORTF_WAITING)
637 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
642 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
644 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
646 msg->ms_target_port = port;
647 if (port->mpu_td->td_gd == mycpu) {
649 _lwkt_pushmsg(port, msg);
650 if (port->mp_flags & MSGPORTF_WAITING)
651 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
658 * An atomic op is needed on ms_flags vs originator. Also
659 * note that the originator might be using a different type
662 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
664 lwkt_send_ipiq(port->mpu_td->td_gd,
665 (ipifunc1_t)lwkt_thread_putport_remote, msg);
671 * lwkt_thread_getport()
673 * Retrieve the next message from the port or NULL if no messages
678 lwkt_thread_getport(lwkt_port_t port)
682 KKASSERT(port->mpu_td == curthread);
684 crit_enter_quick(port->mpu_td);
685 if ((msg = _lwkt_pollmsg(port)) != NULL)
686 _lwkt_pullmsg(port, msg);
687 crit_exit_quick(port->mpu_td);
692 * lwkt_thread_waitmsg()
694 * Wait for a particular message to be replied. We must be the only
695 * thread waiting on the message. The port must be owned by the
700 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
702 thread_t td = curthread;
704 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
705 ("can't wait dropable message"));
707 if ((msg->ms_flags & MSGF_DONE) == 0) {
709 * If the done bit was not set we have to block until it is.
711 lwkt_port_t port = msg->ms_reply_port;
714 KKASSERT(port->mpu_td == td);
715 crit_enter_quick(td);
718 while ((msg->ms_flags & MSGF_DONE) == 0) {
719 port->mp_flags |= MSGPORTF_WAITING; /* same cpu */
720 if (sentabort == 0) {
721 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
725 lwkt_sleep("waitabt", 0);
727 port->mp_flags &= ~MSGPORTF_WAITING;
729 if (msg->ms_flags & MSGF_QUEUED)
730 _lwkt_pullmsg(port, msg);
734 * If the done bit was set we only have to mess around with the
735 * message if it is queued on the reply port.
737 crit_enter_quick(td);
738 if (msg->ms_flags & MSGF_QUEUED) {
739 lwkt_port_t port = msg->ms_reply_port;
740 thread_t td __debugvar = curthread;
742 KKASSERT(port->mpu_td == td);
743 _lwkt_pullmsg(port, msg);
747 return(msg->ms_error);
751 * lwkt_thread_waitport()
753 * Wait for a new message to be available on the port. We must be the
754 * only thread waiting on the port. The port must be owned by caller.
758 lwkt_thread_waitport(lwkt_port_t port, int flags)
760 thread_t td = curthread;
764 KKASSERT(port->mpu_td == td);
765 crit_enter_quick(td);
766 while ((msg = _lwkt_pollmsg(port)) == NULL) {
767 port->mp_flags |= MSGPORTF_WAITING;
768 error = lwkt_sleep("waitport", flags);
769 port->mp_flags &= ~MSGPORTF_WAITING;
773 _lwkt_pullmsg(port, msg);
779 /************************************************************************
780 * SPIN PORT BACKEND *
781 ************************************************************************
783 * This backend uses spinlocks instead of making assumptions about which
784 * thread is accessing the port. It must be used when a port is not owned
785 * by a particular thread. This is less optimal than thread ports but
786 * you don't have a choice if there are multiple threads accessing the port.
788 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
789 * on the message port, it is the responsibility of the code doing the
790 * wakeup to clear this flag rather than the blocked threads. Some
791 * superfluous wakeups may occur, which is ok.
793 * XXX synchronous message wakeups are not currently optimized.
798 lwkt_spin_getport(lwkt_port_t port)
802 spin_lock(&port->mpu_spin);
803 if ((msg = _lwkt_pollmsg(port)) != NULL)
804 _lwkt_pullmsg(port, msg);
805 spin_unlock(&port->mpu_spin);
811 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
815 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
817 msg->ms_target_port = port;
818 spin_lock(&port->mpu_spin);
819 _lwkt_pushmsg(port, msg);
821 if (port->mp_flags & MSGPORTF_WAITING) {
822 port->mp_flags &= ~MSGPORTF_WAITING;
825 spin_unlock(&port->mpu_spin);
833 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
839 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
840 ("can't wait dropable message"));
841 port = msg->ms_reply_port;
843 if ((msg->ms_flags & MSGF_DONE) == 0) {
845 spin_lock(&port->mpu_spin);
846 while ((msg->ms_flags & MSGF_DONE) == 0) {
850 * If message was sent synchronously from the beginning
851 * the wakeup will be on the message structure, else it
852 * will be on the port structure.
854 * ms_flags needs atomic op originator vs target MSGF_QUEUED
856 if (msg->ms_flags & MSGF_SYNC) {
858 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
861 port->mp_flags |= MSGPORTF_WAITING;
865 * Only messages which support abort can be interrupted.
866 * We must still wait for message completion regardless.
868 if ((flags & PCATCH) && sentabort == 0) {
869 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
872 spin_unlock(&port->mpu_spin);
874 spin_lock(&port->mpu_spin);
877 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
879 /* see note at the top on the MSGPORTF_WAITING flag */
882 * Turn EINTR into ERESTART if the signal indicates.
884 if (sentabort && msg->ms_error == EINTR)
885 msg->ms_error = sentabort;
886 if (msg->ms_flags & MSGF_QUEUED)
887 _lwkt_pullmsg(port, msg);
888 spin_unlock(&port->mpu_spin);
890 spin_lock(&port->mpu_spin);
891 if (msg->ms_flags & MSGF_QUEUED) {
892 _lwkt_pullmsg(port, msg);
894 spin_unlock(&port->mpu_spin);
896 return(msg->ms_error);
901 lwkt_spin_waitport(lwkt_port_t port, int flags)
906 spin_lock(&port->mpu_spin);
907 while ((msg = _lwkt_pollmsg(port)) == NULL) {
908 port->mp_flags |= MSGPORTF_WAITING;
909 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
910 /* see note at the top on the MSGPORTF_WAITING flag */
912 spin_unlock(&port->mpu_spin);
916 _lwkt_pullmsg(port, msg);
917 spin_unlock(&port->mpu_spin);
923 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
927 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
929 if (msg->ms_flags & MSGF_SYNC) {
931 * If a synchronous completion has been requested, just wakeup
932 * the message without bothering to queue it to the target port.
934 * ms_flags protected by reply port spinlock
936 spin_lock(&port->mpu_spin);
937 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
939 if (msg->ms_flags & MSGF_WAITING) {
940 msg->ms_flags &= ~MSGF_WAITING;
943 spin_unlock(&port->mpu_spin);
948 * If an asynchronous completion has been requested the message
949 * must be queued to the reply port.
951 spin_lock(&port->mpu_spin);
952 _lwkt_enqueue_reply(port, msg);
954 if (port->mp_flags & MSGPORTF_WAITING) {
955 port->mp_flags &= ~MSGPORTF_WAITING;
958 spin_unlock(&port->mpu_spin);
965 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
967 * This function may _only_ be used when the caller is running in the
968 * thread that owns the message's target port.
971 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
975 KASSERT(port->mpu_td == curthread,
976 ("message could only be dropped in the same thread "
977 "as the message target port thread\n"));
978 spin_lock(&port->mpu_spin);
979 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
980 _lwkt_pullmsg(port, msg);
981 msg->ms_flags |= MSGF_DONE;
986 spin_unlock(&port->mpu_spin);
991 /************************************************************************
992 * SERIALIZER PORT BACKEND *
993 ************************************************************************
995 * This backend uses a serializer to protect port access. Callers are
996 * assumed to hold the serializer. This kind of port is usually created
997 * by a network device driver along with _one_ lwkt thread to pipeline
998 * operations which may temporarily release the serializer.
1000 * Implementation is based on SPIN PORT BACKEND.
1005 lwkt_serialize_getport(lwkt_port_t port)
1009 ASSERT_SERIALIZED(port->mpu_serialize);
1011 if ((msg = _lwkt_pollmsg(port)) != NULL)
1012 _lwkt_pullmsg(port, msg);
1018 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1020 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1021 ASSERT_SERIALIZED(port->mpu_serialize);
1023 msg->ms_target_port = port;
1024 _lwkt_pushmsg(port, msg);
1025 if (port->mp_flags & MSGPORTF_WAITING) {
1026 port->mp_flags &= ~MSGPORTF_WAITING;
1034 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1040 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1041 ("can't wait dropable message"));
1043 if ((msg->ms_flags & MSGF_DONE) == 0) {
1044 port = msg->ms_reply_port;
1046 ASSERT_SERIALIZED(port->mpu_serialize);
1049 while ((msg->ms_flags & MSGF_DONE) == 0) {
1053 * If message was sent synchronously from the beginning
1054 * the wakeup will be on the message structure, else it
1055 * will be on the port structure.
1057 if (msg->ms_flags & MSGF_SYNC) {
1061 port->mp_flags |= MSGPORTF_WAITING;
1065 * Only messages which support abort can be interrupted.
1066 * We must still wait for message completion regardless.
1068 if ((flags & PCATCH) && sentabort == 0) {
1069 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1072 lwkt_serialize_exit(port->mpu_serialize);
1074 lwkt_serialize_enter(port->mpu_serialize);
1077 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1079 /* see note at the top on the MSGPORTF_WAITING flag */
1082 * Turn EINTR into ERESTART if the signal indicates.
1084 if (sentabort && msg->ms_error == EINTR)
1085 msg->ms_error = sentabort;
1086 if (msg->ms_flags & MSGF_QUEUED)
1087 _lwkt_pullmsg(port, msg);
1089 if (msg->ms_flags & MSGF_QUEUED) {
1090 port = msg->ms_reply_port;
1092 ASSERT_SERIALIZED(port->mpu_serialize);
1093 _lwkt_pullmsg(port, msg);
1096 return(msg->ms_error);
1101 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1106 ASSERT_SERIALIZED(port->mpu_serialize);
1108 while ((msg = _lwkt_pollmsg(port)) == NULL) {
1109 port->mp_flags |= MSGPORTF_WAITING;
1110 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1111 /* see note at the top on the MSGPORTF_WAITING flag */
1115 _lwkt_pullmsg(port, msg);
1121 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1123 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1124 ASSERT_SERIALIZED(port->mpu_serialize);
1126 if (msg->ms_flags & MSGF_SYNC) {
1128 * If a synchronous completion has been requested, just wakeup
1129 * the message without bothering to queue it to the target port.
1131 * (both sides synchronized via serialized reply port)
1133 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1137 * If an asynchronous completion has been requested the message
1138 * must be queued to the reply port.
1140 _lwkt_enqueue_reply(port, msg);
1141 if (port->mp_flags & MSGPORTF_WAITING) {
1142 port->mp_flags &= ~MSGPORTF_WAITING;
1148 /************************************************************************
1149 * PANIC AND SPECIAL PORT FUNCTIONS *
1150 ************************************************************************/
1153 * You can point a port's reply vector at this function if you just want
1154 * the message marked done, without any queueing or signaling. This is
1155 * often used for structure-embedded messages.
1159 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1161 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1166 lwkt_panic_getport(lwkt_port_t port)
1168 panic("lwkt_getport() illegal on port %p", port);
1173 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1175 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1180 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1182 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1187 lwkt_panic_waitport(lwkt_port_t port, int flags)
1189 panic("port %p cannot be waited on", port);
1194 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1196 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1201 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1203 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);