31b7aa69762fbd7f61aed438ec3a777c3f3def7a
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.43 2007/05/31 11:00:25 sephe Exp $
38  */
39
40 #ifdef _KERNEL
41
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/proc.h>
46 #include <sys/rtprio.h>
47 #include <sys/queue.h>
48 #include <sys/sysctl.h>
49 #include <sys/kthread.h>
50 #include <sys/signalvar.h>
51 #include <sys/signal2.h>
52 #include <machine/cpu.h>
53 #include <sys/lock.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_kern.h>
58 #include <vm/vm_object.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_pager.h>
62 #include <vm/vm_extern.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/thread2.h>
66 #include <sys/msgport2.h>
67 #include <sys/spinlock2.h>
68
69 #include <machine/stdarg.h>
70 #include <machine/cpufunc.h>
71 #ifdef SMP
72 #include <machine/smp.h>
73 #endif
74
75 #include <sys/malloc.h>
76 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
77
78 #else
79
80 #include <sys/stdint.h>
81 #include <libcaps/thread.h>
82 #include <sys/thread.h>
83 #include <sys/msgport.h>
84 #include <sys/errno.h>
85 #include <libcaps/globaldata.h>
86 #include <machine/cpufunc.h>
87 #include <sys/thread2.h>
88 #include <sys/msgport2.h>
89 #include <string.h>
90
91 #endif /* _KERNEL */
92
93
94 /************************************************************************
95  *                              MESSAGE FUNCTIONS                       *
96  ************************************************************************/
97
98 /*
99  * lwkt_sendmsg()
100  *
101  *      Request asynchronous completion and call lwkt_beginmsg().  The
102  *      target port can opt to execute the message synchronously or
103  *      asynchronously and this function will automatically queue the
104  *      response if the target executes the message synchronously.
105  *
106  *      NOTE: The message is in an indeterminant state until this call
107  *      returns.  The caller should not mess with it (e.g. try to abort it)
108  *      until then.
109  */
110 void
111 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
112 {
113     int error;
114
115     KKASSERT(msg->ms_reply_port != NULL &&
116              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
117     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
118     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
119         lwkt_replymsg(msg, error);
120     }
121 }
122
123 /*
124  * lwkt_domsg()
125  *
126  *      Request asynchronous completion and call lwkt_beginmsg().  The
127  *      target port can opt to execute the message synchronously or
128  *      asynchronously and this function will automatically queue the
129  *      response if the target executes the message synchronously.
130  */
131 int
132 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
133 {
134     int error;
135
136     KKASSERT(msg->ms_reply_port != NULL &&
137              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
138     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
139     msg->ms_flags |= MSGF_SYNC;
140     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
141         error = lwkt_waitmsg(msg, flags);
142     } else {
143         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
144     }
145     return(error);
146 }
147
148 /*
149  * lwkt_forwardmsg()
150  *
151  * Forward a message received on one port to another port.
152  */
153 int
154 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
155 {   
156     int error;
157
158     crit_enter();
159     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
160     if ((error = port->mp_putport(port, msg)) != EASYNC)
161         lwkt_replymsg(msg, error);
162     crit_exit();
163     return(error);
164 }
165
166 /*
167  * lwkt_abortmsg()
168  *
169  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
170  * The caller must ensure that the message will not be both replied AND
171  * destroyed while the abort is in progress.
172  *
173  * This function issues a callback which might block!
174  */
175 void
176 lwkt_abortmsg(lwkt_msg_t msg)
177 {
178     /*
179      * A critical section protects us from reply IPIs on this cpu.
180      */
181     crit_enter();
182
183     /*
184      * Shortcut the operation if the message has already been returned.
185      * The callback typically constructs a lwkt_msg with the abort request,
186      * issues it synchronously, and waits for completion.  The callback
187      * is not required to actually abort the message and the target port,
188      * upon receiving an abort request message generated by the callback
189      * should check whether the original message has already completed or
190      * not.
191      */
192     if (msg->ms_flags & MSGF_ABORTABLE) {
193         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
194             msg->ms_abortfn(msg);
195     }
196     crit_exit();
197 }
198
199 /************************************************************************
200  *                      PORT INITIALIZATION API                         *
201  ************************************************************************/
202
203 static void *lwkt_thread_getport(lwkt_port_t port);
204 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
205 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
206 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
207 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
208
209 static void *lwkt_spin_getport(lwkt_port_t port);
210 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
211 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
212 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
213 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
214
215 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
216 static void *lwkt_panic_getport(lwkt_port_t port);
217 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
218 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
219 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
220 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
221
222 /*
223  * Core port initialization (internal)
224  */
225 static __inline
226 void
227 _lwkt_initport(lwkt_port_t port,
228                void *(*gportfn)(lwkt_port_t),
229                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
230                int (*wmsgfn)(lwkt_msg_t, int),
231                void *(*wportfn)(lwkt_port_t, int),
232                void (*rportfn)(lwkt_port_t, lwkt_msg_t))
233 {
234     bzero(port, sizeof(*port));
235     TAILQ_INIT(&port->mp_msgq);
236     port->mp_getport = gportfn;
237     port->mp_putport = pportfn;
238     port->mp_waitmsg =  wmsgfn;
239     port->mp_waitport =  wportfn;
240     port->mp_replyport = rportfn;
241 }
242
243 /*
244  * lwkt_initport_thread()
245  *
246  *      Initialize a port for use by a particular thread.  The port may
247  *      only be used by <td>.
248  */
249 void
250 lwkt_initport_thread(lwkt_port_t port, thread_t td)
251 {
252     _lwkt_initport(port,
253                    lwkt_thread_getport,
254                    lwkt_thread_putport,
255                    lwkt_thread_waitmsg,
256                    lwkt_thread_waitport,
257                    lwkt_thread_replyport);
258     port->mpu_td = td;
259 }
260
261 /*
262  * lwkt_initport_spin()
263  *
264  *      Initialize a port for use with descriptors that might be accessed
265  *      via multiple LWPs, processes, or threads.  Has somewhat more
266  *      overhead then thread ports.
267  */
268 void
269 lwkt_initport_spin(lwkt_port_t port)
270 {
271     _lwkt_initport(port,
272                    lwkt_spin_getport,
273                    lwkt_spin_putport,
274                    lwkt_spin_waitmsg,
275                    lwkt_spin_waitport,
276                    lwkt_spin_replyport);
277     spin_init(&port->mpu_spin);
278 }
279
280 /*
281  * Similar to the standard initport, this function simply marks the message
282  * as being done and does not attempt to return it to an originating port.
283  */
284 void
285 lwkt_initport_replyonly_null(lwkt_port_t port)
286 {
287     _lwkt_initport(port,
288                    lwkt_panic_getport,
289                    lwkt_panic_putport,
290                    lwkt_panic_waitmsg,
291                    lwkt_panic_waitport,
292                    lwkt_null_replyport);
293 }
294
295 /*
296  * Initialize a reply-only port, typically used as a message sink.  Such
297  * ports can only be used as a reply port.
298  */
299 void
300 lwkt_initport_replyonly(lwkt_port_t port,
301                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
302 {
303     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
304                          lwkt_panic_waitmsg, lwkt_panic_waitport,
305                          rportfn);
306 }
307
308 void
309 lwkt_initport_putonly(lwkt_port_t port,
310                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
311 {
312     _lwkt_initport(port, lwkt_panic_getport, pportfn,
313                          lwkt_panic_waitmsg, lwkt_panic_waitport,
314                          lwkt_panic_replyport);
315 }
316
317 void
318 lwkt_initport_panic(lwkt_port_t port)
319 {
320     _lwkt_initport(port,
321                    lwkt_panic_getport, lwkt_panic_putport,
322                    lwkt_panic_waitmsg, lwkt_panic_waitport,
323                    lwkt_panic_replyport);
324 }
325
326 /*
327  * lwkt_getport()
328  *
329  *      Retrieve the next message from the port's message queue, return NULL
330  *      if no messages are pending.  The retrieved message will either be a
331  *      request or a reply based on the MSGF_REPLY bit.
332  *
333  *      The calling thread MUST own the port.
334  */
335
336 static __inline
337 void
338 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
339 {
340     /*
341      * normal case, remove and return the message.
342      */
343     TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
344     msg->ms_flags &= ~MSGF_QUEUED;
345 }
346
347 /************************************************************************
348  *                      THREAD PORT BACKEND                             *
349  ************************************************************************
350  *
351  * This backend is used when the port a message is retrieved from is owned
352  * by a single thread (the calling thread).  Messages are IPId to the
353  * correct cpu before being enqueued to a port.  Note that this is fairly
354  * optimal since scheduling would have had to do an IPI anyway if the
355  * message were headed to a different cpu.
356  */
357
358 #ifdef SMP
359
360 /*
361  * This function completes reply processing for the default case in the
362  * context of the originating cpu.
363  */
364 static
365 void
366 lwkt_thread_replyport_remote(lwkt_msg_t msg)
367 {
368     lwkt_port_t port = msg->ms_reply_port;
369
370     /*
371      * Chase any thread migration that occurs
372      */
373     if (port->mpu_td->td_gd != mycpu) {
374         lwkt_send_ipiq(port->mpu_td->td_gd,
375                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
376         return;
377     }
378
379     /*
380      * Cleanup
381      */
382 #ifdef INVARIANTS
383     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
384     msg->ms_flags &= ~MSGF_INTRANSIT;
385 #endif
386     if (msg->ms_flags & MSGF_SYNC) {
387             msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
388     } else {
389             TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
390             msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
391     }
392     if (port->mp_flags & MSGPORTF_WAITING)
393         lwkt_schedule(port->mpu_td);
394 }
395
396 #endif
397
398 /*
399  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
400  *
401  * Called with the reply port as an argument but in the context of the
402  * original target port.  Completion must occur on the target port's
403  * cpu.
404  *
405  * The critical section protects us from IPIs on the this CPU.
406  */
407 void
408 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
409 {
410     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
411
412     if (msg->ms_flags & MSGF_SYNC) {
413         /*
414          * If a synchronous completion has been requested, just wakeup
415          * the message without bothering to queue it to the target port.
416          *
417          * Assume the target thread is non-preemptive, so no critical
418          * section is required.
419          */
420 #ifdef SMP
421         if (port->mpu_td->td_gd == mycpu) {
422 #endif
423             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
424             if (port->mp_flags & MSGPORTF_WAITING)
425                 lwkt_schedule(port->mpu_td);
426 #ifdef SMP
427         } else {
428 #ifdef INVARIANTS
429             msg->ms_flags |= MSGF_INTRANSIT;
430 #endif
431             msg->ms_flags |= MSGF_REPLY;
432             lwkt_send_ipiq(port->mpu_td->td_gd,
433                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
434         }
435 #endif
436     } else {
437         /*
438          * If an asynchronous completion has been requested the message
439          * must be queued to the reply port.
440          *
441          * A critical section is required to interlock the port queue.
442          */
443 #ifdef SMP
444         if (port->mpu_td->td_gd == mycpu) {
445 #endif
446             crit_enter();
447             TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
448             msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
449             if (port->mp_flags & MSGPORTF_WAITING)
450                 lwkt_schedule(port->mpu_td);
451             crit_exit();
452 #ifdef SMP
453         } else {
454 #ifdef INVARIANTS
455             msg->ms_flags |= MSGF_INTRANSIT;
456 #endif
457             msg->ms_flags |= MSGF_REPLY;
458             lwkt_send_ipiq(port->mpu_td->td_gd,
459                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
460         }
461 #endif
462     }
463 }
464
465 /*
466  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
467  *
468  * Called with the target port as an argument but in the context of the
469  * reply port.  This function always implements an asynchronous put to
470  * the target message port, and thus returns EASYNC.
471  *
472  * The message must already have cleared MSGF_DONE and MSGF_REPLY
473  */
474
475 #ifdef SMP
476
477 static
478 void
479 lwkt_thread_putport_remote(lwkt_msg_t msg)
480 {
481     lwkt_port_t port = msg->ms_target_port;
482
483     /*
484      * Chase any thread migration that occurs
485      */
486     if (port->mpu_td->td_gd != mycpu) {
487         lwkt_send_ipiq(port->mpu_td->td_gd,
488                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
489         return;
490     }
491
492     /*
493      * Cleanup
494      */
495 #ifdef INVARIANTS
496     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
497     msg->ms_flags &= ~MSGF_INTRANSIT;
498 #endif
499     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
500     msg->ms_flags |= MSGF_QUEUED;
501     if (port->mp_flags & MSGPORTF_WAITING)
502         lwkt_schedule(port->mpu_td);
503 }
504
505 #endif
506
507 static
508 int
509 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
510 {
511     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
512
513     msg->ms_target_port = port;
514 #ifdef SMP
515     if (port->mpu_td->td_gd == mycpu) {
516 #endif
517         crit_enter();
518         msg->ms_flags |= MSGF_QUEUED;
519         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
520         if (port->mp_flags & MSGPORTF_WAITING)
521             lwkt_schedule(port->mpu_td);
522         crit_exit();
523 #ifdef SMP
524     } else {
525 #ifdef INVARIANTS
526         msg->ms_flags |= MSGF_INTRANSIT;
527 #endif
528         lwkt_send_ipiq(port->mpu_td->td_gd,
529                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
530     }
531 #endif
532     return (EASYNC);
533 }
534
535 /*
536  * lwkt_thread_getport()
537  *
538  *      Retrieve the next message from the port or NULL if no messages
539  *      are ready.
540  */
541 void *
542 lwkt_thread_getport(lwkt_port_t port)
543 {
544     lwkt_msg_t msg;
545
546     KKASSERT(port->mpu_td == curthread);
547
548     crit_enter_quick(port->mpu_td);
549     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
550         _lwkt_pullmsg(port, msg);
551     crit_exit_quick(port->mpu_td);
552     return(msg);
553 }
554
555 /*
556  * lwkt_thread_waitmsg()
557  *
558  *      Wait for a particular message to be replied.  We must be the only
559  *      thread waiting on the message.  The port must be owned by the
560  *      caller.
561  */
562 int
563 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
564 {
565     if ((msg->ms_flags & MSGF_DONE) == 0) {
566         /*
567          * If the done bit was not set we have to block until it is.
568          */
569         lwkt_port_t port = msg->ms_reply_port;
570         thread_t td = curthread;
571         int sentabort;
572
573         KKASSERT(port->mpu_td == td);
574         crit_enter_quick(td);
575         sentabort = 0;
576
577         while ((msg->ms_flags & MSGF_DONE) == 0) {
578             port->mp_flags |= MSGPORTF_WAITING;
579             if (sentabort == 0) {
580                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
581                     lwkt_abortmsg(msg);
582                 }
583             } else {
584                 lwkt_sleep("waitabt", 0);
585             }
586             port->mp_flags &= ~MSGPORTF_WAITING;
587         }
588         if (msg->ms_flags & MSGF_QUEUED)
589             _lwkt_pullmsg(port, msg);
590         crit_exit_quick(td);
591     } else {
592         /*
593          * If the done bit was set we only have to mess around with the
594          * message if it is queued on the reply port.
595          */
596         if (msg->ms_flags & MSGF_QUEUED) {
597             lwkt_port_t port = msg->ms_reply_port;
598             thread_t td = curthread;
599
600             KKASSERT(port->mpu_td == td);
601             crit_enter_quick(td);
602             _lwkt_pullmsg(port, msg);
603             crit_exit_quick(td);
604         }
605     }
606     return(msg->ms_error);
607 }
608
609 void *
610 lwkt_thread_waitport(lwkt_port_t port, int flags)
611 {
612     thread_t td = curthread;
613     lwkt_msg_t msg;
614     int error;
615
616     KKASSERT(port->mpu_td == td);
617     crit_enter_quick(td);
618     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
619         port->mp_flags |= MSGPORTF_WAITING;
620         error = lwkt_sleep("waitport", flags);
621         port->mp_flags &= ~MSGPORTF_WAITING;
622         if (error)
623                 goto done;
624     }
625     _lwkt_pullmsg(port, msg);
626 done:
627     crit_exit_quick(td);
628     return(msg);
629 }
630
631 /************************************************************************
632  *                         SPIN PORT BACKEND                            *
633  ************************************************************************
634  *
635  * This backend uses spinlocks instead of making assumptions about which
636  * thread is accessing the port.  It must be used when a port is not owned
637  * by a particular thread.  This is less optimal then thread ports but
638  * you don't have a choice if there are multiple threads accessing the port.
639  *
640  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
641  * on the message port, it is the responsibility of the code doing the
642  * wakeup to clear this flag rather then the blocked threads.  Some
643  * superfluous wakeups may occur, which is ok.
644  *
645  * XXX synchronous message wakeups are not current optimized.
646  */
647
648 static
649 void *
650 lwkt_spin_getport(lwkt_port_t port)
651 {
652     lwkt_msg_t msg;
653
654     spin_lock_wr(&port->mpu_spin);
655     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
656         _lwkt_pullmsg(port, msg);
657     spin_unlock_wr(&port->mpu_spin);
658     return(msg);
659 }
660
661 static
662 int
663 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
664 {
665     int dowakeup;
666
667     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
668
669     msg->ms_target_port = port;
670     spin_lock_wr(&port->mpu_spin);
671     msg->ms_flags |= MSGF_QUEUED;
672     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
673     dowakeup = 0;
674     if (port->mp_flags & MSGPORTF_WAITING) {
675         port->mp_flags &= ~MSGPORTF_WAITING;
676         dowakeup = 1;
677     }
678     spin_unlock_wr(&port->mpu_spin);
679     if (dowakeup)
680         wakeup(port);
681     return (EASYNC);
682 }
683
684 static
685 int
686 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
687 {
688     lwkt_port_t port;
689     int sentabort;
690     int error;
691
692     if ((msg->ms_flags & MSGF_DONE) == 0) {
693         port = msg->ms_reply_port;
694         sentabort = 0;
695         spin_lock_wr(&port->mpu_spin);
696         while ((msg->ms_flags & MSGF_DONE) == 0) {
697             void *won;
698
699             /*
700              * If message was sent synchronously from the beginning
701              * the wakeup will be on the message structure, else it
702              * will be on the port structure.
703              */
704             if (msg->ms_flags & MSGF_SYNC) {
705                 won = msg;
706             } else {
707                 won = port;
708                 port->mp_flags |= MSGPORTF_WAITING;
709             }
710
711             /*
712              * Only messages which support abort can be interrupted.
713              * We must still wait for message completion regardless.
714              */
715             if ((flags & PCATCH) && sentabort == 0) {
716                 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
717                 if (error) {
718                     sentabort = error;
719                     spin_unlock_wr(&port->mpu_spin);
720                     lwkt_abortmsg(msg);
721                     spin_lock_wr(&port->mpu_spin);
722                 }
723             } else {
724                 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
725             }
726             /* see note at the top on the MSGPORTF_WAITING flag */
727         }
728         /*
729          * Turn EINTR into ERESTART if the signal indicates.
730          */
731         if (sentabort && msg->ms_error == EINTR)
732             msg->ms_error = sentabort;
733         if (msg->ms_flags & MSGF_QUEUED)
734                 _lwkt_pullmsg(port, msg);
735         spin_unlock_wr(&port->mpu_spin);
736     } else {
737         if (msg->ms_flags & MSGF_QUEUED) {
738             port = msg->ms_reply_port;
739             spin_lock_wr(&port->mpu_spin);
740             _lwkt_pullmsg(port, msg);
741             spin_unlock_wr(&port->mpu_spin);
742         }
743     }
744     return(msg->ms_error);
745 }
746
747 static
748 void *
749 lwkt_spin_waitport(lwkt_port_t port, int flags)
750 {
751     lwkt_msg_t msg;
752     int error;
753
754     spin_lock_wr(&port->mpu_spin);
755     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
756         port->mp_flags |= MSGPORTF_WAITING;
757         error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
758         /* see note at the top on the MSGPORTF_WAITING flag */
759         if (error) {
760             spin_unlock_wr(&port->mpu_spin);
761             return(NULL);
762         }
763     }
764     _lwkt_pullmsg(port, msg);
765     spin_unlock_wr(&port->mpu_spin);
766     return(msg);
767 }
768
769 static
770 void
771 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
772 {
773     int dowakeup;
774
775     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
776
777     if (msg->ms_flags & MSGF_SYNC) {
778         /*
779          * If a synchronous completion has been requested, just wakeup
780          * the message without bothering to queue it to the target port.
781          */
782         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
783         wakeup(msg);
784     } else {
785         /*
786          * If an asynchronous completion has been requested the message
787          * must be queued to the reply port.
788          */
789         spin_lock_wr(&port->mpu_spin);
790         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
791         msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
792         dowakeup = 0;
793         if (port->mp_flags & MSGPORTF_WAITING) {
794             port->mp_flags &= ~MSGPORTF_WAITING;
795             dowakeup = 1;
796         }
797         spin_unlock_wr(&port->mpu_spin);
798         if (dowakeup)
799             wakeup(port);
800     }
801 }
802
803 /************************************************************************
804  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
805  ************************************************************************/
806
807 /*
808  * You can point a port's reply vector at this function if you just want
809  * the message marked done, without any queueing or signaling.  This is
810  * often used for structure-embedded messages.
811  */
812 static
813 void
814 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
815 {
816     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
817 }
818
819 static
820 void *
821 lwkt_panic_getport(lwkt_port_t port)
822 {
823     panic("lwkt_getport() illegal on port %p", port);
824 }
825
826 static
827 int
828 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
829 {
830     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
831 }
832
833 static
834 int
835 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
836 {
837     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
838 }
839
840 static
841 void *
842 lwkt_panic_waitport(lwkt_port_t port, int flags)
843 {
844     panic("port %p cannot be waited on", port);
845 }
846
847 static
848 void
849 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
850 {
851     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
852 }
853