Merge from vendor branch FILE:
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.42 2007/05/24 20:51:16 dillon Exp $
38  */
39
40 #ifdef _KERNEL
41
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/proc.h>
46 #include <sys/rtprio.h>
47 #include <sys/queue.h>
48 #include <sys/sysctl.h>
49 #include <sys/kthread.h>
50 #include <sys/signalvar.h>
51 #include <sys/signal2.h>
52 #include <machine/cpu.h>
53 #include <sys/lock.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_kern.h>
58 #include <vm/vm_object.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_pager.h>
62 #include <vm/vm_extern.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/thread2.h>
66 #include <sys/msgport2.h>
67 #include <sys/spinlock2.h>
68
69 #include <machine/stdarg.h>
70 #include <machine/cpufunc.h>
71 #ifdef SMP
72 #include <machine/smp.h>
73 #endif
74
75 #include <sys/malloc.h>
76 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
77
78 #else
79
80 #include <sys/stdint.h>
81 #include <libcaps/thread.h>
82 #include <sys/thread.h>
83 #include <sys/msgport.h>
84 #include <sys/errno.h>
85 #include <libcaps/globaldata.h>
86 #include <machine/cpufunc.h>
87 #include <sys/thread2.h>
88 #include <sys/msgport2.h>
89 #include <string.h>
90
91 #endif /* _KERNEL */
92
93
94 /************************************************************************
95  *                              MESSAGE FUNCTIONS                       *
96  ************************************************************************/
97
98 /*
99  * lwkt_sendmsg()
100  *
101  *      Request asynchronous completion and call lwkt_beginmsg().  The
102  *      target port can opt to execute the message synchronously or
103  *      asynchronously and this function will automatically queue the
104  *      response if the target executes the message synchronously.
105  *
106  *      NOTE: The message is in an indeterminant state until this call
107  *      returns.  The caller should not mess with it (e.g. try to abort it)
108  *      until then.
109  */
110 void
111 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
112 {
113     int error;
114
115     KKASSERT(msg->ms_reply_port != NULL &&
116              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
117     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
118     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
119         lwkt_replymsg(msg, error);
120     }
121 }
122
123 /*
124  * lwkt_domsg()
125  *
126  *      Request asynchronous completion and call lwkt_beginmsg().  The
127  *      target port can opt to execute the message synchronously or
128  *      asynchronously and this function will automatically queue the
129  *      response if the target executes the message synchronously.
130  */
131 int
132 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
133 {
134     int error;
135
136     KKASSERT(msg->ms_reply_port != NULL &&
137              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
138     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
139     msg->ms_flags |= MSGF_SYNC;
140     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
141         error = lwkt_waitmsg(msg, flags);
142     } else {
143         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
144     }
145     return(error);
146 }
147
148 /*
149  * lwkt_forwardmsg()
150  *
151  * Forward a message received on one port to another port.
152  */
153 int
154 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
155 {   
156     int error;
157
158     crit_enter();
159     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
160     if ((error = port->mp_putport(port, msg)) != EASYNC)
161         lwkt_replymsg(msg, error);
162     crit_exit();
163     return(error);
164 }
165
166 /*
167  * lwkt_abortmsg()
168  *
169  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
170  * The caller must ensure that the message will not be both replied AND
171  * destroyed while the abort is in progress.
172  *
173  * This function issues a callback which might block!
174  */
175 void
176 lwkt_abortmsg(lwkt_msg_t msg)
177 {
178     /*
179      * A critical section protects us from reply IPIs on this cpu.
180      */
181     crit_enter();
182
183     /*
184      * Shortcut the operation if the message has already been returned.
185      * The callback typically constructs a lwkt_msg with the abort request,
186      * issues it synchronously, and waits for completion.  The callback
187      * is not required to actually abort the message and the target port,
188      * upon receiving an abort request message generated by the callback
189      * should check whether the original message has already completed or
190      * not.
191      */
192     if (msg->ms_flags & MSGF_ABORTABLE) {
193         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
194             msg->ms_abortfn(msg);
195     }
196     crit_exit();
197 }
198
199 /************************************************************************
200  *                      PORT INITIALIZATION API                         *
201  ************************************************************************/
202
203 static void *lwkt_thread_getport(lwkt_port_t port);
204 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
205 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
206 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
207 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
208
209 static void *lwkt_spin_getport(lwkt_port_t port);
210 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
211 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
212 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
213 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
214
215 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
216 static void *lwkt_panic_getport(lwkt_port_t port);
217 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
218 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
219 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
220 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
221
222 /*
223  * Core port initialization (internal)
224  */
225 static __inline
226 void
227 _lwkt_initport(lwkt_port_t port,
228                void *(*gportfn)(lwkt_port_t),
229                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
230                int (*wmsgfn)(lwkt_msg_t, int),
231                void *(*wportfn)(lwkt_port_t, int),
232                void (*rportfn)(lwkt_port_t, lwkt_msg_t))
233 {
234     bzero(port, sizeof(*port));
235     TAILQ_INIT(&port->mp_msgq);
236     port->mp_getport = gportfn;
237     port->mp_putport = pportfn;
238     port->mp_waitmsg =  wmsgfn;
239     port->mp_waitport =  wportfn;
240     port->mp_replyport = rportfn;
241 }
242
243 /*
244  * lwkt_initport_thread()
245  *
246  *      Initialize a port for use by a particular thread.  The port may
247  *      only be used by <td>.
248  */
249 void
250 lwkt_initport_thread(lwkt_port_t port, thread_t td)
251 {
252     _lwkt_initport(port,
253                    lwkt_thread_getport,
254                    lwkt_thread_putport,
255                    lwkt_thread_waitmsg,
256                    lwkt_thread_waitport,
257                    lwkt_thread_replyport);
258     port->mpu_td = td;
259 }
260
261 /*
262  * lwkt_initport_spin()
263  *
264  *      Initialize a port for use with descriptors that might be accessed
265  *      via multiple LWPs, processes, or threads.  Has somewhat more
266  *      overhead then thread ports.
267  */
268 void
269 lwkt_initport_spin(lwkt_port_t port)
270 {
271     _lwkt_initport(port,
272                    lwkt_spin_getport,
273                    lwkt_spin_putport,
274                    lwkt_spin_waitmsg,
275                    lwkt_spin_waitport,
276                    lwkt_spin_replyport);
277     spin_init(&port->mpu_spin);
278 }
279
280 /*
281  * Similar to the standard initport, this function simply marks the message
282  * as being done and does not attempt to return it to an originating port.
283  */
284 void
285 lwkt_initport_replyonly_null(lwkt_port_t port)
286 {
287     _lwkt_initport(port,
288                    lwkt_panic_getport,
289                    lwkt_panic_putport,
290                    lwkt_panic_waitmsg,
291                    lwkt_panic_waitport,
292                    lwkt_null_replyport);
293 }
294
295 /*
296  * Initialize a reply-only port, typically used as a message sink.  Such
297  * ports can only be used as a reply port.
298  */
299 void
300 lwkt_initport_replyonly(lwkt_port_t port,
301                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
302 {
303     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
304                          lwkt_panic_waitmsg, lwkt_panic_waitport,
305                          rportfn);
306 }
307
308 void
309 lwkt_initport_putonly(lwkt_port_t port,
310                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
311 {
312     _lwkt_initport(port, lwkt_panic_getport, pportfn,
313                          lwkt_panic_waitmsg, lwkt_panic_waitport,
314                          lwkt_panic_replyport);
315 }
316
317 void
318 lwkt_initport_panic(lwkt_port_t port)
319 {
320     _lwkt_initport(port,
321                    lwkt_panic_getport, lwkt_panic_putport,
322                    lwkt_panic_waitmsg, lwkt_panic_waitport,
323                    lwkt_panic_replyport);
324 }
325
326 /*
327  * lwkt_getport()
328  *
329  *      Retrieve the next message from the port's message queue, return NULL
330  *      if no messages are pending.  The retrieved message will either be a
331  *      request or a reply based on the MSGF_REPLY bit.
332  *
333  *      The calling thread MUST own the port.
334  */
335
336 static __inline
337 void
338 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
339 {
340     /*
341      * normal case, remove and return the message.
342      */
343     TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
344     msg->ms_flags &= ~MSGF_QUEUED;
345 }
346
347 /************************************************************************
348  *                      THREAD PORT BACKEND                             *
349  ************************************************************************
350  *
351  * This backend is used when the port a message is retrieved from is owned
352  * by a single thread (the calling thread).  Messages are IPId to the
353  * correct cpu before being enqueued to a port.  Note that this is fairly
354  * optimal since scheduling would have had to do an IPI anyway if the
355  * message were headed to a different cpu.
356  */
357
358 #ifdef SMP
359
360 /*
361  * This function completes reply processing for the default case in the
362  * context of the originating cpu.
363  */
364 static
365 void
366 lwkt_thread_replyport_remote(lwkt_msg_t msg)
367 {
368     lwkt_port_t port = msg->ms_reply_port;
369
370     /*
371      * Chase any thread migration that occurs
372      */
373     if (port->mpu_td->td_gd != mycpu) {
374         lwkt_send_ipiq(port->mpu_td->td_gd,
375                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
376         return;
377     }
378
379     /*
380      * Cleanup
381      */
382 #ifdef INVARIANTS
383     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
384     msg->ms_flags &= ~MSGF_INTRANSIT;
385 #endif
386     if (msg->ms_flags & MSGF_SYNC) {
387             msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
388     } else {
389             TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
390             msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
391     }
392     if (port->mp_flags & MSGPORTF_WAITING)
393         lwkt_schedule(port->mpu_td);
394 }
395
396 #endif
397
398 /*
399  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
400  *
401  * Called with the reply port as an argument but in the context of the
402  * original target port.  Completion must occur on the target port's
403  * cpu.
404  *
405  * The critical section protects us from IPIs on the this CPU.
406  */
407 void
408 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
409 {
410     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
411
412     if (msg->ms_flags & MSGF_SYNC) {
413         /*
414          * If a synchronous completion has been requested, just wakeup
415          * the message without bothering to queue it to the target port.
416          *
417          * Assume the target thread is non-preemptive, so no critical
418          * section is required.
419          */
420 #ifdef SMP
421         if (port->mpu_td->td_gd == mycpu) {
422 #endif
423             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
424             if (port->mp_flags & MSGPORTF_WAITING)
425                 lwkt_schedule(port->mpu_td);
426 #ifdef SMP
427         } else {
428 #ifdef INVARIANTS
429             msg->ms_flags |= MSGF_INTRANSIT;
430 #endif
431             msg->ms_flags |= MSGF_REPLY;
432             lwkt_send_ipiq(port->mpu_td->td_gd,
433                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
434         }
435 #endif
436     } else {
437         /*
438          * If an asynchronous completion has been requested the message
439          * must be queued to the reply port.  MSGF_REPLY cannot be set
440          * until the message actually gets queued.
441          *
442          * A critical section is required to interlock the port queue.
443          */
444 #ifdef SMP
445         if (port->mpu_td->td_gd == mycpu) {
446 #endif
447             crit_enter();
448             TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
449             msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
450             if (port->mp_flags & MSGPORTF_WAITING)
451                 lwkt_schedule(port->mpu_td);
452             crit_exit();
453 #ifdef SMP
454         } else {
455 #ifdef INVARIANTS
456             msg->ms_flags |= MSGF_INTRANSIT;
457 #endif
458             msg->ms_flags |= MSGF_REPLY;
459             lwkt_send_ipiq(port->mpu_td->td_gd,
460                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
461         }
462 #endif
463     }
464 }
465
466 /*
467  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
468  *
469  * Called with the target port as an argument but in the context of the
470  * reply port.  This function always implements an asynchronous put to
471  * the target message port, and thus returns EASYNC.
472  *
473  * The message must already have cleared MSGF_DONE and MSGF_REPLY
474  */
475
476 #ifdef SMP
477
478 static
479 void
480 lwkt_thread_putport_remote(lwkt_msg_t msg)
481 {
482     lwkt_port_t port = msg->ms_target_port;
483
484     /*
485      * Chase any thread migration that occurs
486      */
487     if (port->mpu_td->td_gd != mycpu) {
488         lwkt_send_ipiq(port->mpu_td->td_gd,
489                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
490         return;
491     }
492
493     /*
494      * Cleanup
495      */
496 #ifdef INVARIANTS
497     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
498     msg->ms_flags &= ~MSGF_INTRANSIT;
499 #endif
500     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
501     msg->ms_flags |= MSGF_QUEUED;
502     if (port->mp_flags & MSGPORTF_WAITING)
503         lwkt_schedule(port->mpu_td);
504 }
505
506 #endif
507
508 static
509 int
510 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
511 {
512     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
513
514     msg->ms_target_port = port;
515 #ifdef SMP
516     if (port->mpu_td->td_gd == mycpu) {
517 #endif
518         crit_enter();
519         msg->ms_flags |= MSGF_QUEUED;
520         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
521         if (port->mp_flags & MSGPORTF_WAITING)
522             lwkt_schedule(port->mpu_td);
523         crit_exit();
524 #ifdef SMP
525     } else {
526 #ifdef INVARIANTS
527         msg->ms_flags |= MSGF_INTRANSIT;
528 #endif
529         lwkt_send_ipiq(port->mpu_td->td_gd,
530                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
531     }
532 #endif
533     return (EASYNC);
534 }
535
536 /*
537  * lwkt_thread_getport()
538  *
539  *      Retrieve the next message from the port or NULL if no messages
540  *      are ready.
541  */
542 void *
543 lwkt_thread_getport(lwkt_port_t port)
544 {
545     lwkt_msg_t msg;
546
547     KKASSERT(port->mpu_td == curthread);
548
549     crit_enter_quick(port->mpu_td);
550     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
551         _lwkt_pullmsg(port, msg);
552     crit_exit_quick(port->mpu_td);
553     return(msg);
554 }
555
556 /*
557  * lwkt_thread_waitmsg()
558  *
559  *      Wait for a particular message to be replied.  We must be the only
560  *      thread waiting on the message.  The port must be owned by the
561  *      caller.
562  */
563 int
564 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
565 {
566     if ((msg->ms_flags & MSGF_DONE) == 0) {
567         /*
568          * If the done bit was not set we have to block until it is.
569          */
570         lwkt_port_t port = msg->ms_reply_port;
571         thread_t td = curthread;
572         int sentabort;
573
574         KKASSERT(port->mpu_td == td);
575         KKASSERT(msg->ms_reply_port == port);
576         crit_enter_quick(td);
577         sentabort = 0;
578
579         while ((msg->ms_flags & MSGF_DONE) == 0) {
580             port->mp_flags |= MSGPORTF_WAITING;
581             if (sentabort == 0) {
582                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
583                     lwkt_abortmsg(msg);
584                 }
585             } else {
586                 lwkt_sleep("waitabt", 0);
587             }
588             port->mp_flags &= ~MSGPORTF_WAITING;
589         }
590         if (msg->ms_flags & MSGF_QUEUED)
591             _lwkt_pullmsg(port, msg);
592         crit_exit_quick(td);
593     } else {
594         /*
595          * If the done bit was set we only have to mess around with the
596          * message if it is queued on the reply port.
597          */
598         if (msg->ms_flags & MSGF_QUEUED) {
599             lwkt_port_t port = msg->ms_reply_port;
600             thread_t td = curthread;
601
602             KKASSERT(port->mpu_td == td);
603             KKASSERT(msg->ms_reply_port == port);
604             crit_enter_quick(td);
605             _lwkt_pullmsg(port, msg);
606             crit_exit_quick(td);
607         }
608     }
609     return(msg->ms_error);
610 }
611
612 void *
613 lwkt_thread_waitport(lwkt_port_t port, int flags)
614 {
615     thread_t td = curthread;
616     lwkt_msg_t msg;
617     int error;
618
619     KKASSERT(port->mpu_td == td);
620     crit_enter_quick(td);
621     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
622         port->mp_flags |= MSGPORTF_WAITING;
623         error = lwkt_sleep("waitport", flags);
624         port->mp_flags &= ~MSGPORTF_WAITING;
625         if (error)
626                 goto done;
627     }
628     _lwkt_pullmsg(port, msg);
629 done:
630     crit_exit_quick(td);
631     return(msg);
632 }
633
634 /************************************************************************
635  *                         SPIN PORT BACKEND                            *
636  ************************************************************************
637  *
638  * This backend uses spinlocks instead of making assumptions about which
639  * thread is accessing the port.  It must be used when a port is not owned
640  * by a particular thread.  This is less optimal then thread ports but
641  * you don't have a choice if there are multiple threads accessing the port.
642  *
643  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
644  * on the message port, it is the responsibility of the code doing the
645  * wakeup to clear this flag rather then the blocked threads.  Some
646  * superfluous wakeups may occur, which is ok.
647  *
648  * XXX synchronous message wakeups are not current optimized.
649  */
650
651 static
652 void *
653 lwkt_spin_getport(lwkt_port_t port)
654 {
655     lwkt_msg_t msg;
656
657     spin_lock_wr(&port->mpu_spin);
658     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
659         _lwkt_pullmsg(port, msg);
660     spin_unlock_wr(&port->mpu_spin);
661     return(msg);
662 }
663
664 static
665 int
666 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
667 {
668     int dowakeup;
669
670     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
671
672     msg->ms_target_port = port;
673     spin_lock_wr(&port->mpu_spin);
674     msg->ms_flags |= MSGF_QUEUED;
675     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
676     dowakeup = 0;
677     if (port->mp_flags & MSGPORTF_WAITING) {
678         port->mp_flags &= ~MSGPORTF_WAITING;
679         dowakeup = 1;
680     }
681     spin_unlock_wr(&port->mpu_spin);
682     if (dowakeup)
683         wakeup(port);
684     return (EASYNC);
685 }
686
687 static
688 int
689 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
690 {
691     lwkt_port_t port;
692     int sentabort;
693     int error;
694
695     if ((msg->ms_flags & MSGF_DONE) == 0) {
696         port = msg->ms_reply_port;
697         sentabort = 0;
698         spin_lock_wr(&port->mpu_spin);
699         while ((msg->ms_flags & MSGF_DONE) == 0) {
700             void *won;
701
702             /*
703              * If message was sent synchronously from the beginning
704              * the wakeup will be on the message structure, else it
705              * will be on the port structure.
706              */
707             if (msg->ms_flags & MSGF_SYNC) {
708                 won = msg;
709             } else {
710                 won = port;
711                 port->mp_flags |= MSGPORTF_WAITING;
712             }
713
714             /*
715              * Only messages which support abort can be interrupted.
716              * We must still wait for message completion regardless.
717              */
718             if ((flags & PCATCH) && sentabort == 0) {
719                 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
720                 if (error) {
721                     sentabort = error;
722                     spin_unlock_wr(&port->mpu_spin);
723                     lwkt_abortmsg(msg);
724                     spin_lock_wr(&port->mpu_spin);
725                 }
726             } else {
727                 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
728             }
729             /* see note at the top on the MSGPORTF_WAITING flag */
730         }
731         /*
732          * Turn EINTR into ERESTART if the signal indicates.
733          */
734         if (sentabort && msg->ms_error == EINTR)
735             msg->ms_error = sentabort;
736         if (msg->ms_flags & MSGF_QUEUED)
737                 _lwkt_pullmsg(port, msg);
738         spin_unlock_wr(&port->mpu_spin);
739     } else {
740         if (msg->ms_flags & MSGF_QUEUED) {
741             port = msg->ms_reply_port;
742             spin_lock_wr(&port->mpu_spin);
743             _lwkt_pullmsg(port, msg);
744             spin_unlock_wr(&port->mpu_spin);
745         }
746     }
747     return(msg->ms_error);
748 }
749
750 static
751 void *
752 lwkt_spin_waitport(lwkt_port_t port, int flags)
753 {
754     lwkt_msg_t msg;
755     int error;
756
757     spin_lock_wr(&port->mpu_spin);
758     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
759         port->mp_flags |= MSGPORTF_WAITING;
760         error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
761         /* see note at the top on the MSGPORTF_WAITING flag */
762         if (error) {
763             spin_unlock_wr(&port->mpu_spin);
764             return(NULL);
765         }
766     }
767     _lwkt_pullmsg(port, msg);
768     spin_unlock_wr(&port->mpu_spin);
769     return(msg);
770 }
771
772 static
773 void
774 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
775 {
776     int dowakeup;
777
778     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
779
780     if (msg->ms_flags & MSGF_SYNC) {
781         /*
782          * If a synchronous completion has been requested, just wakeup
783          * the message without bothering to queue it to the target port.
784          */
785         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
786         wakeup(msg);
787     } else {
788         /*
789          * If an asynchronous completion has been requested the message
790          * must be queued to the reply port.  MSGF_REPLY cannot be set
791          * until the message actually gets queued.
792          */
793         spin_lock_wr(&port->mpu_spin);
794         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
795         msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
796         dowakeup = 0;
797         if (port->mp_flags & MSGPORTF_WAITING) {
798             port->mp_flags &= ~MSGPORTF_WAITING;
799             dowakeup = 1;
800         }
801         spin_unlock_wr(&port->mpu_spin);
802         if (dowakeup)
803             wakeup(port);
804     }
805 }
806
807 /************************************************************************
808  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
809  ************************************************************************/
810
811 /*
812  * You can point a port's reply vector at this function if you just want
813  * the message marked done, without any queueing or signaling.  This is
814  * often used for structure-embedded messages.
815  */
816 static
817 void
818 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
819 {
820     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
821 }
822
823 static
824 void *
825 lwkt_panic_getport(lwkt_port_t port)
826 {
827     panic("lwkt_getport() illegal on port %p", port);
828 }
829
830 static
831 int
832 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
833 {
834     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
835 }
836
837 static
838 int
839 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
840 {
841     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
842 }
843
844 static
845 void *
846 lwkt_panic_waitport(lwkt_port_t port, int flags)
847 {
848     panic("port %p cannot be waited on", port);
849 }
850
851 static
852 void
853 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
854 {
855     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
856 }
857