LWKT message ports contain a number of function pointers which abstract
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.41 2007/05/24 05:51:27 dillon Exp $
38  */
39
40 #ifdef _KERNEL
41
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/proc.h>
46 #include <sys/rtprio.h>
47 #include <sys/queue.h>
48 #include <sys/sysctl.h>
49 #include <sys/kthread.h>
50 #include <sys/signalvar.h>
51 #include <sys/signal2.h>
52 #include <machine/cpu.h>
53 #include <sys/lock.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_kern.h>
58 #include <vm/vm_object.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_pager.h>
62 #include <vm/vm_extern.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/thread2.h>
66 #include <sys/msgport2.h>
67 #include <sys/spinlock2.h>
68
69 #include <machine/stdarg.h>
70 #include <machine/cpufunc.h>
71 #ifdef SMP
72 #include <machine/smp.h>
73 #endif
74
75 #include <sys/malloc.h>
76 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
77
78 #else
79
80 #include <sys/stdint.h>
81 #include <libcaps/thread.h>
82 #include <sys/thread.h>
83 #include <sys/msgport.h>
84 #include <sys/errno.h>
85 #include <libcaps/globaldata.h>
86 #include <machine/cpufunc.h>
87 #include <sys/thread2.h>
88 #include <sys/msgport2.h>
89 #include <string.h>
90
91 #endif /* _KERNEL */
92
93
94 /************************************************************************
95  *                              MESSAGE FUNCTIONS                       *
96  ************************************************************************/
97
98 /*
99  * lwkt_sendmsg()
100  *
101  *      Request asynchronous completion and call lwkt_beginmsg().  The
102  *      target port can opt to execute the message synchronously or
103  *      asynchronously and this function will automatically queue the
104  *      response if the target executes the message synchronously.
105  *
106  *      NOTE: The message is in an indeterminant state until this call
107  *      returns.  The caller should not mess with it (e.g. try to abort it)
108  *      until then.
109  */
110 void
111 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
112 {
113     int error;
114
115     KKASSERT(msg->ms_reply_port != NULL &&
116              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
117     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
118     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
119         lwkt_replymsg(msg, error);
120     }
121 }
122
123 /*
124  * lwkt_domsg()
125  *
126  *      Request asynchronous completion and call lwkt_beginmsg().  The
127  *      target port can opt to execute the message synchronously or
128  *      asynchronously and this function will automatically queue the
129  *      response if the target executes the message synchronously.
130  */
131 int
132 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg)
133 {
134     int error;
135
136     KKASSERT(msg->ms_reply_port != NULL &&
137              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
138     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
139     msg->ms_flags |= MSGF_SYNC;
140     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
141         error = lwkt_waitmsg(msg);
142     } else {
143         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
144     }
145     return(error);
146 }
147
148 /*
149  * lwkt_forwardmsg()
150  *
151  * Forward a message received on one port to another port.
152  */
153 int
154 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
155 {   
156     int error;
157
158     crit_enter();
159     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
160     if ((error = port->mp_putport(port, msg)) != EASYNC)
161         lwkt_replymsg(msg, error);
162     crit_exit();
163     return(error);
164 }
165
166 /*
167  * lwkt_abortmsg()
168  *
169  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
170  * The caller must ensure that the message will not be both replied AND
171  * destroyed while the abort is in progress.
172  *
173  * This function issues a callback which might block!
174  */
175 void
176 lwkt_abortmsg(lwkt_msg_t msg)
177 {
178     /*
179      * A critical section protects us from reply IPIs on this cpu.
180      */
181     crit_enter();
182
183     /*
184      * Shortcut the operation if the message has already been returned.
185      * The callback typically constructs a lwkt_msg with the abort request,
186      * issues it synchronously, and waits for completion.  The callback
187      * is not required to actually abort the message and the target port,
188      * upon receiving an abort request message generated by the callback
189      * should check whether the original message has already completed or
190      * not.
191      */
192     if (msg->ms_flags & MSGF_ABORTABLE) {
193         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
194             msg->ms_abortfn(msg);
195     }
196     crit_exit();
197 }
198
199 /************************************************************************
200  *                      PORT INITIALIZATION API                         *
201  ************************************************************************/
202
203 static void *lwkt_thread_getport(lwkt_port_t port);
204 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
205 static void *lwkt_thread_waitport(lwkt_port_t port, lwkt_msg_t msg);
206 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
207
208 static void *lwkt_spin_getport(lwkt_port_t port);
209 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
210 static void *lwkt_spin_waitport(lwkt_port_t port, lwkt_msg_t msg);
211 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
212
213 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
214 static void *lwkt_panic_getport(lwkt_port_t port);
215 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
216 static void *lwkt_panic_waitport(lwkt_port_t port, lwkt_msg_t msg);
217 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
218
219 /*
220  * Core port initialization (internal)
221  */
222 static __inline
223 void
224 _lwkt_initport(lwkt_port_t port,
225                void *(*gportfn)(lwkt_port_t),
226                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
227                void *(*wportfn)(lwkt_port_t, lwkt_msg_t),
228                void (*rportfn)(lwkt_port_t, lwkt_msg_t))
229 {
230     bzero(port, sizeof(*port));
231     TAILQ_INIT(&port->mp_msgq);
232     port->mp_getport = gportfn;
233     port->mp_putport = pportfn;
234     port->mp_waitport =  wportfn;
235     port->mp_replyport = rportfn;
236 }
237
238 /*
239  * lwkt_initport_thread()
240  *
241  *      Initialize a port for use by a particular thread.  The port may
242  *      only be used by <td>.
243  */
244 void
245 lwkt_initport_thread(lwkt_port_t port, thread_t td)
246 {
247     _lwkt_initport(port,
248                    lwkt_thread_getport,
249                    lwkt_thread_putport,
250                    lwkt_thread_waitport,
251                    lwkt_thread_replyport);
252     port->mpu_td = td;
253 }
254
255 /*
256  * lwkt_initport_spin()
257  *
258  *      Initialize a port for use with descriptors that might be accessed
259  *      via multiple LWPs, processes, or threads.  Has somewhat more
260  *      overhead then thread ports.
261  */
262 void
263 lwkt_initport_spin(lwkt_port_t port)
264 {
265     _lwkt_initport(port,
266                    lwkt_spin_getport,
267                    lwkt_spin_putport,
268                    lwkt_spin_waitport,
269                    lwkt_spin_replyport);
270     spin_init(&port->mpu_spin);
271 }
272
273 /*
274  * Similar to the standard initport, this function simply marks the message
275  * as being done and does not attempt to return it to an originating port.
276  */
277 void
278 lwkt_initport_replyonly_null(lwkt_port_t port)
279 {
280     _lwkt_initport(port,
281                    lwkt_panic_getport,
282                    lwkt_panic_putport,
283                    lwkt_panic_waitport,
284                    lwkt_null_replyport);
285 }
286
287 /*
288  * Initialize a reply-only port, typically used as a message sink.  Such
289  * ports can only be used as a reply port.
290  */
291 void
292 lwkt_initport_replyonly(lwkt_port_t port,
293                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
294 {
295     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
296                          lwkt_panic_waitport, rportfn);
297 }
298
299 void
300 lwkt_initport_putonly(lwkt_port_t port,
301                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
302 {
303     _lwkt_initport(port, lwkt_panic_getport, pportfn,
304                          lwkt_panic_waitport, lwkt_panic_replyport);
305 }
306
307 void
308 lwkt_initport_panic(lwkt_port_t port)
309 {
310     _lwkt_initport(port,
311                    lwkt_panic_getport,
312                    lwkt_panic_putport,
313                    lwkt_panic_waitport,
314                    lwkt_panic_replyport);
315 }
316
317 /*
318  * lwkt_getport()
319  *
320  *      Retrieve the next message from the port's message queue, return NULL
321  *      if no messages are pending.  The retrieved message will either be a
322  *      request or a reply based on the MSGF_REPLY bit.
323  *
324  *      The calling thread MUST own the port.
325  */
326
327 static __inline
328 void
329 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
330 {
331     /*
332      * normal case, remove and return the message.
333      */
334     TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
335     msg->ms_flags &= ~MSGF_QUEUED;
336 }
337
338 /************************************************************************
339  *                      THREAD PORT BACKEND                             *
340  ************************************************************************
341  *
342  * This backend is used when the port a message is retrieved from is owned
343  * by a single thread (the calling thread).  Messages are IPId to the
344  * correct cpu before being enqueued to a port.  Note that this is fairly
345  * optimal since scheduling would have had to do an IPI anyway if the
346  * message were headed to a different cpu.
347  */
348
349 #ifdef SMP
350
351 /*
352  * This function completes reply processing for the default case in the
353  * context of the originating cpu.
354  */
355 static
356 void
357 lwkt_thread_replyport_remote(lwkt_msg_t msg)
358 {
359     lwkt_port_t port = msg->ms_reply_port;
360
361     /*
362      * Chase any thread migration that occurs
363      */
364     if (port->mpu_td->td_gd != mycpu) {
365         lwkt_send_ipiq(port->mpu_td->td_gd,
366                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
367         return;
368     }
369
370     /*
371      * Cleanup
372      */
373 #ifdef INVARIANTS
374     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
375     msg->ms_flags &= ~MSGF_INTRANSIT;
376 #endif
377     if (msg->ms_flags & MSGF_SYNC) {
378             msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
379     } else {
380             TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
381             msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
382     }
383     if (port->mp_flags & MSGPORTF_WAITING)
384         lwkt_schedule(port->mpu_td);
385 }
386
387 #endif
388
389 /*
390  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
391  *
392  * Called with the reply port as an argument but in the context of the
393  * original target port.  Completion must occur on the target port's
394  * cpu.
395  *
396  * The critical section protects us from IPIs on the this CPU.
397  */
398 void
399 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
400 {
401     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
402
403     if (msg->ms_flags & MSGF_SYNC) {
404         /*
405          * If a synchronous completion has been requested, just wakeup
406          * the message without bothering to queue it to the target port.
407          *
408          * Assume the target thread is non-preemptive, so no critical
409          * section is required.
410          */
411 #ifdef SMP
412         if (port->mpu_td->td_gd == mycpu) {
413 #endif
414             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
415             if (port->mp_flags & MSGPORTF_WAITING)
416                 lwkt_schedule(port->mpu_td);
417 #ifdef SMP
418         } else {
419 #ifdef INVARIANTS
420             msg->ms_flags |= MSGF_INTRANSIT;
421 #endif
422             msg->ms_flags |= MSGF_REPLY;
423             lwkt_send_ipiq(port->mpu_td->td_gd,
424                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
425         }
426 #endif
427     } else {
428         /*
429          * If an asynchronous completion has been requested the message
430          * must be queued to the reply port.  MSGF_REPLY cannot be set
431          * until the message actually gets queued.
432          *
433          * A critical section is required to interlock the port queue.
434          */
435 #ifdef SMP
436         if (port->mpu_td->td_gd == mycpu) {
437 #endif
438             crit_enter();
439             TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
440             msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
441             if (port->mp_flags & MSGPORTF_WAITING)
442                 lwkt_schedule(port->mpu_td);
443             crit_exit();
444 #ifdef SMP
445         } else {
446 #ifdef INVARIANTS
447             msg->ms_flags |= MSGF_INTRANSIT;
448 #endif
449             msg->ms_flags |= MSGF_REPLY;
450             lwkt_send_ipiq(port->mpu_td->td_gd,
451                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
452         }
453 #endif
454     }
455 }
456
457 /*
458  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
459  *
460  * Called with the target port as an argument but in the context of the
461  * reply port.  This function always implements an asynchronous put to
462  * the target message port, and thus returns EASYNC.
463  *
464  * The message must already have cleared MSGF_DONE and MSGF_REPLY
465  */
466
467 #ifdef SMP
468
469 static
470 void
471 lwkt_thread_putport_remote(lwkt_msg_t msg)
472 {
473     lwkt_port_t port = msg->ms_target_port;
474
475     /*
476      * Chase any thread migration that occurs
477      */
478     if (port->mpu_td->td_gd != mycpu) {
479         lwkt_send_ipiq(port->mpu_td->td_gd,
480                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
481         return;
482     }
483
484     /*
485      * Cleanup
486      */
487 #ifdef INVARIANTS
488     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
489     msg->ms_flags &= ~MSGF_INTRANSIT;
490 #endif
491     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
492     msg->ms_flags |= MSGF_QUEUED;
493     if (port->mp_flags & MSGPORTF_WAITING)
494         lwkt_schedule(port->mpu_td);
495 }
496
497 #endif
498
499 static
500 int
501 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
502 {
503     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
504
505     msg->ms_target_port = port;
506 #ifdef SMP
507     if (port->mpu_td->td_gd == mycpu) {
508 #endif
509         crit_enter();
510         msg->ms_flags |= MSGF_QUEUED;
511         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
512         if (port->mp_flags & MSGPORTF_WAITING)
513             lwkt_schedule(port->mpu_td);
514         crit_exit();
515 #ifdef SMP
516     } else {
517 #ifdef INVARIANTS
518         msg->ms_flags |= MSGF_INTRANSIT;
519 #endif
520         lwkt_send_ipiq(port->mpu_td->td_gd,
521                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
522     }
523 #endif
524     return (EASYNC);
525 }
526
527 /*
528  * lwkt_thread_getport()
529  *
530  *      Retrieve the next message from the port or NULL if no messages
531  *      are ready.
532  */
533 void *
534 lwkt_thread_getport(lwkt_port_t port)
535 {
536     lwkt_msg_t msg;
537
538     KKASSERT(port->mpu_td == curthread);
539
540     crit_enter_quick(port->mpu_td);
541     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
542         _lwkt_pullmsg(port, msg);
543     crit_exit_quick(port->mpu_td);
544     return(msg);
545 }
546
547 /*
548  * lwkt_thread_waitport()
549  *
550  *      If msg is NULL, dequeue the next message from the port's message
551  *      queue, block until a message is ready.  This function never
552  *      returns NULL.
553  *
554  *      If msg is non-NULL, block until the requested message has been
555  *      replied, then dequeue and return it.
556  *
557  *      NOTE: This function should not be used to wait for specific
558  *      incoming requests because MSGF_DONE only applies to replies.
559  *
560  *      Note that the API does not currently support multiple threads waiting
561  *      on a single port.  The port must be owned by the caller.
562  */
563 void *
564 lwkt_thread_waitport(lwkt_port_t port, lwkt_msg_t msg)
565 {
566     thread_t td = curthread;
567     int sentabort;
568
569     KKASSERT(port->mpu_td == td);
570     crit_enter_quick(td);
571     if (msg == NULL) {
572         /*
573          * Wait for any message
574          */
575         if ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
576             port->mp_flags |= MSGPORTF_WAITING;
577             td->td_flags |= TDF_BLOCKED;
578             do {
579                 lwkt_deschedule_self(td);
580                 lwkt_switch();
581             } while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL);
582             td->td_flags &= ~TDF_BLOCKED;
583             port->mp_flags &= ~MSGPORTF_WAITING;
584         }
585         _lwkt_pullmsg(port, msg);
586     } else {
587         /*
588          * Wait for a specific message.
589          */
590         KKASSERT(msg->ms_reply_port == port);
591         if ((msg->ms_flags & MSGF_DONE) == 0) {
592             sentabort = 0;
593             while ((msg->ms_flags & MSGF_DONE) == 0) {
594                 /*
595                  * MSGF_PCATCH is only set by processes which wish to
596                  * abort the message they are blocked on when a signal
597                  * occurs.  Note that we still must wait for message
598                  * completion after sending an abort request.
599                  */
600                 if (msg->ms_flags & MSGF_PCATCH) {
601                     if (sentabort == 0 && CURSIG(port->mpu_td->td_lwp)) {
602                         sentabort = 1;
603                         lwkt_abortmsg(msg);
604                         continue;
605                     }
606                 }
607
608                 /*
609                  * XXX set TDF_SINTR so 'ps' knows the difference between
610                  * an interruptable wait and a disk wait.  YYY eventually
611                  * move LWP_SINTR to TDF_SINTR to reduce duplication.
612                  */
613                 port->mp_flags |= MSGPORTF_WAITING;
614                 td->td_flags |= TDF_SINTR | TDF_BLOCKED;
615                 lwkt_deschedule_self(td);
616                 lwkt_switch();
617                 td->td_flags &= ~(TDF_SINTR | TDF_BLOCKED);
618                 port->mp_flags &= ~MSGPORTF_WAITING;
619             }
620         }
621
622         /*
623          * Once the MSGF_DONE bit is set, the message is stable.  We
624          * can just check MSGF_QUEUED to determine
625          */
626         if (msg->ms_flags & MSGF_QUEUED)
627             _lwkt_pullmsg(port, msg);
628     }
629     crit_exit_quick(td);
630     return(msg);
631 }
632
633 /************************************************************************
634  *                         SPIN PORT BACKEND                            *
635  ************************************************************************
636  *
637  * This backend uses spinlocks instead of making assumptions about which
638  * thread is accessing the port.  It must be used when a port is not owned
639  * by a particular thread.  This is less optimal then thread ports but
640  * you don't have a choice if there are multiple threads accessing the port.
641  */
642
643 static
644 void *
645 lwkt_spin_getport(lwkt_port_t port)
646 {
647     lwkt_msg_t msg;
648
649     spin_lock_wr(&port->mpu_spin);
650     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
651         _lwkt_pullmsg(port, msg);
652     spin_unlock_wr(&port->mpu_spin);
653     return(msg);
654 }
655
656 static
657 int
658 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
659 {
660     int dowakeup;
661
662     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
663
664     msg->ms_target_port = port;
665     spin_lock_wr(&port->mpu_spin);
666     msg->ms_flags |= MSGF_QUEUED;
667     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
668     dowakeup = 0;
669     if (port->mp_flags & MSGPORTF_WAITING) {
670         port->mp_flags &= ~MSGPORTF_WAITING;
671         dowakeup = 1;
672     }
673     spin_unlock_wr(&port->mpu_spin);
674     if (dowakeup)
675         wakeup(port);
676     return (EASYNC);
677 }
678
679 static
680 void *
681 lwkt_spin_waitport(lwkt_port_t port, lwkt_msg_t msg)
682 {
683     int sentabort;
684     int error;
685
686     spin_lock_wr(&port->mpu_spin);
687     if (msg == NULL) {
688         /*
689          * Wait for any message
690          */
691         while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
692             port->mp_flags |= MSGPORTF_WAITING;
693             msleep(port, &port->mpu_spin, 0, "wport", 0);
694             /* see note at the top on the MSGPORTF_WAITING flag */
695         }
696         _lwkt_pullmsg(port, msg);
697     } else {
698         /*
699          * Wait for a specific message.
700          */
701         KKASSERT(msg->ms_reply_port == port);
702         if ((msg->ms_flags & MSGF_DONE) == 0) {
703             sentabort = 0;
704             while ((msg->ms_flags & MSGF_DONE) == 0) {
705                 void *won;
706
707                 /*
708                  * If message was sent synchronously from the beginning
709                  * the wakeup will be on the message structure, else it
710                  * will be on the port structure.
711                  */
712                 if (msg->ms_flags & MSGF_SYNC) {
713                     won = msg;
714                 } else {
715                     won = port;
716                     port->mp_flags |= MSGPORTF_WAITING;
717                 }
718
719                 /*
720                  * MSGF_PCATCH is only set by processes which wish to
721                  * abort the message they are blocked on when a signal
722                  * occurs.  Note that we still must wait for message
723                  * completion after sending an abort request.
724                  *
725                  * XXX ERESTART not handled.
726                  */
727                 if ((msg->ms_flags & MSGF_PCATCH) && sentabort == 0) {
728                     error = msleep(won, &port->mpu_spin, PCATCH, "wmsg", 0);
729                     if (error) {
730                         sentabort = error;
731                         spin_unlock_wr(&port->mpu_spin);
732                         lwkt_abortmsg(msg);
733                         spin_lock_wr(&port->mpu_spin);
734                     }
735                 } else {
736                     error = msleep(won, &port->mpu_spin, 0, "wmsg", 0);
737                 }
738                 /* see note at the top on the MSGPORTF_WAITING flag */
739             }
740             /*
741              * Turn EINTR into ERESTART if the signal indicates.
742              */
743             if (sentabort && msg->ms_error == EINTR)
744                 msg->ms_error = sentabort;
745
746             /*
747              * Once the MSGF_DONE bit is set, the message is stable.  We
748              * can just check MSGF_QUEUED to determine
749              */
750             if (msg->ms_flags & MSGF_QUEUED)
751                     _lwkt_pullmsg(port, msg);
752         }
753     }
754     spin_unlock_wr(&port->mpu_spin);
755     return(msg);
756 }
757
758 static
759 void
760 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
761 {
762     int dowakeup;
763
764     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
765
766     if (msg->ms_flags & MSGF_SYNC) {
767         /*
768          * If a synchronous completion has been requested, just wakeup
769          * the message without bothering to queue it to the target port.
770          */
771         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
772         wakeup(msg);
773     } else {
774         /*
775          * If an asynchronous completion has been requested the message
776          * must be queued to the reply port.  MSGF_REPLY cannot be set
777          * until the message actually gets queued.
778          */
779         spin_lock_wr(&port->mpu_spin);
780         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
781         msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
782         dowakeup = 0;
783         if (port->mp_flags & MSGPORTF_WAITING) {
784             port->mp_flags &= ~MSGPORTF_WAITING;
785             dowakeup = 1;
786         }
787         spin_unlock_wr(&port->mpu_spin);
788         if (dowakeup)
789             wakeup(port);
790     }
791 }
792
793 /************************************************************************
794  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
795  ************************************************************************/
796
797 /*
798  * You can point a port's reply vector at this function if you just want
799  * the message marked done, without any queueing or signaling.  This is
800  * often used for structure-embedded messages.
801  */
802 static
803 void
804 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
805 {
806     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
807 }
808
809 static
810 void *
811 lwkt_panic_getport(lwkt_port_t port)
812 {
813     panic("lwkt_getport() illegal on port %p", port);
814 }
815
816 static
817 int
818 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
819 {
820     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
821 }
822
823 static
824 void *
825 lwkt_panic_waitport(lwkt_port_t port, lwkt_msg_t msg)
826 {
827     panic("port %p cannot be waited on msg %p", port, msg);
828 }
829
830 static
831 void
832 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
833 {
834     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
835 }
836