2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
69 #include <machine/smp.h>
72 #include <sys/malloc.h>
73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
75 /************************************************************************
77 ************************************************************************/
82 * Request asynchronous completion and call lwkt_beginmsg(). The
83 * target port can opt to execute the message synchronously or
84 * asynchronously and this function will automatically queue the
85 * response if the target executes the message synchronously.
87 * NOTE: The message is in an indeterminate state until this call
88 * returns. The caller should not mess with it (e.g. try to abort it)
/*
 * lwkt_sendmsg: asserts that the message has a reply port and is DONE but
 * not QUEUED, clears REPLY/SYNC/DONE, and passes it to lwkt_beginmsg().
 * Any result other than EASYNC is handed to lwkt_replymsg() as the error.
 * NOTE(review): the declaration of `error` and the braces are not visible
 * in this listing.
 */
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
96 KKASSERT(msg->ms_reply_port != NULL &&
97 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
/* MSGF_SYNC is cleared here; lwkt_domsg() sets it instead. */
98 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
101 * Target port opted to execute the message synchronously so
102 * queue the response.
104 lwkt_replymsg(msg, error);
111 * Request synchronous completion and call lwkt_beginmsg(). The
112 * target port can opt to execute the message synchronously or
113 * asynchronously and this function will automatically block and
114 * wait for a response if the target executes the message
/*
 * lwkt_domsg: same entry assertion as lwkt_sendmsg(), but sets MSGF_SYNC
 * before calling lwkt_beginmsg().  When the target returns EASYNC the
 * caller waits in lwkt_waitmsg(msg, flags) and takes its result as the error.
 * NOTE(review): the declaration of `error`, the branch keyword before the
 * final flag update, and the return statement are not visible here.
 */
118 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
122 KKASSERT(msg->ms_reply_port != NULL &&
123 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
124 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
125 msg->ms_flags |= MSGF_SYNC;
126 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
128 * Target port opted to execute the message asynchronously so
129 * block and wait for a reply.
131 error = lwkt_waitmsg(msg, flags);
/* Presumably the non-EASYNC branch: mark the message completed in place. */
133 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
141 * Forward a message received on one port to another port.
/*
 * lwkt_forwardmsg: requires that the message is not QUEUED, DONE or REPLY,
 * then calls the destination port's mp_putport vector directly.  A result
 * other than EASYNC is passed to lwkt_replymsg().
 * NOTE(review): the declaration of `error` and the return are not visible.
 */
144 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
149 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
150 if ((error = port->mp_putport(port, msg)) != EASYNC)
151 lwkt_replymsg(msg, error);
159 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
160 * The caller must ensure that the message will not be both replied AND
161 * destroyed while the abort is in progress.
163 * This function issues a callback which might block!
/*
 * lwkt_abortmsg: invokes the message's ms_abortfn callback, but only when
 * MSGF_ABORTABLE is set and neither MSGF_DONE nor MSGF_REPLY is set.
 * NOTE(review): the critical-section calls the comments refer to are not
 * visible in this listing.
 */
166 lwkt_abortmsg(lwkt_msg_t msg)
169 * A critical section protects us from reply IPIs on this cpu.
174 * Shortcut the operation if the message has already been returned.
175 * The callback typically constructs a lwkt_msg with the abort request,
176 * issues it synchronously, and waits for completion. The callback
177 * is not required to actually abort the message and the target port,
178 * upon receiving an abort request message generated by the callback
179 * should check whether the original message has already completed or
182 if (msg->ms_flags & MSGF_ABORTABLE) {
/* Skip the callback once the message has been completed or replied. */
183 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
184 msg->ms_abortfn(msg);
189 /************************************************************************
190 * PORT INITIALIZATION API *
191 ************************************************************************/
/*
 * Forward declarations of the per-backend port vectors.  Each backend
 * supplies getport, putport, waitmsg, waitport and replyport functions.
 */
/* Thread-owned port backend. */
193 static void *lwkt_thread_getport(lwkt_port_t port);
194 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
197 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
/* Spinlock-protected port backend. */
200 static void *lwkt_spin_getport(lwkt_port_t port);
201 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
202 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
203 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
204 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
/* Serializer-protected port backend; no dropmsg function is declared for it here. */
207 static void *lwkt_serialize_getport(lwkt_port_t port);
208 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
209 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
210 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
211 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
/* Null reply vector and the panic vectors used for unsupported operations. */
213 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
214 static void *lwkt_panic_getport(lwkt_port_t port);
215 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
216 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
217 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
218 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
219 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
222 * Core port initialization (internal)
/*
 * _lwkt_initport: zeroes the whole port structure, initializes both the
 * normal and the priority message queues, and installs the six backend
 * vectors (get, put, waitmsg, waitport, reply, drop) supplied by the caller.
 * Backend-specific fields must be set by the caller afterwards, because the
 * bzero() wipes them.
 */
226 _lwkt_initport(lwkt_port_t port,
227 void *(*gportfn)(lwkt_port_t),
228 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
229 int (*wmsgfn)(lwkt_msg_t, int),
230 void *(*wportfn)(lwkt_port_t, int),
231 void (*rportfn)(lwkt_port_t, lwkt_msg_t),
232 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
234 bzero(port, sizeof(*port));
235 TAILQ_INIT(&port->mp_msgq);
236 TAILQ_INIT(&port->mp_msgq_prio);
237 port->mp_getport = gportfn;
238 port->mp_putport = pportfn;
239 port->mp_waitmsg = wmsgfn;
240 port->mp_waitport = wportfn;
241 port->mp_replyport = rportfn;
242 port->mp_dropmsg = dmsgfn;
246 * Schedule the target thread. If the message flags contain MSGF_NORESCHED
247 * we tell the scheduler not to reschedule if td is at a higher priority.
249 * This routine is called even if the thread is already scheduled.
/*
 * _lwkt_schedule_msg: takes the thread to schedule and the message flags.
 * NOTE(review): only the signature survives in this listing; the body is
 * not visible here, so the handling of `flags` cannot be confirmed.
 */
253 _lwkt_schedule_msg(thread_t td, int flags)
259 * lwkt_initport_thread()
261 * Initialize a port for use by a particular thread. The port may
262 * only be used by <td>.
/*
 * lwkt_initport_thread: sets up a port with the lwkt_thread_* vectors.
 * NOTE(review): only the last three arguments of the initialization call
 * are visible; the call itself, the first three vectors and any assignment
 * of `td` to the port are missing from this listing.
 */
265 lwkt_initport_thread(lwkt_port_t port, thread_t td)
271 lwkt_thread_waitport,
272 lwkt_thread_replyport,
273 lwkt_thread_dropmsg);
278 * lwkt_initport_spin()
280 * Initialize a port for use with descriptors that might be accessed
281 * via multiple LWPs, processes, or threads. Has somewhat more
282 * overhead than thread ports.
/*
 * lwkt_initport_spin: sets up a spinlock-protected port.  The drop vector
 * is either lwkt_panic_dropmsg or lwkt_spin_dropmsg.
 * NOTE(review): the condition selecting between the two (presumably based
 * on `td`) and the port initialization call are not visible in this listing.
 */
285 lwkt_initport_spin(lwkt_port_t port, thread_t td)
287 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
290 dmsgfn = lwkt_panic_dropmsg;
292 dmsgfn = lwkt_spin_dropmsg;
/* Initialize the port spinlock used by the lwkt_spin_* backend functions. */
301 spin_init(&port->mpu_spin);
306 * lwkt_initport_serialize()
308 * Initialize a port for use with descriptors that might be accessed
309 * via multiple LWPs, processes, or threads. Callers are assumed to
310 * have held the serializer (slz).
/*
 * lwkt_initport_serialize: sets up a port with the lwkt_serialize_* vectors
 * and records the serializer `slz` in port->mpu_serialize.
 * NOTE(review): the start of the initialization call and its final (drop
 * vector) argument are not visible in this listing.
 */
313 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
316 lwkt_serialize_getport,
317 lwkt_serialize_putport,
318 lwkt_serialize_waitmsg,
319 lwkt_serialize_waitport,
320 lwkt_serialize_replyport,
/* Stored after initialization; the backend asserts this serializer is held. */
322 port->mpu_serialize = slz;
326 * Similar to the standard initport, this function simply marks the message
327 * as being done and does not attempt to return it to an originating port.
/*
 * lwkt_initport_replyonly_null: takes only the port to initialize.
 * NOTE(review): only the signature survives in this listing; the body is
 * not visible, so which vectors it installs cannot be confirmed here.
 */
330 lwkt_initport_replyonly_null(lwkt_port_t port)
342 * Initialize a reply-only port, typically used as a message sink. Such
343 * ports can only be used as a reply port.
/*
 * lwkt_initport_replyonly: initializes a port whose only usable vector is
 * the caller-supplied reply function `rportfn`.  The get, put, waitmsg,
 * waitport and drop vectors are all set to the panic functions.
 */
346 lwkt_initport_replyonly(lwkt_port_t port,
347 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
349 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
350 lwkt_panic_waitmsg, lwkt_panic_waitport,
351 rportfn, lwkt_panic_dropmsg);
/*
 * lwkt_initport_putonly: initializes a port whose only usable vector is
 * the caller-supplied put function `pportfn`.  The get, waitmsg, waitport,
 * reply and drop vectors are all set to the panic functions.
 */
355 lwkt_initport_putonly(lwkt_port_t port,
356 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
358 _lwkt_initport(port, lwkt_panic_getport, pportfn,
359 lwkt_panic_waitmsg, lwkt_panic_waitport,
360 lwkt_panic_replyport, lwkt_panic_dropmsg);
/*
 * lwkt_initport_panic: initializes a port on which every vector is one of
 * the panic functions.
 * NOTE(review): the first line of the initialization call is not visible.
 */
364 lwkt_initport_panic(lwkt_port_t port)
367 lwkt_panic_getport, lwkt_panic_putport,
368 lwkt_panic_waitmsg, lwkt_panic_waitport,
369 lwkt_panic_replyport, lwkt_panic_dropmsg);
/*
 * _lwkt_pullmsg: removes `msg` from the port queue it sits on and clears
 * MSGF_QUEUED.  Messages flagged MSGF_PRIORITY are on mp_msgq_prio, all
 * others on mp_msgq.  No locking is done here; every caller in this file
 * takes its own critical section, spinlock or serializer first.
 * NOTE(review): the else keyword between the two queue assignments is not
 * visible in this listing.
 */
374 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
376 lwkt_msg_queue *queue;
379 * normal case, remove and return the message.
381 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
382 queue = &port->mp_msgq_prio;
384 queue = &port->mp_msgq;
385 TAILQ_REMOVE(queue, msg, ms_node);
388 * atomic op needed for spin ports
390 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
/*
 * _lwkt_pushmsg: sets MSGF_QUEUED atomically, then appends `msg` to the
 * tail of mp_msgq_prio if MSGF_PRIORITY is set, or of mp_msgq otherwise.
 * The flag is set before the message is linked into the queue.
 * NOTE(review): the else keyword between the two queue assignments is not
 * visible in this listing.
 */
395 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
397 lwkt_msg_queue *queue;
400 * atomic op needed for spin ports
402 atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
403 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
404 queue = &port->mp_msgq_prio;
406 queue = &port->mp_msgq;
407 TAILQ_INSERT_TAIL(queue, msg, ms_node);
/*
 * _lwkt_pollmsg: peeks at the next message without removing it.  The head
 * of the priority queue is checked first; the head of the normal queue is
 * returned otherwise (NULL when both are empty).
 * NOTE(review): the declaration of `msg` and the statement under the
 * priority check (presumably returning it) are not visible in this listing.
 */
412 _lwkt_pollmsg(lwkt_port_t port)
416 msg = TAILQ_FIRST(&port->mp_msgq_prio);
417 if (__predict_false(msg != NULL))
421 * Priority queue has no message, fallback to non-priority queue.
423 return TAILQ_FIRST(&port->mp_msgq);
/*
 * _lwkt_enqueue_reply: queues `msg` on the port via _lwkt_pushmsg() and
 * then atomically sets MSGF_REPLY and MSGF_DONE.  The message is therefore
 * already on the queue when it becomes visible as DONE.
 */
428 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
431 * atomic op needed for spin ports
433 _lwkt_pushmsg(port, msg);
434 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
437 /************************************************************************
438 * THREAD PORT BACKEND *
439 ************************************************************************
441 * This backend is used when the port a message is retrieved from is owned
442 * by a single thread (the calling thread). Messages are IPId to the
443 * correct cpu before being enqueued to a port. Note that this is fairly
444 * optimal since scheduling would have had to do an IPI anyway if the
445 * message were headed to a different cpu.
451 * This function completes reply processing for the default case in the
452 * context of the originating cpu.
/*
 * lwkt_thread_replyport_remote: IPI handler that finishes a reply on the
 * cpu of the thread owning the reply port.  If that thread is no longer on
 * this cpu the message is re-sent by IPI to its current cpu.  Otherwise
 * MSGF_INTRANSIT is cleared and the message is completed: directly for
 * MSGF_SYNC messages, or by queueing on the reply port.
 * NOTE(review): the declaration of `flags`, the return after the re-send
 * and the else keyword before the enqueue are not visible in this listing.
 */
456 lwkt_thread_replyport_remote(lwkt_msg_t msg)
458 lwkt_port_t port = msg->ms_reply_port;
462 * Chase any thread migration that occurs
464 if (port->mpu_td->td_gd != mycpu) {
465 lwkt_send_ipiq(port->mpu_td->td_gd,
466 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
474 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
475 msg->ms_flags &= ~MSGF_INTRANSIT;
/* Snapshot the flags before completion; the snapshot is what gets passed to the scheduler below. */
477 flags = msg->ms_flags;
478 if (msg->ms_flags & MSGF_SYNC) {
480 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
482 _lwkt_enqueue_reply(port, msg);
/* Only schedule the owner if it marked itself as waiting on this port. */
484 if (port->mp_flags & MSGPORTF_WAITING)
485 _lwkt_schedule_msg(port->mpu_td, flags);
491 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
493 * Called with the reply port as an argument but in the context of the
494 * original target port. Completion must occur on the target port's
497 * The critical section protects us from IPIs on this CPU.
/*
 * lwkt_thread_replyport: reply backend for thread-owned ports.  The message
 * must not be DONE, QUEUED or INTRANSIT on entry.  Both the MSGF_SYNC path
 * and the asynchronous path compare the owning thread's cpu
 * (port->mpu_td->td_gd) with mycpu.  On a match the reply is completed
 * here; the remaining lines of each path flag the message INTRANSIT and
 * REPLY and send it by IPI to lwkt_thread_replyport_remote().
 * NOTE(review): the declaration of `flags`, the braces, the else keywords
 * and any critical-section or preprocessor lines are not visible in this
 * listing, so the exact branch structure is inferred.
 */
501 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
505 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
507 if (msg->ms_flags & MSGF_SYNC) {
509 * If a synchronous completion has been requested, just wakeup
510 * the message without bothering to queue it to the target port.
512 * Assume the target thread is non-preemptive, so no critical
513 * section is required.
516 if (port->mpu_td->td_gd == mycpu) {
/* Flags are captured before MSGF_DONE is set and passed to the scheduler. */
518 flags = msg->ms_flags;
520 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
521 if (port->mp_flags & MSGPORTF_WAITING)
522 _lwkt_schedule_msg(port->mpu_td, flags);
/* Presumably the remote-cpu branch: the IPI handler asserts and clears MSGF_INTRANSIT. */
526 msg->ms_flags |= MSGF_INTRANSIT;
528 msg->ms_flags |= MSGF_REPLY;
529 lwkt_send_ipiq(port->mpu_td->td_gd,
530 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
535 * If an asynchronous completion has been requested the message
536 * must be queued to the reply port.
538 * A critical section is required to interlock the port queue.
541 if (port->mpu_td->td_gd == mycpu) {
544 _lwkt_enqueue_reply(port, msg);
545 if (port->mp_flags & MSGPORTF_WAITING)
546 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
/* Presumably the remote-cpu branch, mirroring the synchronous case above. */
551 msg->ms_flags |= MSGF_INTRANSIT;
553 msg->ms_flags |= MSGF_REPLY;
554 lwkt_send_ipiq(port->mpu_td->td_gd,
555 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
562 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
564 * This function could _only_ be used when caller is in the same thread
565 * as the message's target port owner thread.
/*
 * lwkt_thread_dropmsg: removes a queued message from a thread-owned port
 * and marks it MSGF_DONE without replying to it.  Asserts that the caller
 * is the thread owning the port.  The queue removal and flag update are
 * done inside a critical section on that thread.
 */
568 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
570 KASSERT(port->mpu_td == curthread,
571 ("message could only be dropped in the same thread "
572 "as the message target port thread"));
573 crit_enter_quick(port->mpu_td);
574 _lwkt_pullmsg(port, msg);
575 msg->ms_flags |= MSGF_DONE;
576 crit_exit_quick(port->mpu_td);
580 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
582 * Called with the target port as an argument but in the context of the
583 * reply port. This function always implements an asynchronous put to
584 * the target message port, and thus returns EASYNC.
586 * The message must already have cleared MSGF_DONE and MSGF_REPLY
/*
 * lwkt_thread_putport_remote: IPI handler that queues a message on its
 * target port from the cpu of the thread owning that port.  If the owner
 * is no longer on this cpu the message is re-sent by IPI to its current
 * cpu.  Otherwise MSGF_INTRANSIT is cleared, the message is queued, and the
 * owner is scheduled if it is waiting on the port.
 * NOTE(review): the return after the re-send is not visible in this listing.
 */
593 lwkt_thread_putport_remote(lwkt_msg_t msg)
595 lwkt_port_t port = msg->ms_target_port;
598 * Chase any thread migration that occurs
600 if (port->mpu_td->td_gd != mycpu) {
601 lwkt_send_ipiq(port->mpu_td->td_gd,
602 (ipifunc1_t)lwkt_thread_putport_remote, msg);
610 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
611 msg->ms_flags &= ~MSGF_INTRANSIT;
613 _lwkt_pushmsg(port, msg);
614 if (port->mp_flags & MSGPORTF_WAITING)
615 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
/*
 * lwkt_thread_putport: put backend for thread-owned ports.  Requires that
 * DONE and REPLY are already clear, and records `port` as the message's
 * target port.  If the owning thread is on this cpu the message is queued
 * directly and the owner is scheduled when it is waiting.  The remaining
 * lines flag the message INTRANSIT and send it by IPI to
 * lwkt_thread_putport_remote().
 * NOTE(review): the else keyword, any critical-section calls and the return
 * value are not visible in this listing.
 */
622 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
624 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
/* The remote IPI handler looks the port up again through this field. */
626 msg->ms_target_port = port;
628 if (port->mpu_td->td_gd == mycpu) {
631 _lwkt_pushmsg(port, msg);
632 if (port->mp_flags & MSGPORTF_WAITING)
633 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
/* Presumably the remote-cpu branch. */
638 msg->ms_flags |= MSGF_INTRANSIT;
640 lwkt_send_ipiq(port->mpu_td->td_gd,
641 (ipifunc1_t)lwkt_thread_putport_remote, msg);
648 * lwkt_thread_getport()
650 * Retrieve the next message from the port or NULL if no messages
/*
 * lwkt_thread_getport: non-blocking dequeue for thread-owned ports.
 * Asserts that the caller owns the port, then, inside a critical section,
 * polls for the next message and removes it from its queue if one exists.
 * NOTE(review): the declaration of `msg` and the return statement are not
 * visible in this listing.
 */
655 lwkt_thread_getport(lwkt_port_t port)
659 KKASSERT(port->mpu_td == curthread);
661 crit_enter_quick(port->mpu_td);
662 if ((msg = _lwkt_pollmsg(port)) != NULL)
663 _lwkt_pullmsg(port, msg);
664 crit_exit_quick(port->mpu_td);
669 * lwkt_thread_waitmsg()
671 * Wait for a particular message to be replied. We must be the only
672 * thread waiting on the message. The port must be owned by the
/*
 * lwkt_thread_waitmsg: blocks until `msg` is MSGF_DONE and returns its
 * ms_error.  Dropable messages may not be waited on.  While the message is
 * not done, the caller (which must own the reply port) sets
 * MSGPORTF_WAITING and sleeps.  The first sleeps use the caller's `flags`;
 * once one returns non-zero the result is kept in `sentabort`.  On either
 * path the message is pulled off the reply port if it ended up queued there.
 * NOTE(review): the declaration of `sentabort`, the statements run when the
 * first sleep fails (presumably an abort request), the else keywords and
 * the critical-section exits are not visible in this listing.
 */
677 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
679 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
680 ("can't wait dropable message"));
682 if ((msg->ms_flags & MSGF_DONE) == 0) {
684 * If the done bit was not set we have to block until it is.
686 lwkt_port_t port = msg->ms_reply_port;
687 thread_t td = curthread;
690 KKASSERT(port->mpu_td == td);
691 crit_enter_quick(td);
694 while ((msg->ms_flags & MSGF_DONE) == 0) {
/* The reply paths only schedule this thread when this flag is set. */
695 port->mp_flags |= MSGPORTF_WAITING;
696 if (sentabort == 0) {
697 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
/* Presumably the branch taken after the first sleep failed: sleep with no flags. */
701 lwkt_sleep("waitabt", 0);
703 port->mp_flags &= ~MSGPORTF_WAITING;
705 if (msg->ms_flags & MSGF_QUEUED)
706 _lwkt_pullmsg(port, msg);
710 * If the done bit was set we only have to mess around with the
711 * message if it is queued on the reply port.
713 if (msg->ms_flags & MSGF_QUEUED) {
714 lwkt_port_t port = msg->ms_reply_port;
715 thread_t td = curthread;
717 KKASSERT(port->mpu_td == td);
718 crit_enter_quick(td);
719 _lwkt_pullmsg(port, msg);
723 return(msg->ms_error);
727 * lwkt_thread_waitport()
729 * Wait for a new message to be available on the port. We must be the
730 * only thread waiting on the port. The port must be owned by the caller.
/*
 * lwkt_thread_waitport: blocking dequeue for thread-owned ports.  Asserts
 * that the caller owns the port, then, inside a critical section, loops
 * until a message is available.  Each pass sets MSGPORTF_WAITING, sleeps
 * with the caller's `flags`, and clears the flag again.  The message found
 * is removed from its queue.
 * NOTE(review): the declarations of `msg` and `error`, the handling of a
 * non-zero sleep result, the critical-section exit and the return are not
 * visible in this listing.
 */
734 lwkt_thread_waitport(lwkt_port_t port, int flags)
736 thread_t td = curthread;
740 KKASSERT(port->mpu_td == td);
741 crit_enter_quick(td);
742 while ((msg = _lwkt_pollmsg(port)) == NULL) {
743 port->mp_flags |= MSGPORTF_WAITING;
744 error = lwkt_sleep("waitport", flags);
745 port->mp_flags &= ~MSGPORTF_WAITING;
749 _lwkt_pullmsg(port, msg);
755 /************************************************************************
756 * SPIN PORT BACKEND *
757 ************************************************************************
759 * This backend uses spinlocks instead of making assumptions about which
760 * thread is accessing the port. It must be used when a port is not owned
761 * by a particular thread. This is less optimal than thread ports but
762 * you don't have a choice if there are multiple threads accessing the port.
764 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
765 * on the message port, it is the responsibility of the code doing the
766 * wakeup to clear this flag rather than the blocked threads. Some
767 * superfluous wakeups may occur, which is ok.
769 * XXX synchronous message wakeups are not currently optimized.
/*
 * lwkt_spin_getport: non-blocking dequeue for spin ports.  With the port
 * spinlock held, polls for the next message and removes it from its queue
 * if one exists.
 * NOTE(review): the declaration of `msg` and the return statement are not
 * visible in this listing.
 */
774 lwkt_spin_getport(lwkt_port_t port)
778 spin_lock(&port->mpu_spin);
779 if ((msg = _lwkt_pollmsg(port)) != NULL)
780 _lwkt_pullmsg(port, msg);
781 spin_unlock(&port->mpu_spin);
/*
 * lwkt_spin_putport: put backend for spin ports.  Requires that DONE and
 * REPLY are clear, records `port` as the target, and queues the message
 * with the port spinlock held.  If MSGPORTF_WAITING is set it is cleared
 * here, on the sender's side.
 * NOTE(review): the statement that follows the flag clear (presumably a
 * wakeup, possibly deferred until after the unlock) and the return value
 * are not visible in this listing.
 */
787 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
791 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
793 msg->ms_target_port = port;
794 spin_lock(&port->mpu_spin);
795 _lwkt_pushmsg(port, msg);
797 if (port->mp_flags & MSGPORTF_WAITING) {
798 port->mp_flags &= ~MSGPORTF_WAITING;
801 spin_unlock(&port->mpu_spin);
/*
 * lwkt_spin_waitmsg: blocks until `msg` is MSGF_DONE and returns its
 * ms_error.  Dropable messages may not be waited on.  While the message is
 * not done the caller holds the reply port spinlock and sleeps through
 * ssleep(), passing that spinlock and the sleep channel `won`.  Before
 * sleeping it sets MSGF_WAITING on the message for MSGF_SYNC messages, or
 * MSGPORTF_WAITING on the port.  With PCATCH requested and no abort sent
 * yet, the sleep is interruptible; later sleeps are not.  After the loop a
 * non-zero `sentabort` replaces an EINTR result in ms_error.
 * NOTE(review): the declarations, the assignments to `won`, the statements
 * between the unlock/lock pair (presumably the abort request issued without
 * the spinlock held) and the else keywords are not visible in this listing.
 */
809 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
815 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
816 ("can't wait dropable message"));
818 if ((msg->ms_flags & MSGF_DONE) == 0) {
819 port = msg->ms_reply_port;
821 spin_lock(&port->mpu_spin);
822 while ((msg->ms_flags & MSGF_DONE) == 0) {
826 * If message was sent synchronously from the beginning
827 * the wakeup will be on the message structure, else it
828 * will be on the port structure.
830 if (msg->ms_flags & MSGF_SYNC) {
/* Atomic because ms_flags can be updated concurrently on spin ports. */
832 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
835 port->mp_flags |= MSGPORTF_WAITING;
839 * Only messages which support abort can be interrupted.
840 * We must still wait for message completion regardless.
842 if ((flags & PCATCH) && sentabort == 0) {
843 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
/* The spinlock is dropped and re-taken around statements not visible here. */
846 spin_unlock(&port->mpu_spin);
848 spin_lock(&port->mpu_spin);
851 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
853 /* see note at the top on the MSGPORTF_WAITING flag */
856 * Turn EINTR into ERESTART if the signal indicates.
858 if (sentabort && msg->ms_error == EINTR)
859 msg->ms_error = sentabort;
860 if (msg->ms_flags & MSGF_QUEUED)
861 _lwkt_pullmsg(port, msg);
862 spin_unlock(&port->mpu_spin);
/* Presumably the already-done path: only dequeue if the reply was queued. */
864 if (msg->ms_flags & MSGF_QUEUED) {
865 port = msg->ms_reply_port;
866 spin_lock(&port->mpu_spin);
867 _lwkt_pullmsg(port, msg);
868 spin_unlock(&port->mpu_spin);
871 return(msg->ms_error);
/*
 * lwkt_spin_waitport: blocking dequeue for spin ports.  With the port
 * spinlock held, loops until a message is available.  Each pass sets
 * MSGPORTF_WAITING and sleeps on the port address through ssleep() with
 * the caller's `flags`.  The message found is removed from its queue before
 * the spinlock is released.
 * NOTE(review): the declarations, the check of `error` that guards the
 * early unlock inside the loop, and the return statements are not visible
 * in this listing.
 */
876 lwkt_spin_waitport(lwkt_port_t port, int flags)
881 spin_lock(&port->mpu_spin);
882 while ((msg = _lwkt_pollmsg(port)) == NULL) {
883 port->mp_flags |= MSGPORTF_WAITING;
884 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
885 /* see note at the top on the MSGPORTF_WAITING flag */
/* Presumably the error exit from the loop. */
887 spin_unlock(&port->mpu_spin);
891 _lwkt_pullmsg(port, msg);
892 spin_unlock(&port->mpu_spin);
/*
 * lwkt_spin_replyport: reply backend for spin ports.  The message must not
 * be DONE or QUEUED on entry.  For MSGF_SYNC messages the message is marked
 * DONE and REPLY under the port spinlock without being queued, and
 * MSGF_WAITING is cleared if a waiter had set it.  Otherwise the message is
 * queued on the reply port and MSGPORTF_WAITING is cleared if set.
 * NOTE(review): the statements that follow each flag clear (presumably the
 * wakeups), the else keyword and any local declarations are not visible in
 * this listing.
 */
898 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
902 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
904 if (msg->ms_flags & MSGF_SYNC) {
906 * If a synchronous completion has been requested, just wakeup
907 * the message without bothering to queue it to the target port.
909 spin_lock(&port->mpu_spin);
910 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
912 if (msg->ms_flags & MSGF_WAITING) {
913 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
916 spin_unlock(&port->mpu_spin);
921 * If an asynchronous completion has been requested the message
922 * must be queued to the reply port.
924 spin_lock(&port->mpu_spin);
925 _lwkt_enqueue_reply(port, msg);
927 if (port->mp_flags & MSGPORTF_WAITING) {
928 port->mp_flags &= ~MSGPORTF_WAITING;
931 spin_unlock(&port->mpu_spin);
938 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
940 * This function could _only_ be used when caller is in the same thread
941 * as the message's target port owner thread.
/*
 * lwkt_spin_dropmsg: removes a queued message from a spin port and marks
 * it MSGF_DONE without replying to it.  Asserts that the caller is the
 * thread recorded in port->mpu_td.  The queue removal and flag update are
 * done with the port spinlock held.
 * NOTE(review): the assertion message here ends in "\n", while the same
 * message in lwkt_thread_dropmsg() does not.  Confirm which is intended.
 */
944 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
946 KASSERT(port->mpu_td == curthread,
947 ("message could only be dropped in the same thread "
948 "as the message target port thread\n"));
949 spin_lock(&port->mpu_spin);
950 _lwkt_pullmsg(port, msg);
951 msg->ms_flags |= MSGF_DONE;
952 spin_unlock(&port->mpu_spin);
955 /************************************************************************
956 * SERIALIZER PORT BACKEND *
957 ************************************************************************
959 * This backend uses serializer to protect port accessing. Callers are
960 * assumed to have serializer held. This kind of port is usually created
961 * by network device driver along with _one_ lwkt thread to pipeline
962 * operations which may temporarily release serializer.
964 * Implementation is based on SPIN PORT BACKEND.
/*
 * lwkt_serialize_getport: non-blocking dequeue for serializer ports.
 * Asserts that the port's serializer is held, then polls for the next
 * message and removes it from its queue if one exists.
 * NOTE(review): the declaration of `msg` and the return statement are not
 * visible in this listing.
 */
969 lwkt_serialize_getport(lwkt_port_t port)
973 ASSERT_SERIALIZED(port->mpu_serialize);
975 if ((msg = _lwkt_pollmsg(port)) != NULL)
976 _lwkt_pullmsg(port, msg);
/*
 * lwkt_serialize_putport: put backend for serializer ports.  Requires that
 * DONE and REPLY are clear and that the port's serializer is held.  Records
 * `port` as the target, queues the message, and clears MSGPORTF_WAITING if
 * it was set.
 * NOTE(review): the statement that follows the flag clear (presumably a
 * wakeup) and the return value are not visible in this listing.
 */
982 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
984 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
985 ASSERT_SERIALIZED(port->mpu_serialize);
987 msg->ms_target_port = port;
988 _lwkt_pushmsg(port, msg);
989 if (port->mp_flags & MSGPORTF_WAITING) {
990 port->mp_flags &= ~MSGPORTF_WAITING;
/*
 * lwkt_serialize_waitmsg: blocks until `msg` is MSGF_DONE and returns its
 * ms_error.  Dropable messages may not be waited on.  The reply port's
 * serializer must be held.  While the message is not done the caller
 * sleeps through zsleep(), passing that serializer and the sleep channel
 * `won`.  With PCATCH requested and no abort sent yet, the sleep is
 * interruptible; later sleeps are not.  After the loop a non-zero
 * `sentabort` replaces an EINTR result in ms_error.
 * NOTE(review): the declarations, the assignments to `won`, the statement
 * under the MSGF_SYNC check, the statements between the serializer
 * exit/enter pair (presumably the abort request) and the else keywords are
 * not visible in this listing.
 */
998 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1004 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1005 ("can't wait dropable message"));
1007 if ((msg->ms_flags & MSGF_DONE) == 0) {
1008 port = msg->ms_reply_port;
1010 ASSERT_SERIALIZED(port->mpu_serialize);
1013 while ((msg->ms_flags & MSGF_DONE) == 0) {
1017 * If message was sent synchronously from the beginning
1018 * the wakeup will be on the message structure, else it
1019 * will be on the port structure.
1021 if (msg->ms_flags & MSGF_SYNC) {
1025 port->mp_flags |= MSGPORTF_WAITING;
1029 * Only messages which support abort can be interrupted.
1030 * We must still wait for message completion regardless.
1032 if ((flags & PCATCH) && sentabort == 0) {
1033 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
/* The serializer is released and re-entered around statements not visible here. */
1036 lwkt_serialize_exit(port->mpu_serialize);
1038 lwkt_serialize_enter(port->mpu_serialize);
1041 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1043 /* see note at the top on the MSGPORTF_WAITING flag */
1046 * Turn EINTR into ERESTART if the signal indicates.
1048 if (sentabort && msg->ms_error == EINTR)
1049 msg->ms_error = sentabort;
1050 if (msg->ms_flags & MSGF_QUEUED)
1051 _lwkt_pullmsg(port, msg);
/* Presumably the already-done path: only dequeue if the reply was queued. */
1053 if (msg->ms_flags & MSGF_QUEUED) {
1054 port = msg->ms_reply_port;
1056 ASSERT_SERIALIZED(port->mpu_serialize);
1057 _lwkt_pullmsg(port, msg);
1060 return(msg->ms_error);
/*
 * lwkt_serialize_waitport: blocking dequeue for serializer ports.  Asserts
 * that the port's serializer is held, then loops until a message is
 * available.  Each pass sets MSGPORTF_WAITING and sleeps on the port
 * address through zsleep() with the caller's `flags`.  The message found
 * is removed from its queue.
 * NOTE(review): the declarations, the handling of a non-zero sleep result
 * and the return statements are not visible in this listing.
 */
1065 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1070 ASSERT_SERIALIZED(port->mpu_serialize);
1072 while ((msg = _lwkt_pollmsg(port)) == NULL) {
1073 port->mp_flags |= MSGPORTF_WAITING;
1074 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1075 /* see note at the top on the MSGPORTF_WAITING flag */
1079 _lwkt_pullmsg(port, msg);
/*
 * lwkt_serialize_replyport: reply backend for serializer ports.  The
 * message must not be DONE or QUEUED and the port's serializer must be
 * held.  MSGF_SYNC messages are marked DONE and REPLY without being queued.
 * Otherwise the message is queued on the reply port and MSGPORTF_WAITING is
 * cleared if set.
 * NOTE(review): the statements that follow each completion (presumably the
 * wakeups) and the else keyword are not visible in this listing.
 */
1085 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1087 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1088 ASSERT_SERIALIZED(port->mpu_serialize);
1090 if (msg->ms_flags & MSGF_SYNC) {
1092 * If a synchronous completion has been requested, just wakeup
1093 * the message without bothering to queue it to the target port.
1095 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1099 * If an asynchronous completion has been requested the message
1100 * must be queued to the reply port.
1102 _lwkt_enqueue_reply(port, msg);
1103 if (port->mp_flags & MSGPORTF_WAITING) {
1104 port->mp_flags &= ~MSGPORTF_WAITING;
1110 /************************************************************************
1111 * PANIC AND SPECIAL PORT FUNCTIONS *
1112 ************************************************************************/
1115 * You can point a port's reply vector at this function if you just want
1116 * the message marked done, without any queueing or signaling. This is
1117 * often used for structure-embedded messages.
/*
 * lwkt_null_replyport: reply vector that only sets MSGF_DONE and MSGF_REPLY
 * on the message.  The port argument is unused and nothing is queued.
 */
1121 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1123 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
/* Get vector for ports that do not support getport: panics, naming the port. */
1128 lwkt_panic_getport(lwkt_port_t port)
1130 panic("lwkt_getport() illegal on port %p", port);
/* Put vector for ports that cannot receive messages: panics, naming the port and message. */
1135 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1137 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
/* Waitmsg vector for ports that cannot be waited on: panics, naming the message's reply port and the message. */
1142 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1144 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
/* Waitport vector for ports that cannot be waited on: panics, naming the port. */
1149 lwkt_panic_waitport(lwkt_port_t port, int flags)
1151 panic("port %p cannot be waited on", port);
/* Reply vector for ports that cannot be replied to: panics, naming the port and message. */
1156 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1158 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
/* Drop vector for ports that do not support dropping messages: panics, naming the port and message. */
1163 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1165 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);