2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
69 #include <machine/smp.h>
72 #include <sys/malloc.h>
73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
75 /************************************************************************
77 ************************************************************************/
82 * Request asynchronous completion and call lwkt_beginmsg(). The
83 * target port can opt to execute the message synchronously or
84 * asynchronously and this function will automatically queue the
85 * response if the target executes the message synchronously.
87 * NOTE: The message is in an indeterminant state until this call
88 * returns. The caller should not mess with it (e.g. try to abort it)
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
96 KKASSERT(msg->ms_reply_port != NULL &&
97 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
98 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
100 lwkt_replymsg(msg, error);
107 * Request asynchronous completion and call lwkt_beginmsg(). The
108 * target port can opt to execute the message synchronously or
109 * asynchronously and this function will automatically queue the
110 * response if the target executes the message synchronously.
113 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
117 KKASSERT(msg->ms_reply_port != NULL &&
118 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
119 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
120 msg->ms_flags |= MSGF_SYNC;
121 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
122 error = lwkt_waitmsg(msg, flags);
124 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
132 * Forward a message received on one port to another port.
135 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
140 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
141 if ((error = port->mp_putport(port, msg)) != EASYNC)
142 lwkt_replymsg(msg, error);
150 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
151 * The caller must ensure that the message will not be both replied AND
152 * destroyed while the abort is in progress.
154 * This function issues a callback which might block!
157 lwkt_abortmsg(lwkt_msg_t msg)
160 * A critical section protects us from reply IPIs on this cpu.
165 * Shortcut the operation if the message has already been returned.
166 * The callback typically constructs a lwkt_msg with the abort request,
167 * issues it synchronously, and waits for completion. The callback
168 * is not required to actually abort the message and the target port,
169 * upon receiving an abort request message generated by the callback
170 * should check whether the original message has already completed or
173 if (msg->ms_flags & MSGF_ABORTABLE) {
174 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
175 msg->ms_abortfn(msg);
180 /************************************************************************
181 * PORT INITIALIZATION API *
182 ************************************************************************/
184 static void *lwkt_thread_getport(lwkt_port_t port);
185 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
186 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
187 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
188 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
189 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
191 static void *lwkt_spin_getport(lwkt_port_t port);
192 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
193 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
194 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
195 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
196 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
198 static void *lwkt_serialize_getport(lwkt_port_t port);
199 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
200 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
201 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
202 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
204 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void *lwkt_panic_getport(lwkt_port_t port);
206 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
207 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
208 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
209 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
210 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
213 * Core port initialization (internal)
217 _lwkt_initport(lwkt_port_t port,
218 void *(*gportfn)(lwkt_port_t),
219 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
220 int (*wmsgfn)(lwkt_msg_t, int),
221 void *(*wportfn)(lwkt_port_t, int),
222 void (*rportfn)(lwkt_port_t, lwkt_msg_t),
223 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
225 bzero(port, sizeof(*port));
226 TAILQ_INIT(&port->mp_msgq);
227 TAILQ_INIT(&port->mp_msgq_prio);
228 port->mp_getport = gportfn;
229 port->mp_putport = pportfn;
230 port->mp_waitmsg = wmsgfn;
231 port->mp_waitport = wportfn;
232 port->mp_replyport = rportfn;
233 port->mp_dropmsg = dmsgfn;
237 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
238 * we tell the scheduler not to reschedule if td is at a higher priority.
240 * This routine is called even if the thread is already scheduled.
244 _lwkt_schedule_msg(thread_t td, int flags)
250 * lwkt_initport_thread()
252 * Initialize a port for use by a particular thread. The port may
253 * only be used by <td>.
256 lwkt_initport_thread(lwkt_port_t port, thread_t td)
262 lwkt_thread_waitport,
263 lwkt_thread_replyport,
264 lwkt_thread_dropmsg);
269 * lwkt_initport_spin()
271 * Initialize a port for use with descriptors that might be accessed
272 * via multiple LWPs, processes, or threads. Has somewhat more
273 * overhead then thread ports.
276 lwkt_initport_spin(lwkt_port_t port)
285 spin_init(&port->mpu_spin);
289 * lwkt_initport_serialize()
291 * Initialize a port for use with descriptors that might be accessed
292 * via multiple LWPs, processes, or threads. Callers are assumed to
293 * have held the serializer (slz).
296 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
299 lwkt_serialize_getport,
300 lwkt_serialize_putport,
301 lwkt_serialize_waitmsg,
302 lwkt_serialize_waitport,
303 lwkt_serialize_replyport,
305 port->mpu_serialize = slz;
309 * Similar to the standard initport, this function simply marks the message
310 * as being done and does not attempt to return it to an originating port.
313 lwkt_initport_replyonly_null(lwkt_port_t port)
325 * Initialize a reply-only port, typically used as a message sink. Such
326 * ports can only be used as a reply port.
329 lwkt_initport_replyonly(lwkt_port_t port,
330 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
332 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
333 lwkt_panic_waitmsg, lwkt_panic_waitport,
334 rportfn, lwkt_panic_dropmsg);
338 lwkt_initport_putonly(lwkt_port_t port,
339 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
341 _lwkt_initport(port, lwkt_panic_getport, pportfn,
342 lwkt_panic_waitmsg, lwkt_panic_waitport,
343 lwkt_panic_replyport, lwkt_panic_dropmsg);
347 lwkt_initport_panic(lwkt_port_t port)
350 lwkt_panic_getport, lwkt_panic_putport,
351 lwkt_panic_waitmsg, lwkt_panic_waitport,
352 lwkt_panic_replyport, lwkt_panic_dropmsg);
357 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
359 lwkt_msg_queue *queue;
362 * normal case, remove and return the message.
364 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
365 queue = &port->mp_msgq_prio;
367 queue = &port->mp_msgq;
368 TAILQ_REMOVE(queue, msg, ms_node);
371 * atomic op needed for spin ports
373 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
378 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
380 lwkt_msg_queue *queue;
383 * atomic op needed for spin ports
385 atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
386 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
387 queue = &port->mp_msgq_prio;
389 queue = &port->mp_msgq;
390 TAILQ_INSERT_TAIL(queue, msg, ms_node);
395 _lwkt_pollmsg(lwkt_port_t port)
399 msg = TAILQ_FIRST(&port->mp_msgq_prio);
400 if (__predict_false(msg != NULL))
404 * Priority queue has no message, fallback to non-priority queue.
406 return TAILQ_FIRST(&port->mp_msgq);
411 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
414 * atomic op needed for spin ports
416 _lwkt_pushmsg(port, msg);
417 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
420 /************************************************************************
421 * THREAD PORT BACKEND *
422 ************************************************************************
424 * This backend is used when the port a message is retrieved from is owned
425 * by a single thread (the calling thread). Messages are IPId to the
426 * correct cpu before being enqueued to a port. Note that this is fairly
427 * optimal since scheduling would have had to do an IPI anyway if the
428 * message were headed to a different cpu.
434 * This function completes reply processing for the default case in the
435 * context of the originating cpu.
439 lwkt_thread_replyport_remote(lwkt_msg_t msg)
441 lwkt_port_t port = msg->ms_reply_port;
445 * Chase any thread migration that occurs
447 if (port->mpu_td->td_gd != mycpu) {
448 lwkt_send_ipiq(port->mpu_td->td_gd,
449 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
457 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
458 msg->ms_flags &= ~MSGF_INTRANSIT;
460 flags = msg->ms_flags;
461 if (msg->ms_flags & MSGF_SYNC) {
463 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
465 _lwkt_enqueue_reply(port, msg);
467 if (port->mp_flags & MSGPORTF_WAITING)
468 _lwkt_schedule_msg(port->mpu_td, flags);
474 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
476 * Called with the reply port as an argument but in the context of the
477 * original target port. Completion must occur on the target port's
480 * The critical section protects us from IPIs on the this CPU.
484 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
488 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
490 if (msg->ms_flags & MSGF_SYNC) {
492 * If a synchronous completion has been requested, just wakeup
493 * the message without bothering to queue it to the target port.
495 * Assume the target thread is non-preemptive, so no critical
496 * section is required.
499 if (port->mpu_td->td_gd == mycpu) {
501 flags = msg->ms_flags;
503 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
504 if (port->mp_flags & MSGPORTF_WAITING)
505 _lwkt_schedule_msg(port->mpu_td, flags);
509 msg->ms_flags |= MSGF_INTRANSIT;
511 msg->ms_flags |= MSGF_REPLY;
512 lwkt_send_ipiq(port->mpu_td->td_gd,
513 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
518 * If an asynchronous completion has been requested the message
519 * must be queued to the reply port.
521 * A critical section is required to interlock the port queue.
524 if (port->mpu_td->td_gd == mycpu) {
527 _lwkt_enqueue_reply(port, msg);
528 if (port->mp_flags & MSGPORTF_WAITING)
529 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
534 msg->ms_flags |= MSGF_INTRANSIT;
536 msg->ms_flags |= MSGF_REPLY;
537 lwkt_send_ipiq(port->mpu_td->td_gd,
538 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
545 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
547 * This function could _only_ be used when caller is in the same thread
548 * as the message's target port owner thread.
551 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
553 KASSERT(port->mpu_td == curthread,
554 ("message could only be dropped in the same thread "
555 "as the message target port thread"));
556 crit_enter_quick(port->mpu_td);
557 _lwkt_pullmsg(port, msg);
558 msg->ms_flags |= MSGF_DONE;
559 crit_exit_quick(port->mpu_td);
563 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
565 * Called with the target port as an argument but in the context of the
566 * reply port. This function always implements an asynchronous put to
567 * the target message port, and thus returns EASYNC.
569 * The message must already have cleared MSGF_DONE and MSGF_REPLY
576 lwkt_thread_putport_remote(lwkt_msg_t msg)
578 lwkt_port_t port = msg->ms_target_port;
581 * Chase any thread migration that occurs
583 if (port->mpu_td->td_gd != mycpu) {
584 lwkt_send_ipiq(port->mpu_td->td_gd,
585 (ipifunc1_t)lwkt_thread_putport_remote, msg);
593 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
594 msg->ms_flags &= ~MSGF_INTRANSIT;
596 _lwkt_pushmsg(port, msg);
597 if (port->mp_flags & MSGPORTF_WAITING)
598 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
605 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
607 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
609 msg->ms_target_port = port;
611 if (port->mpu_td->td_gd == mycpu) {
614 _lwkt_pushmsg(port, msg);
615 if (port->mp_flags & MSGPORTF_WAITING)
616 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
621 msg->ms_flags |= MSGF_INTRANSIT;
623 lwkt_send_ipiq(port->mpu_td->td_gd,
624 (ipifunc1_t)lwkt_thread_putport_remote, msg);
631 * lwkt_thread_getport()
633 * Retrieve the next message from the port or NULL if no messages
638 lwkt_thread_getport(lwkt_port_t port)
642 KKASSERT(port->mpu_td == curthread);
644 crit_enter_quick(port->mpu_td);
645 if ((msg = _lwkt_pollmsg(port)) != NULL)
646 _lwkt_pullmsg(port, msg);
647 crit_exit_quick(port->mpu_td);
652 * lwkt_thread_waitmsg()
654 * Wait for a particular message to be replied. We must be the only
655 * thread waiting on the message. The port must be owned by the
660 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
662 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
663 ("can't wait dropable message"));
665 if ((msg->ms_flags & MSGF_DONE) == 0) {
667 * If the done bit was not set we have to block until it is.
669 lwkt_port_t port = msg->ms_reply_port;
670 thread_t td = curthread;
673 KKASSERT(port->mpu_td == td);
674 crit_enter_quick(td);
677 while ((msg->ms_flags & MSGF_DONE) == 0) {
678 port->mp_flags |= MSGPORTF_WAITING;
679 if (sentabort == 0) {
680 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
684 lwkt_sleep("waitabt", 0);
686 port->mp_flags &= ~MSGPORTF_WAITING;
688 if (msg->ms_flags & MSGF_QUEUED)
689 _lwkt_pullmsg(port, msg);
693 * If the done bit was set we only have to mess around with the
694 * message if it is queued on the reply port.
696 if (msg->ms_flags & MSGF_QUEUED) {
697 lwkt_port_t port = msg->ms_reply_port;
698 thread_t td = curthread;
700 KKASSERT(port->mpu_td == td);
701 crit_enter_quick(td);
702 _lwkt_pullmsg(port, msg);
706 return(msg->ms_error);
711 lwkt_thread_waitport(lwkt_port_t port, int flags)
713 thread_t td = curthread;
717 KKASSERT(port->mpu_td == td);
718 crit_enter_quick(td);
719 while ((msg = _lwkt_pollmsg(port)) == NULL) {
720 port->mp_flags |= MSGPORTF_WAITING;
721 error = lwkt_sleep("waitport", flags);
722 port->mp_flags &= ~MSGPORTF_WAITING;
726 _lwkt_pullmsg(port, msg);
732 /************************************************************************
733 * SPIN PORT BACKEND *
734 ************************************************************************
736 * This backend uses spinlocks instead of making assumptions about which
737 * thread is accessing the port. It must be used when a port is not owned
738 * by a particular thread. This is less optimal then thread ports but
739 * you don't have a choice if there are multiple threads accessing the port.
741 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
742 * on the message port, it is the responsibility of the code doing the
743 * wakeup to clear this flag rather then the blocked threads. Some
744 * superfluous wakeups may occur, which is ok.
746 * XXX synchronous message wakeups are not current optimized.
751 lwkt_spin_getport(lwkt_port_t port)
755 spin_lock(&port->mpu_spin);
756 if ((msg = _lwkt_pollmsg(port)) != NULL)
757 _lwkt_pullmsg(port, msg);
758 spin_unlock(&port->mpu_spin);
764 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
768 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
770 msg->ms_target_port = port;
771 spin_lock(&port->mpu_spin);
772 _lwkt_pushmsg(port, msg);
774 if (port->mp_flags & MSGPORTF_WAITING) {
775 port->mp_flags &= ~MSGPORTF_WAITING;
778 spin_unlock(&port->mpu_spin);
786 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
792 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
793 ("can't wait dropable message"));
795 if ((msg->ms_flags & MSGF_DONE) == 0) {
796 port = msg->ms_reply_port;
798 spin_lock(&port->mpu_spin);
799 while ((msg->ms_flags & MSGF_DONE) == 0) {
803 * If message was sent synchronously from the beginning
804 * the wakeup will be on the message structure, else it
805 * will be on the port structure.
807 if (msg->ms_flags & MSGF_SYNC) {
809 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
812 port->mp_flags |= MSGPORTF_WAITING;
816 * Only messages which support abort can be interrupted.
817 * We must still wait for message completion regardless.
819 if ((flags & PCATCH) && sentabort == 0) {
820 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
823 spin_unlock(&port->mpu_spin);
825 spin_lock(&port->mpu_spin);
828 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
830 /* see note at the top on the MSGPORTF_WAITING flag */
833 * Turn EINTR into ERESTART if the signal indicates.
835 if (sentabort && msg->ms_error == EINTR)
836 msg->ms_error = sentabort;
837 if (msg->ms_flags & MSGF_QUEUED)
838 _lwkt_pullmsg(port, msg);
839 spin_unlock(&port->mpu_spin);
841 if (msg->ms_flags & MSGF_QUEUED) {
842 port = msg->ms_reply_port;
843 spin_lock(&port->mpu_spin);
844 _lwkt_pullmsg(port, msg);
845 spin_unlock(&port->mpu_spin);
848 return(msg->ms_error);
853 lwkt_spin_waitport(lwkt_port_t port, int flags)
858 spin_lock(&port->mpu_spin);
859 while ((msg = _lwkt_pollmsg(port)) == NULL) {
860 port->mp_flags |= MSGPORTF_WAITING;
861 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
862 /* see note at the top on the MSGPORTF_WAITING flag */
864 spin_unlock(&port->mpu_spin);
868 _lwkt_pullmsg(port, msg);
869 spin_unlock(&port->mpu_spin);
875 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
879 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
881 if (msg->ms_flags & MSGF_SYNC) {
883 * If a synchronous completion has been requested, just wakeup
884 * the message without bothering to queue it to the target port.
886 spin_lock(&port->mpu_spin);
887 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
889 if (msg->ms_flags & MSGF_WAITING) {
890 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
893 spin_unlock(&port->mpu_spin);
898 * If an asynchronous completion has been requested the message
899 * must be queued to the reply port.
901 spin_lock(&port->mpu_spin);
902 _lwkt_enqueue_reply(port, msg);
904 if (port->mp_flags & MSGPORTF_WAITING) {
905 port->mp_flags &= ~MSGPORTF_WAITING;
908 spin_unlock(&port->mpu_spin);
915 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
917 * This function could _only_ be used when caller is in the same thread
918 * as the message's target port owner thread.
921 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
923 KASSERT(port->mpu_td == curthread,
924 ("message could only be dropped in the same thread "
925 "as the message target port thread\n"));
926 spin_lock(&port->mpu_spin);
927 _lwkt_pullmsg(port, msg);
928 msg->ms_flags |= MSGF_DONE;
929 spin_unlock(&port->mpu_spin);
932 /************************************************************************
933 * SERIALIZER PORT BACKEND *
934 ************************************************************************
936 * This backend uses serializer to protect port accessing. Callers are
937 * assumed to have serializer held. This kind of port is usually created
938 * by network device driver along with _one_ lwkt thread to pipeline
939 * operations which may temporarily release serializer.
941 * Implementation is based on SPIN PORT BACKEND.
946 lwkt_serialize_getport(lwkt_port_t port)
950 ASSERT_SERIALIZED(port->mpu_serialize);
952 if ((msg = _lwkt_pollmsg(port)) != NULL)
953 _lwkt_pullmsg(port, msg);
959 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
961 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
962 ASSERT_SERIALIZED(port->mpu_serialize);
964 msg->ms_target_port = port;
965 _lwkt_pushmsg(port, msg);
966 if (port->mp_flags & MSGPORTF_WAITING) {
967 port->mp_flags &= ~MSGPORTF_WAITING;
975 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
981 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
982 ("can't wait dropable message"));
984 if ((msg->ms_flags & MSGF_DONE) == 0) {
985 port = msg->ms_reply_port;
987 ASSERT_SERIALIZED(port->mpu_serialize);
990 while ((msg->ms_flags & MSGF_DONE) == 0) {
994 * If message was sent synchronously from the beginning
995 * the wakeup will be on the message structure, else it
996 * will be on the port structure.
998 if (msg->ms_flags & MSGF_SYNC) {
1002 port->mp_flags |= MSGPORTF_WAITING;
1006 * Only messages which support abort can be interrupted.
1007 * We must still wait for message completion regardless.
1009 if ((flags & PCATCH) && sentabort == 0) {
1010 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1013 lwkt_serialize_exit(port->mpu_serialize);
1015 lwkt_serialize_enter(port->mpu_serialize);
1018 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1020 /* see note at the top on the MSGPORTF_WAITING flag */
1023 * Turn EINTR into ERESTART if the signal indicates.
1025 if (sentabort && msg->ms_error == EINTR)
1026 msg->ms_error = sentabort;
1027 if (msg->ms_flags & MSGF_QUEUED)
1028 _lwkt_pullmsg(port, msg);
1030 if (msg->ms_flags & MSGF_QUEUED) {
1031 port = msg->ms_reply_port;
1033 ASSERT_SERIALIZED(port->mpu_serialize);
1034 _lwkt_pullmsg(port, msg);
1037 return(msg->ms_error);
1042 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1047 ASSERT_SERIALIZED(port->mpu_serialize);
1049 while ((msg = _lwkt_pollmsg(port)) == NULL) {
1050 port->mp_flags |= MSGPORTF_WAITING;
1051 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1052 /* see note at the top on the MSGPORTF_WAITING flag */
1056 _lwkt_pullmsg(port, msg);
1062 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1064 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1065 ASSERT_SERIALIZED(port->mpu_serialize);
1067 if (msg->ms_flags & MSGF_SYNC) {
1069 * If a synchronous completion has been requested, just wakeup
1070 * the message without bothering to queue it to the target port.
1072 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1076 * If an asynchronous completion has been requested the message
1077 * must be queued to the reply port.
1079 _lwkt_enqueue_reply(port, msg);
1080 if (port->mp_flags & MSGPORTF_WAITING) {
1081 port->mp_flags &= ~MSGPORTF_WAITING;
1087 /************************************************************************
1088 * PANIC AND SPECIAL PORT FUNCTIONS *
1089 ************************************************************************/
1092 * You can point a port's reply vector at this function if you just want
1093 * the message marked done, without any queueing or signaling. This is
1094 * often used for structure-embedded messages.
1098 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1100 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1105 lwkt_panic_getport(lwkt_port_t port)
1107 panic("lwkt_getport() illegal on port %p", port);
1112 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1114 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1119 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1121 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1126 lwkt_panic_waitport(lwkt_port_t port, int flags)
1128 panic("port %p cannot be waited on", port);
1133 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1135 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1140 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1142 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);