a6a7cc0f01ca3194c8adbb63e101b80709264c23
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $
38  */
39
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
52
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
62
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
67
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
73
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
76
77 /************************************************************************
78  *                              MESSAGE FUNCTIONS                       *
79  ************************************************************************/
80
81 /*
82  * lwkt_sendmsg()
83  *
84  *      Request asynchronous completion and call lwkt_beginmsg().  The
85  *      target port can opt to execute the message synchronously or
86  *      asynchronously and this function will automatically queue the
87  *      response if the target executes the message synchronously.
88  *
89  *      NOTE: The message is in an indeterminant state until this call
90  *      returns.  The caller should not mess with it (e.g. try to abort it)
91  *      until then.
92  */
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
95 {
96     int error;
97
98     KKASSERT(msg->ms_reply_port != NULL &&
99              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102         lwkt_replymsg(msg, error);
103     }
104 }
105
106 /*
107  * lwkt_domsg()
108  *
109  *      Request asynchronous completion and call lwkt_beginmsg().  The
110  *      target port can opt to execute the message synchronously or
111  *      asynchronously and this function will automatically queue the
112  *      response if the target executes the message synchronously.
113  */
114 int
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
116 {
117     int error;
118
119     KKASSERT(msg->ms_reply_port != NULL &&
120              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122     msg->ms_flags |= MSGF_SYNC;
123     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124         error = lwkt_waitmsg(msg, flags);
125     } else {
126         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
127     }
128     return(error);
129 }
130
131 /*
132  * lwkt_forwardmsg()
133  *
134  * Forward a message received on one port to another port.
135  */
136 int
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
138 {   
139     int error;
140
141     crit_enter();
142     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143     if ((error = port->mp_putport(port, msg)) != EASYNC)
144         lwkt_replymsg(msg, error);
145     crit_exit();
146     return(error);
147 }
148
149 /*
150  * lwkt_abortmsg()
151  *
152  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
153  * The caller must ensure that the message will not be both replied AND
154  * destroyed while the abort is in progress.
155  *
156  * This function issues a callback which might block!
157  */
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
160 {
161     /*
162      * A critical section protects us from reply IPIs on this cpu.
163      */
164     crit_enter();
165
166     /*
167      * Shortcut the operation if the message has already been returned.
168      * The callback typically constructs a lwkt_msg with the abort request,
169      * issues it synchronously, and waits for completion.  The callback
170      * is not required to actually abort the message and the target port,
171      * upon receiving an abort request message generated by the callback
172      * should check whether the original message has already completed or
173      * not.
174      */
175     if (msg->ms_flags & MSGF_ABORTABLE) {
176         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177             msg->ms_abortfn(msg);
178     }
179     crit_exit();
180 }
181
182 /************************************************************************
183  *                      PORT INITIALIZATION API                         *
184  ************************************************************************/
185
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
192
193 static void *lwkt_spin_getport(lwkt_port_t port);
194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
199
200 static void *lwkt_serialize_getport(lwkt_port_t port);
201 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
202 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
203 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
204 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
205
206 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
207 static void *lwkt_panic_getport(lwkt_port_t port);
208 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
209 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
210 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
211 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
212 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
213
214 /*
215  * Core port initialization (internal)
216  */
217 static __inline
218 void
219 _lwkt_initport(lwkt_port_t port,
220                void *(*gportfn)(lwkt_port_t),
221                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
222                int (*wmsgfn)(lwkt_msg_t, int),
223                void *(*wportfn)(lwkt_port_t, int),
224                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
225                void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
226 {
227     bzero(port, sizeof(*port));
228     TAILQ_INIT(&port->mp_msgq);
229     TAILQ_INIT(&port->mp_msgq_prio);
230     port->mp_getport = gportfn;
231     port->mp_putport = pportfn;
232     port->mp_waitmsg =  wmsgfn;
233     port->mp_waitport =  wportfn;
234     port->mp_replyport = rportfn;
235     port->mp_dropmsg = dmsgfn;
236 }
237
238 /*
239  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
240  * we tell the scheduler not to reschedule if td is at a higher priority.
241  *
242  * This routine is called even if the thread is already scheduled.
243  */
244 static __inline
245 void
246 _lwkt_schedule_msg(thread_t td, int flags)
247 {
248     lwkt_schedule(td);
249 }
250
251 /*
252  * lwkt_initport_thread()
253  *
254  *      Initialize a port for use by a particular thread.  The port may
255  *      only be used by <td>.
256  */
257 void
258 lwkt_initport_thread(lwkt_port_t port, thread_t td)
259 {
260     _lwkt_initport(port,
261                    lwkt_thread_getport,
262                    lwkt_thread_putport,
263                    lwkt_thread_waitmsg,
264                    lwkt_thread_waitport,
265                    lwkt_thread_replyport,
266                    lwkt_thread_dropmsg);
267     port->mpu_td = td;
268 }
269
270 /*
271  * lwkt_initport_spin()
272  *
273  *      Initialize a port for use with descriptors that might be accessed
274  *      via multiple LWPs, processes, or threads.  Has somewhat more
275  *      overhead then thread ports.
276  */
277 void
278 lwkt_initport_spin(lwkt_port_t port)
279 {
280     _lwkt_initport(port,
281                    lwkt_spin_getport,
282                    lwkt_spin_putport,
283                    lwkt_spin_waitmsg,
284                    lwkt_spin_waitport,
285                    lwkt_spin_replyport,
286                    lwkt_spin_dropmsg);
287     spin_init(&port->mpu_spin);
288 }
289
290 /*
291  * lwkt_initport_serialize()
292  *
293  *      Initialize a port for use with descriptors that might be accessed
294  *      via multiple LWPs, processes, or threads.  Callers are assumed to
295  *      have held the serializer (slz).
296  */
297 void
298 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
299 {
300     _lwkt_initport(port,
301                    lwkt_serialize_getport,
302                    lwkt_serialize_putport,
303                    lwkt_serialize_waitmsg,
304                    lwkt_serialize_waitport,
305                    lwkt_serialize_replyport,
306                    lwkt_panic_dropmsg);
307     port->mpu_serialize = slz;
308 }
309
310 /*
311  * Similar to the standard initport, this function simply marks the message
312  * as being done and does not attempt to return it to an originating port.
313  */
314 void
315 lwkt_initport_replyonly_null(lwkt_port_t port)
316 {
317     _lwkt_initport(port,
318                    lwkt_panic_getport,
319                    lwkt_panic_putport,
320                    lwkt_panic_waitmsg,
321                    lwkt_panic_waitport,
322                    lwkt_null_replyport,
323                    lwkt_panic_dropmsg);
324 }
325
326 /*
327  * Initialize a reply-only port, typically used as a message sink.  Such
328  * ports can only be used as a reply port.
329  */
330 void
331 lwkt_initport_replyonly(lwkt_port_t port,
332                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
333 {
334     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
335                          lwkt_panic_waitmsg, lwkt_panic_waitport,
336                          rportfn, lwkt_panic_dropmsg);
337 }
338
339 void
340 lwkt_initport_putonly(lwkt_port_t port,
341                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
342 {
343     _lwkt_initport(port, lwkt_panic_getport, pportfn,
344                          lwkt_panic_waitmsg, lwkt_panic_waitport,
345                          lwkt_panic_replyport, lwkt_panic_dropmsg);
346 }
347
348 void
349 lwkt_initport_panic(lwkt_port_t port)
350 {
351     _lwkt_initport(port,
352                    lwkt_panic_getport, lwkt_panic_putport,
353                    lwkt_panic_waitmsg, lwkt_panic_waitport,
354                    lwkt_panic_replyport, lwkt_panic_dropmsg);
355 }
356
357 static __inline
358 void
359 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
360 {
361     lwkt_msg_queue *queue;
362
363     /*
364      * normal case, remove and return the message.
365      */
366     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
367         queue = &port->mp_msgq_prio;
368     else
369         queue = &port->mp_msgq;
370     TAILQ_REMOVE(queue, msg, ms_node);
371
372     /*
373      * atomic op needed for spin ports
374      */
375     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
376 }
377
378 static __inline
379 void
380 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
381 {
382     lwkt_msg_queue *queue;
383
384     /*
385      * atomic op needed for spin ports
386      */
387     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
388     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
389         queue = &port->mp_msgq_prio;
390     else
391         queue = &port->mp_msgq;
392     TAILQ_INSERT_TAIL(queue, msg, ms_node);
393 }
394
395 static __inline
396 lwkt_msg_t
397 _lwkt_pollmsg(lwkt_port_t port)
398 {
399     lwkt_msg_t msg;
400
401     msg = TAILQ_FIRST(&port->mp_msgq_prio);
402     if (__predict_false(msg != NULL))
403         return msg;
404
405     /*
406      * Priority queue has no message, fallback to non-priority queue.
407      */
408     return TAILQ_FIRST(&port->mp_msgq);
409 }
410
411 static __inline
412 void
413 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
414 {
415     /*
416      * atomic op needed for spin ports
417      */
418     _lwkt_pushmsg(port, msg);
419     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
420 }
421
422 /************************************************************************
423  *                      THREAD PORT BACKEND                             *
424  ************************************************************************
425  *
426  * This backend is used when the port a message is retrieved from is owned
427  * by a single thread (the calling thread).  Messages are IPId to the
428  * correct cpu before being enqueued to a port.  Note that this is fairly
429  * optimal since scheduling would have had to do an IPI anyway if the
430  * message were headed to a different cpu.
431  */
432
433 #ifdef SMP
434
435 /*
436  * This function completes reply processing for the default case in the
437  * context of the originating cpu.
438  */
439 static
440 void
441 lwkt_thread_replyport_remote(lwkt_msg_t msg)
442 {
443     lwkt_port_t port = msg->ms_reply_port;
444     int flags;
445
446     /*
447      * Chase any thread migration that occurs
448      */
449     if (port->mpu_td->td_gd != mycpu) {
450         lwkt_send_ipiq(port->mpu_td->td_gd,
451                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
452         return;
453     }
454
455     /*
456      * Cleanup
457      */
458 #ifdef INVARIANTS
459     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
460     msg->ms_flags &= ~MSGF_INTRANSIT;
461 #endif
462     flags = msg->ms_flags;
463     if (msg->ms_flags & MSGF_SYNC) {
464         cpu_sfence();
465         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
466     } else {
467         _lwkt_enqueue_reply(port, msg);
468     }
469     if (port->mp_flags & MSGPORTF_WAITING)
470         _lwkt_schedule_msg(port->mpu_td, flags);
471 }
472
473 #endif
474
475 /*
476  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
477  *
478  * Called with the reply port as an argument but in the context of the
479  * original target port.  Completion must occur on the target port's
480  * cpu.
481  *
482  * The critical section protects us from IPIs on the this CPU.
483  */
484 static
485 void
486 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
487 {
488     int flags;
489
490     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
491
492     if (msg->ms_flags & MSGF_SYNC) {
493         /*
494          * If a synchronous completion has been requested, just wakeup
495          * the message without bothering to queue it to the target port.
496          *
497          * Assume the target thread is non-preemptive, so no critical
498          * section is required.
499          */
500 #ifdef SMP
501         if (port->mpu_td->td_gd == mycpu) {
502 #endif
503             flags = msg->ms_flags;
504             cpu_sfence();
505             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
506             if (port->mp_flags & MSGPORTF_WAITING)
507                 _lwkt_schedule_msg(port->mpu_td, flags);
508 #ifdef SMP
509         } else {
510 #ifdef INVARIANTS
511             msg->ms_flags |= MSGF_INTRANSIT;
512 #endif
513             msg->ms_flags |= MSGF_REPLY;
514             lwkt_send_ipiq(port->mpu_td->td_gd,
515                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
516         }
517 #endif
518     } else {
519         /*
520          * If an asynchronous completion has been requested the message
521          * must be queued to the reply port.
522          *
523          * A critical section is required to interlock the port queue.
524          */
525 #ifdef SMP
526         if (port->mpu_td->td_gd == mycpu) {
527 #endif
528             crit_enter();
529             _lwkt_enqueue_reply(port, msg);
530             if (port->mp_flags & MSGPORTF_WAITING)
531                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
532             crit_exit();
533 #ifdef SMP
534         } else {
535 #ifdef INVARIANTS
536             msg->ms_flags |= MSGF_INTRANSIT;
537 #endif
538             msg->ms_flags |= MSGF_REPLY;
539             lwkt_send_ipiq(port->mpu_td->td_gd,
540                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
541         }
542 #endif
543     }
544 }
545
546 /*
547  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
548  *
549  * This function could _only_ be used when caller is in the same thread
550  * as the message's target port owner thread.
551  */
552 static void
553 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
554 {
555     KASSERT(port->mpu_td == curthread,
556             ("message could only be dropped in the same thread "
557              "as the message target port thread\n"));
558     crit_enter_quick(port->mpu_td);
559     _lwkt_pullmsg(port, msg);
560     msg->ms_flags |= MSGF_DONE;
561     crit_exit_quick(port->mpu_td);
562 }
563
564 /*
565  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
566  *
567  * Called with the target port as an argument but in the context of the
568  * reply port.  This function always implements an asynchronous put to
569  * the target message port, and thus returns EASYNC.
570  *
571  * The message must already have cleared MSGF_DONE and MSGF_REPLY
572  */
573
574 #ifdef SMP
575
576 static
577 void
578 lwkt_thread_putport_remote(lwkt_msg_t msg)
579 {
580     lwkt_port_t port = msg->ms_target_port;
581
582     /*
583      * Chase any thread migration that occurs
584      */
585     if (port->mpu_td->td_gd != mycpu) {
586         lwkt_send_ipiq(port->mpu_td->td_gd,
587                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
588         return;
589     }
590
591     /*
592      * Cleanup
593      */
594 #ifdef INVARIANTS
595     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
596     msg->ms_flags &= ~MSGF_INTRANSIT;
597 #endif
598     _lwkt_pushmsg(port, msg);
599     if (port->mp_flags & MSGPORTF_WAITING)
600         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
601 }
602
603 #endif
604
605 static
606 int
607 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
608 {
609     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
610
611     msg->ms_target_port = port;
612 #ifdef SMP
613     if (port->mpu_td->td_gd == mycpu) {
614 #endif
615         crit_enter();
616         _lwkt_pushmsg(port, msg);
617         if (port->mp_flags & MSGPORTF_WAITING)
618             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
619         crit_exit();
620 #ifdef SMP
621     } else {
622 #ifdef INVARIANTS
623         msg->ms_flags |= MSGF_INTRANSIT;
624 #endif
625         lwkt_send_ipiq(port->mpu_td->td_gd,
626                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
627     }
628 #endif
629     return (EASYNC);
630 }
631
632 /*
633  * lwkt_thread_getport()
634  *
635  *      Retrieve the next message from the port or NULL if no messages
636  *      are ready.
637  */
638 static
639 void *
640 lwkt_thread_getport(lwkt_port_t port)
641 {
642     lwkt_msg_t msg;
643
644     KKASSERT(port->mpu_td == curthread);
645
646     crit_enter_quick(port->mpu_td);
647     if ((msg = _lwkt_pollmsg(port)) != NULL)
648         _lwkt_pullmsg(port, msg);
649     crit_exit_quick(port->mpu_td);
650     return(msg);
651 }
652
653 /*
654  * lwkt_thread_waitmsg()
655  *
656  *      Wait for a particular message to be replied.  We must be the only
657  *      thread waiting on the message.  The port must be owned by the
658  *      caller.
659  */
660 static
661 int
662 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
663 {
664     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
665             ("can't wait dropable message\n"));
666
667     if ((msg->ms_flags & MSGF_DONE) == 0) {
668         /*
669          * If the done bit was not set we have to block until it is.
670          */
671         lwkt_port_t port = msg->ms_reply_port;
672         thread_t td = curthread;
673         int sentabort;
674
675         KKASSERT(port->mpu_td == td);
676         crit_enter_quick(td);
677         sentabort = 0;
678
679         while ((msg->ms_flags & MSGF_DONE) == 0) {
680             port->mp_flags |= MSGPORTF_WAITING;
681             if (sentabort == 0) {
682                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
683                     lwkt_abortmsg(msg);
684                 }
685             } else {
686                 lwkt_sleep("waitabt", 0);
687             }
688             port->mp_flags &= ~MSGPORTF_WAITING;
689         }
690         if (msg->ms_flags & MSGF_QUEUED)
691             _lwkt_pullmsg(port, msg);
692         crit_exit_quick(td);
693     } else {
694         /*
695          * If the done bit was set we only have to mess around with the
696          * message if it is queued on the reply port.
697          */
698         if (msg->ms_flags & MSGF_QUEUED) {
699             lwkt_port_t port = msg->ms_reply_port;
700             thread_t td = curthread;
701
702             KKASSERT(port->mpu_td == td);
703             crit_enter_quick(td);
704             _lwkt_pullmsg(port, msg);
705             crit_exit_quick(td);
706         }
707     }
708     return(msg->ms_error);
709 }
710
711 static
712 void *
713 lwkt_thread_waitport(lwkt_port_t port, int flags)
714 {
715     thread_t td = curthread;
716     lwkt_msg_t msg;
717     int error;
718
719     KKASSERT(port->mpu_td == td);
720     crit_enter_quick(td);
721     while ((msg = _lwkt_pollmsg(port)) == NULL) {
722         port->mp_flags |= MSGPORTF_WAITING;
723         error = lwkt_sleep("waitport", flags);
724         port->mp_flags &= ~MSGPORTF_WAITING;
725         if (error)
726             goto done;
727     }
728     _lwkt_pullmsg(port, msg);
729 done:
730     crit_exit_quick(td);
731     return(msg);
732 }
733
734 /************************************************************************
735  *                         SPIN PORT BACKEND                            *
736  ************************************************************************
737  *
738  * This backend uses spinlocks instead of making assumptions about which
739  * thread is accessing the port.  It must be used when a port is not owned
740  * by a particular thread.  This is less optimal then thread ports but
741  * you don't have a choice if there are multiple threads accessing the port.
742  *
743  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
744  * on the message port, it is the responsibility of the code doing the
745  * wakeup to clear this flag rather then the blocked threads.  Some
746  * superfluous wakeups may occur, which is ok.
747  *
748  * XXX synchronous message wakeups are not current optimized.
749  */
750
751 static
752 void *
753 lwkt_spin_getport(lwkt_port_t port)
754 {
755     lwkt_msg_t msg;
756
757     spin_lock(&port->mpu_spin);
758     if ((msg = _lwkt_pollmsg(port)) != NULL)
759         _lwkt_pullmsg(port, msg);
760     spin_unlock(&port->mpu_spin);
761     return(msg);
762 }
763
764 static
765 int
766 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
767 {
768     int dowakeup;
769
770     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
771
772     msg->ms_target_port = port;
773     spin_lock(&port->mpu_spin);
774     _lwkt_pushmsg(port, msg);
775     dowakeup = 0;
776     if (port->mp_flags & MSGPORTF_WAITING) {
777         port->mp_flags &= ~MSGPORTF_WAITING;
778         dowakeup = 1;
779     }
780     spin_unlock(&port->mpu_spin);
781     if (dowakeup)
782         wakeup(port);
783     return (EASYNC);
784 }
785
786 static
787 int
788 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
789 {
790     lwkt_port_t port;
791     int sentabort;
792     int error;
793
794     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
795             ("can't wait dropable message\n"));
796
797     if ((msg->ms_flags & MSGF_DONE) == 0) {
798         port = msg->ms_reply_port;
799         sentabort = 0;
800         spin_lock(&port->mpu_spin);
801         while ((msg->ms_flags & MSGF_DONE) == 0) {
802             void *won;
803
804             /*
805              * If message was sent synchronously from the beginning
806              * the wakeup will be on the message structure, else it
807              * will be on the port structure.
808              */
809             if (msg->ms_flags & MSGF_SYNC) {
810                 won = msg;
811                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
812             } else {
813                 won = port;
814                 port->mp_flags |= MSGPORTF_WAITING;
815             }
816
817             /*
818              * Only messages which support abort can be interrupted.
819              * We must still wait for message completion regardless.
820              */
821             if ((flags & PCATCH) && sentabort == 0) {
822                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
823                 if (error) {
824                     sentabort = error;
825                     spin_unlock(&port->mpu_spin);
826                     lwkt_abortmsg(msg);
827                     spin_lock(&port->mpu_spin);
828                 }
829             } else {
830                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
831             }
832             /* see note at the top on the MSGPORTF_WAITING flag */
833         }
834         /*
835          * Turn EINTR into ERESTART if the signal indicates.
836          */
837         if (sentabort && msg->ms_error == EINTR)
838             msg->ms_error = sentabort;
839         if (msg->ms_flags & MSGF_QUEUED)
840             _lwkt_pullmsg(port, msg);
841         spin_unlock(&port->mpu_spin);
842     } else {
843         if (msg->ms_flags & MSGF_QUEUED) {
844             port = msg->ms_reply_port;
845             spin_lock(&port->mpu_spin);
846             _lwkt_pullmsg(port, msg);
847             spin_unlock(&port->mpu_spin);
848         }
849     }
850     return(msg->ms_error);
851 }
852
853 static
854 void *
855 lwkt_spin_waitport(lwkt_port_t port, int flags)
856 {
857     lwkt_msg_t msg;
858     int error;
859
860     spin_lock(&port->mpu_spin);
861     while ((msg = _lwkt_pollmsg(port)) == NULL) {
862         port->mp_flags |= MSGPORTF_WAITING;
863         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
864         /* see note at the top on the MSGPORTF_WAITING flag */
865         if (error) {
866             spin_unlock(&port->mpu_spin);
867             return(NULL);
868         }
869     }
870     _lwkt_pullmsg(port, msg);
871     spin_unlock(&port->mpu_spin);
872     return(msg);
873 }
874
875 static
876 void
877 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
878 {
879     int dowakeup;
880
881     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
882
883     if (msg->ms_flags & MSGF_SYNC) {
884         /*
885          * If a synchronous completion has been requested, just wakeup
886          * the message without bothering to queue it to the target port.
887          */
888         spin_lock(&port->mpu_spin);
889         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
890         dowakeup = 0;
891         if (msg->ms_flags & MSGF_WAITING) {
892                 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
893                 dowakeup = 1;
894         }
895         spin_unlock(&port->mpu_spin);
896         if (dowakeup)
897                 wakeup(msg);
898     } else {
899         /*
900          * If an asynchronous completion has been requested the message
901          * must be queued to the reply port.
902          */
903         spin_lock(&port->mpu_spin);
904         _lwkt_enqueue_reply(port, msg);
905         dowakeup = 0;
906         if (port->mp_flags & MSGPORTF_WAITING) {
907             port->mp_flags &= ~MSGPORTF_WAITING;
908             dowakeup = 1;
909         }
910         spin_unlock(&port->mpu_spin);
911         if (dowakeup)
912             wakeup(port);
913     }
914 }
915
916 /*
917  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
918  *
919  * This function could _only_ be used when caller is in the same thread
920  * as the message's target port owner thread.
921  */
922 static void
923 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
924 {
925     KASSERT(port->mpu_td == curthread,
926             ("message could only be dropped in the same thread "
927              "as the message target port thread\n"));
928     spin_lock(&port->mpu_spin);
929     _lwkt_pullmsg(port, msg);
930     msg->ms_flags |= MSGF_DONE;
931     spin_unlock(&port->mpu_spin);
932 }
933
934 /************************************************************************
935  *                        SERIALIZER PORT BACKEND                       *
936  ************************************************************************
937  *
938  * This backend uses serializer to protect port accessing.  Callers are
939  * assumed to have serializer held.  This kind of port is usually created
940  * by network device driver along with _one_ lwkt thread to pipeline
941  * operations which may temporarily release serializer.
942  *
943  * Implementation is based on SPIN PORT BACKEND.
944  */
945
946 static
947 void *
948 lwkt_serialize_getport(lwkt_port_t port)
949 {
950     lwkt_msg_t msg;
951
952     ASSERT_SERIALIZED(port->mpu_serialize);
953
954     if ((msg = _lwkt_pollmsg(port)) != NULL)
955         _lwkt_pullmsg(port, msg);
956     return(msg);
957 }
958
959 static
960 int
961 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
962 {
963     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
964     ASSERT_SERIALIZED(port->mpu_serialize);
965
966     msg->ms_target_port = port;
967     _lwkt_pushmsg(port, msg);
968     if (port->mp_flags & MSGPORTF_WAITING) {
969         port->mp_flags &= ~MSGPORTF_WAITING;
970         wakeup(port);
971     }
972     return (EASYNC);
973 }
974
975 static
976 int
977 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
978 {
979     lwkt_port_t port;
980     int sentabort;
981     int error;
982
983     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
984             ("can't wait dropable message\n"));
985
986     if ((msg->ms_flags & MSGF_DONE) == 0) {
987         port = msg->ms_reply_port;
988
989         ASSERT_SERIALIZED(port->mpu_serialize);
990
991         sentabort = 0;
992         while ((msg->ms_flags & MSGF_DONE) == 0) {
993             void *won;
994
995             /*
996              * If message was sent synchronously from the beginning
997              * the wakeup will be on the message structure, else it
998              * will be on the port structure.
999              */
1000             if (msg->ms_flags & MSGF_SYNC) {
1001                 won = msg;
1002             } else {
1003                 won = port;
1004                 port->mp_flags |= MSGPORTF_WAITING;
1005             }
1006
1007             /*
1008              * Only messages which support abort can be interrupted.
1009              * We must still wait for message completion regardless.
1010              */
1011             if ((flags & PCATCH) && sentabort == 0) {
1012                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1013                 if (error) {
1014                     sentabort = error;
1015                     lwkt_serialize_exit(port->mpu_serialize);
1016                     lwkt_abortmsg(msg);
1017                     lwkt_serialize_enter(port->mpu_serialize);
1018                 }
1019             } else {
1020                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1021             }
1022             /* see note at the top on the MSGPORTF_WAITING flag */
1023         }
1024         /*
1025          * Turn EINTR into ERESTART if the signal indicates.
1026          */
1027         if (sentabort && msg->ms_error == EINTR)
1028             msg->ms_error = sentabort;
1029         if (msg->ms_flags & MSGF_QUEUED)
1030             _lwkt_pullmsg(port, msg);
1031     } else {
1032         if (msg->ms_flags & MSGF_QUEUED) {
1033             port = msg->ms_reply_port;
1034
1035             ASSERT_SERIALIZED(port->mpu_serialize);
1036             _lwkt_pullmsg(port, msg);
1037         }
1038     }
1039     return(msg->ms_error);
1040 }
1041
1042 static
1043 void *
1044 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1045 {
1046     lwkt_msg_t msg;
1047     int error;
1048
1049     ASSERT_SERIALIZED(port->mpu_serialize);
1050
1051     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1052         port->mp_flags |= MSGPORTF_WAITING;
1053         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1054         /* see note at the top on the MSGPORTF_WAITING flag */
1055         if (error)
1056             return(NULL);
1057     }
1058     _lwkt_pullmsg(port, msg);
1059     return(msg);
1060 }
1061
1062 static
1063 void
1064 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1065 {
1066     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1067     ASSERT_SERIALIZED(port->mpu_serialize);
1068
1069     if (msg->ms_flags & MSGF_SYNC) {
1070         /*
1071          * If a synchronous completion has been requested, just wakeup
1072          * the message without bothering to queue it to the target port.
1073          */
1074         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1075         wakeup(msg);
1076     } else {
1077         /*
1078          * If an asynchronous completion has been requested the message
1079          * must be queued to the reply port.
1080          */
1081         _lwkt_enqueue_reply(port, msg);
1082         if (port->mp_flags & MSGPORTF_WAITING) {
1083             port->mp_flags &= ~MSGPORTF_WAITING;
1084             wakeup(port);
1085         }
1086     }
1087 }
1088
1089 /************************************************************************
1090  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1091  ************************************************************************/
1092
1093 /*
1094  * You can point a port's reply vector at this function if you just want
1095  * the message marked done, without any queueing or signaling.  This is
1096  * often used for structure-embedded messages.
1097  */
1098 static
1099 void
1100 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1101 {
1102     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1103 }
1104
1105 static
1106 void *
1107 lwkt_panic_getport(lwkt_port_t port)
1108 {
1109     panic("lwkt_getport() illegal on port %p", port);
1110 }
1111
1112 static
1113 int
1114 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1115 {
1116     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1117 }
1118
1119 static
1120 int
1121 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1122 {
1123     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1124 }
1125
1126 static
1127 void *
1128 lwkt_panic_waitport(lwkt_port_t port, int flags)
1129 {
1130     panic("port %p cannot be waited on", port);
1131 }
1132
1133 static
1134 void
1135 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1136 {
1137     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1138 }
1139
1140 static
1141 void
1142 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1143 {
1144     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1145 }