2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * NOTE! This file may be compiled for userland libraries as well as for
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
69 #include <machine/smp.h>
72 #include <sys/malloc.h>
73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
75 /************************************************************************
77 ************************************************************************/
82 * Request asynchronous completion and call lwkt_beginmsg(). The
83 * target port can opt to execute the message synchronously or
84 * asynchronously and this function will automatically queue the
85 * response if the target executes the message synchronously.
87 * NOTE: The message is in an indeterminant state until this call
88 * returns. The caller should not mess with it (e.g. try to abort it)
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
96 KKASSERT(msg->ms_reply_port != NULL &&
97 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
98 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
101 * Target port opted to execute the message synchronously so
102 * queue the response.
104 lwkt_replymsg(msg, error);
111 * Request synchronous completion and call lwkt_beginmsg(). The
112 * target port can opt to execute the message synchronously or
113 * asynchronously and this function will automatically block and
114 * wait for a response if the target executes the message
118 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
122 KKASSERT(msg->ms_reply_port != NULL &&
123 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
124 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
125 msg->ms_flags |= MSGF_SYNC;
126 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
128 * Target port opted to execute the message asynchronously so
129 * block and wait for a reply.
131 error = lwkt_waitmsg(msg, flags);
133 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
141 * Forward a message received on one port to another port.
144 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
149 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
150 if ((error = port->mp_putport(port, msg)) != EASYNC)
151 lwkt_replymsg(msg, error);
159 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set.
160 * The caller must ensure that the message will not be both replied AND
161 * destroyed while the abort is in progress.
163 * This function issues a callback which might block!
166 lwkt_abortmsg(lwkt_msg_t msg)
169 * A critical section protects us from reply IPIs on this cpu.
174 * Shortcut the operation if the message has already been returned.
175 * The callback typically constructs a lwkt_msg with the abort request,
176 * issues it synchronously, and waits for completion. The callback
177 * is not required to actually abort the message and the target port,
178 * upon receiving an abort request message generated by the callback
179 * should check whether the original message has already completed or
182 if (msg->ms_flags & MSGF_ABORTABLE) {
183 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
184 msg->ms_abortfn(msg);
189 /************************************************************************
190 * PORT INITIALIZATION API *
191 ************************************************************************/
193 static void *lwkt_thread_getport(lwkt_port_t port);
194 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
197 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
200 static void *lwkt_spin_getport(lwkt_port_t port);
201 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
202 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
203 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
204 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
207 static void *lwkt_serialize_getport(lwkt_port_t port);
208 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
209 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
210 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
211 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
213 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
214 static void *lwkt_panic_getport(lwkt_port_t port);
215 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
216 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
217 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
218 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
219 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
222 * Core port initialization (internal)
226 _lwkt_initport(lwkt_port_t port,
227 void *(*gportfn)(lwkt_port_t),
228 int (*pportfn)(lwkt_port_t, lwkt_msg_t),
229 int (*wmsgfn)(lwkt_msg_t, int),
230 void *(*wportfn)(lwkt_port_t, int),
231 void (*rportfn)(lwkt_port_t, lwkt_msg_t),
232 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
234 bzero(port, sizeof(*port));
235 TAILQ_INIT(&port->mp_msgq);
236 TAILQ_INIT(&port->mp_msgq_prio);
237 port->mp_getport = gportfn;
238 port->mp_putport = pportfn;
239 port->mp_waitmsg = wmsgfn;
240 port->mp_waitport = wportfn;
241 port->mp_replyport = rportfn;
242 port->mp_dropmsg = dmsgfn;
246 * Schedule the target thread. If the message flags contains MSGF_NORESCHED
247 * we tell the scheduler not to reschedule if td is at a higher priority.
249 * This routine is called even if the thread is already scheduled.
253 _lwkt_schedule_msg(thread_t td, int flags)
259 * lwkt_initport_thread()
261 * Initialize a port for use by a particular thread. The port may
262 * only be used by <td>.
265 lwkt_initport_thread(lwkt_port_t port, thread_t td)
271 lwkt_thread_waitport,
272 lwkt_thread_replyport,
273 lwkt_thread_dropmsg);
278 * lwkt_initport_spin()
280 * Initialize a port for use with descriptors that might be accessed
281 * via multiple LWPs, processes, or threads. Has somewhat more
282 * overhead then thread ports.
285 lwkt_initport_spin(lwkt_port_t port)
294 spin_init(&port->mpu_spin);
298 * lwkt_initport_serialize()
300 * Initialize a port for use with descriptors that might be accessed
301 * via multiple LWPs, processes, or threads. Callers are assumed to
302 * have held the serializer (slz).
305 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
308 lwkt_serialize_getport,
309 lwkt_serialize_putport,
310 lwkt_serialize_waitmsg,
311 lwkt_serialize_waitport,
312 lwkt_serialize_replyport,
314 port->mpu_serialize = slz;
318 * Similar to the standard initport, this function simply marks the message
319 * as being done and does not attempt to return it to an originating port.
322 lwkt_initport_replyonly_null(lwkt_port_t port)
334 * Initialize a reply-only port, typically used as a message sink. Such
335 * ports can only be used as a reply port.
338 lwkt_initport_replyonly(lwkt_port_t port,
339 void (*rportfn)(lwkt_port_t, lwkt_msg_t))
341 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
342 lwkt_panic_waitmsg, lwkt_panic_waitport,
343 rportfn, lwkt_panic_dropmsg);
347 lwkt_initport_putonly(lwkt_port_t port,
348 int (*pportfn)(lwkt_port_t, lwkt_msg_t))
350 _lwkt_initport(port, lwkt_panic_getport, pportfn,
351 lwkt_panic_waitmsg, lwkt_panic_waitport,
352 lwkt_panic_replyport, lwkt_panic_dropmsg);
356 lwkt_initport_panic(lwkt_port_t port)
359 lwkt_panic_getport, lwkt_panic_putport,
360 lwkt_panic_waitmsg, lwkt_panic_waitport,
361 lwkt_panic_replyport, lwkt_panic_dropmsg);
366 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
368 lwkt_msg_queue *queue;
371 * normal case, remove and return the message.
373 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
374 queue = &port->mp_msgq_prio;
376 queue = &port->mp_msgq;
377 TAILQ_REMOVE(queue, msg, ms_node);
380 * atomic op needed for spin ports
382 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
387 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
389 lwkt_msg_queue *queue;
392 * atomic op needed for spin ports
394 atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
395 if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
396 queue = &port->mp_msgq_prio;
398 queue = &port->mp_msgq;
399 TAILQ_INSERT_TAIL(queue, msg, ms_node);
404 _lwkt_pollmsg(lwkt_port_t port)
408 msg = TAILQ_FIRST(&port->mp_msgq_prio);
409 if (__predict_false(msg != NULL))
413 * Priority queue has no message, fallback to non-priority queue.
415 return TAILQ_FIRST(&port->mp_msgq);
420 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
423 * atomic op needed for spin ports
425 _lwkt_pushmsg(port, msg);
426 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
429 /************************************************************************
430 * THREAD PORT BACKEND *
431 ************************************************************************
433 * This backend is used when the port a message is retrieved from is owned
434 * by a single thread (the calling thread). Messages are IPId to the
435 * correct cpu before being enqueued to a port. Note that this is fairly
436 * optimal since scheduling would have had to do an IPI anyway if the
437 * message were headed to a different cpu.
443 * This function completes reply processing for the default case in the
444 * context of the originating cpu.
448 lwkt_thread_replyport_remote(lwkt_msg_t msg)
450 lwkt_port_t port = msg->ms_reply_port;
454 * Chase any thread migration that occurs
456 if (port->mpu_td->td_gd != mycpu) {
457 lwkt_send_ipiq(port->mpu_td->td_gd,
458 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
466 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
467 msg->ms_flags &= ~MSGF_INTRANSIT;
469 flags = msg->ms_flags;
470 if (msg->ms_flags & MSGF_SYNC) {
472 msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
474 _lwkt_enqueue_reply(port, msg);
476 if (port->mp_flags & MSGPORTF_WAITING)
477 _lwkt_schedule_msg(port->mpu_td, flags);
483 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
485 * Called with the reply port as an argument but in the context of the
486 * original target port. Completion must occur on the target port's
489 * The critical section protects us from IPIs on the this CPU.
493 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
497 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
499 if (msg->ms_flags & MSGF_SYNC) {
501 * If a synchronous completion has been requested, just wakeup
502 * the message without bothering to queue it to the target port.
504 * Assume the target thread is non-preemptive, so no critical
505 * section is required.
508 if (port->mpu_td->td_gd == mycpu) {
510 flags = msg->ms_flags;
512 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
513 if (port->mp_flags & MSGPORTF_WAITING)
514 _lwkt_schedule_msg(port->mpu_td, flags);
518 msg->ms_flags |= MSGF_INTRANSIT;
520 msg->ms_flags |= MSGF_REPLY;
521 lwkt_send_ipiq(port->mpu_td->td_gd,
522 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
527 * If an asynchronous completion has been requested the message
528 * must be queued to the reply port.
530 * A critical section is required to interlock the port queue.
533 if (port->mpu_td->td_gd == mycpu) {
536 _lwkt_enqueue_reply(port, msg);
537 if (port->mp_flags & MSGPORTF_WAITING)
538 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
543 msg->ms_flags |= MSGF_INTRANSIT;
545 msg->ms_flags |= MSGF_REPLY;
546 lwkt_send_ipiq(port->mpu_td->td_gd,
547 (ipifunc1_t)lwkt_thread_replyport_remote, msg);
554 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
556 * This function could _only_ be used when caller is in the same thread
557 * as the message's target port owner thread.
560 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
562 KASSERT(port->mpu_td == curthread,
563 ("message could only be dropped in the same thread "
564 "as the message target port thread"));
565 crit_enter_quick(port->mpu_td);
566 _lwkt_pullmsg(port, msg);
567 msg->ms_flags |= MSGF_DONE;
568 crit_exit_quick(port->mpu_td);
572 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
574 * Called with the target port as an argument but in the context of the
575 * reply port. This function always implements an asynchronous put to
576 * the target message port, and thus returns EASYNC.
578 * The message must already have cleared MSGF_DONE and MSGF_REPLY
585 lwkt_thread_putport_remote(lwkt_msg_t msg)
587 lwkt_port_t port = msg->ms_target_port;
590 * Chase any thread migration that occurs
592 if (port->mpu_td->td_gd != mycpu) {
593 lwkt_send_ipiq(port->mpu_td->td_gd,
594 (ipifunc1_t)lwkt_thread_putport_remote, msg);
602 KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
603 msg->ms_flags &= ~MSGF_INTRANSIT;
605 _lwkt_pushmsg(port, msg);
606 if (port->mp_flags & MSGPORTF_WAITING)
607 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
614 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
616 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
618 msg->ms_target_port = port;
620 if (port->mpu_td->td_gd == mycpu) {
623 _lwkt_pushmsg(port, msg);
624 if (port->mp_flags & MSGPORTF_WAITING)
625 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
630 msg->ms_flags |= MSGF_INTRANSIT;
632 lwkt_send_ipiq(port->mpu_td->td_gd,
633 (ipifunc1_t)lwkt_thread_putport_remote, msg);
640 * lwkt_thread_getport()
642 * Retrieve the next message from the port or NULL if no messages
647 lwkt_thread_getport(lwkt_port_t port)
651 KKASSERT(port->mpu_td == curthread);
653 crit_enter_quick(port->mpu_td);
654 if ((msg = _lwkt_pollmsg(port)) != NULL)
655 _lwkt_pullmsg(port, msg);
656 crit_exit_quick(port->mpu_td);
661 * lwkt_thread_waitmsg()
663 * Wait for a particular message to be replied. We must be the only
664 * thread waiting on the message. The port must be owned by the
669 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
671 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
672 ("can't wait dropable message"));
674 if ((msg->ms_flags & MSGF_DONE) == 0) {
676 * If the done bit was not set we have to block until it is.
678 lwkt_port_t port = msg->ms_reply_port;
679 thread_t td = curthread;
682 KKASSERT(port->mpu_td == td);
683 crit_enter_quick(td);
686 while ((msg->ms_flags & MSGF_DONE) == 0) {
687 port->mp_flags |= MSGPORTF_WAITING;
688 if (sentabort == 0) {
689 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
693 lwkt_sleep("waitabt", 0);
695 port->mp_flags &= ~MSGPORTF_WAITING;
697 if (msg->ms_flags & MSGF_QUEUED)
698 _lwkt_pullmsg(port, msg);
702 * If the done bit was set we only have to mess around with the
703 * message if it is queued on the reply port.
705 if (msg->ms_flags & MSGF_QUEUED) {
706 lwkt_port_t port = msg->ms_reply_port;
707 thread_t td = curthread;
709 KKASSERT(port->mpu_td == td);
710 crit_enter_quick(td);
711 _lwkt_pullmsg(port, msg);
715 return(msg->ms_error);
719 * lwkt_thread_waitport()
721 * Wait for a new message to be available on the port. We must be the
722 * the only thread waiting on the port. The port must be owned by caller.
726 lwkt_thread_waitport(lwkt_port_t port, int flags)
728 thread_t td = curthread;
732 KKASSERT(port->mpu_td == td);
733 crit_enter_quick(td);
734 while ((msg = _lwkt_pollmsg(port)) == NULL) {
735 port->mp_flags |= MSGPORTF_WAITING;
736 error = lwkt_sleep("waitport", flags);
737 port->mp_flags &= ~MSGPORTF_WAITING;
741 _lwkt_pullmsg(port, msg);
747 /************************************************************************
748 * SPIN PORT BACKEND *
749 ************************************************************************
751 * This backend uses spinlocks instead of making assumptions about which
752 * thread is accessing the port. It must be used when a port is not owned
753 * by a particular thread. This is less optimal then thread ports but
754 * you don't have a choice if there are multiple threads accessing the port.
756 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
757 * on the message port, it is the responsibility of the code doing the
758 * wakeup to clear this flag rather then the blocked threads. Some
759 * superfluous wakeups may occur, which is ok.
761 * XXX synchronous message wakeups are not current optimized.
766 lwkt_spin_getport(lwkt_port_t port)
770 spin_lock(&port->mpu_spin);
771 if ((msg = _lwkt_pollmsg(port)) != NULL)
772 _lwkt_pullmsg(port, msg);
773 spin_unlock(&port->mpu_spin);
779 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
783 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
785 msg->ms_target_port = port;
786 spin_lock(&port->mpu_spin);
787 _lwkt_pushmsg(port, msg);
789 if (port->mp_flags & MSGPORTF_WAITING) {
790 port->mp_flags &= ~MSGPORTF_WAITING;
793 spin_unlock(&port->mpu_spin);
801 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
807 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
808 ("can't wait dropable message"));
810 if ((msg->ms_flags & MSGF_DONE) == 0) {
811 port = msg->ms_reply_port;
813 spin_lock(&port->mpu_spin);
814 while ((msg->ms_flags & MSGF_DONE) == 0) {
818 * If message was sent synchronously from the beginning
819 * the wakeup will be on the message structure, else it
820 * will be on the port structure.
822 if (msg->ms_flags & MSGF_SYNC) {
824 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
827 port->mp_flags |= MSGPORTF_WAITING;
831 * Only messages which support abort can be interrupted.
832 * We must still wait for message completion regardless.
834 if ((flags & PCATCH) && sentabort == 0) {
835 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
838 spin_unlock(&port->mpu_spin);
840 spin_lock(&port->mpu_spin);
843 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
845 /* see note at the top on the MSGPORTF_WAITING flag */
848 * Turn EINTR into ERESTART if the signal indicates.
850 if (sentabort && msg->ms_error == EINTR)
851 msg->ms_error = sentabort;
852 if (msg->ms_flags & MSGF_QUEUED)
853 _lwkt_pullmsg(port, msg);
854 spin_unlock(&port->mpu_spin);
856 if (msg->ms_flags & MSGF_QUEUED) {
857 port = msg->ms_reply_port;
858 spin_lock(&port->mpu_spin);
859 _lwkt_pullmsg(port, msg);
860 spin_unlock(&port->mpu_spin);
863 return(msg->ms_error);
868 lwkt_spin_waitport(lwkt_port_t port, int flags)
873 spin_lock(&port->mpu_spin);
874 while ((msg = _lwkt_pollmsg(port)) == NULL) {
875 port->mp_flags |= MSGPORTF_WAITING;
876 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
877 /* see note at the top on the MSGPORTF_WAITING flag */
879 spin_unlock(&port->mpu_spin);
883 _lwkt_pullmsg(port, msg);
884 spin_unlock(&port->mpu_spin);
890 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
894 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
896 if (msg->ms_flags & MSGF_SYNC) {
898 * If a synchronous completion has been requested, just wakeup
899 * the message without bothering to queue it to the target port.
901 spin_lock(&port->mpu_spin);
902 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
904 if (msg->ms_flags & MSGF_WAITING) {
905 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
908 spin_unlock(&port->mpu_spin);
913 * If an asynchronous completion has been requested the message
914 * must be queued to the reply port.
916 spin_lock(&port->mpu_spin);
917 _lwkt_enqueue_reply(port, msg);
919 if (port->mp_flags & MSGPORTF_WAITING) {
920 port->mp_flags &= ~MSGPORTF_WAITING;
923 spin_unlock(&port->mpu_spin);
930 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
932 * This function could _only_ be used when caller is in the same thread
933 * as the message's target port owner thread.
936 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
938 KASSERT(port->mpu_td == curthread,
939 ("message could only be dropped in the same thread "
940 "as the message target port thread\n"));
941 spin_lock(&port->mpu_spin);
942 _lwkt_pullmsg(port, msg);
943 msg->ms_flags |= MSGF_DONE;
944 spin_unlock(&port->mpu_spin);
947 /************************************************************************
948 * SERIALIZER PORT BACKEND *
949 ************************************************************************
951 * This backend uses serializer to protect port accessing. Callers are
952 * assumed to have serializer held. This kind of port is usually created
953 * by network device driver along with _one_ lwkt thread to pipeline
954 * operations which may temporarily release serializer.
956 * Implementation is based on SPIN PORT BACKEND.
961 lwkt_serialize_getport(lwkt_port_t port)
965 ASSERT_SERIALIZED(port->mpu_serialize);
967 if ((msg = _lwkt_pollmsg(port)) != NULL)
968 _lwkt_pullmsg(port, msg);
974 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
976 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
977 ASSERT_SERIALIZED(port->mpu_serialize);
979 msg->ms_target_port = port;
980 _lwkt_pushmsg(port, msg);
981 if (port->mp_flags & MSGPORTF_WAITING) {
982 port->mp_flags &= ~MSGPORTF_WAITING;
990 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
996 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
997 ("can't wait dropable message"));
999 if ((msg->ms_flags & MSGF_DONE) == 0) {
1000 port = msg->ms_reply_port;
1002 ASSERT_SERIALIZED(port->mpu_serialize);
1005 while ((msg->ms_flags & MSGF_DONE) == 0) {
1009 * If message was sent synchronously from the beginning
1010 * the wakeup will be on the message structure, else it
1011 * will be on the port structure.
1013 if (msg->ms_flags & MSGF_SYNC) {
1017 port->mp_flags |= MSGPORTF_WAITING;
1021 * Only messages which support abort can be interrupted.
1022 * We must still wait for message completion regardless.
1024 if ((flags & PCATCH) && sentabort == 0) {
1025 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1028 lwkt_serialize_exit(port->mpu_serialize);
1030 lwkt_serialize_enter(port->mpu_serialize);
1033 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1035 /* see note at the top on the MSGPORTF_WAITING flag */
1038 * Turn EINTR into ERESTART if the signal indicates.
1040 if (sentabort && msg->ms_error == EINTR)
1041 msg->ms_error = sentabort;
1042 if (msg->ms_flags & MSGF_QUEUED)
1043 _lwkt_pullmsg(port, msg);
1045 if (msg->ms_flags & MSGF_QUEUED) {
1046 port = msg->ms_reply_port;
1048 ASSERT_SERIALIZED(port->mpu_serialize);
1049 _lwkt_pullmsg(port, msg);
1052 return(msg->ms_error);
1057 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1062 ASSERT_SERIALIZED(port->mpu_serialize);
1064 while ((msg = _lwkt_pollmsg(port)) == NULL) {
1065 port->mp_flags |= MSGPORTF_WAITING;
1066 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1067 /* see note at the top on the MSGPORTF_WAITING flag */
1071 _lwkt_pullmsg(port, msg);
1077 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1079 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1080 ASSERT_SERIALIZED(port->mpu_serialize);
1082 if (msg->ms_flags & MSGF_SYNC) {
1084 * If a synchronous completion has been requested, just wakeup
1085 * the message without bothering to queue it to the target port.
1087 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1091 * If an asynchronous completion has been requested the message
1092 * must be queued to the reply port.
1094 _lwkt_enqueue_reply(port, msg);
1095 if (port->mp_flags & MSGPORTF_WAITING) {
1096 port->mp_flags &= ~MSGPORTF_WAITING;
1102 /************************************************************************
1103 * PANIC AND SPECIAL PORT FUNCTIONS *
1104 ************************************************************************/
1107 * You can point a port's reply vector at this function if you just want
1108 * the message marked done, without any queueing or signaling. This is
1109 * often used for structure-embedded messages.
1113 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1115 msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1120 lwkt_panic_getport(lwkt_port_t port)
1122 panic("lwkt_getport() illegal on port %p", port);
1127 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1129 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1134 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1136 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1141 lwkt_panic_waitport(lwkt_port_t port, int flags)
1143 panic("port %p cannot be waited on", port);
1148 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1150 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1155 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1157 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);