Initial import of binutils 2.22 on the new vendor branch
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $
38  */
39
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
52
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
62
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
67
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
73
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
76
77 /************************************************************************
78  *                              MESSAGE FUNCTIONS                       *
79  ************************************************************************/
80
81 /*
82  * lwkt_sendmsg()
83  *
84  *      Request asynchronous completion and call lwkt_beginmsg().  The
85  *      target port can opt to execute the message synchronously or
86  *      asynchronously and this function will automatically queue the
87  *      response if the target executes the message synchronously.
88  *
89  *      NOTE: The message is in an indeterminant state until this call
90  *      returns.  The caller should not mess with it (e.g. try to abort it)
91  *      until then.
92  */
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
95 {
96     int error;
97
98     KKASSERT(msg->ms_reply_port != NULL &&
99              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102         lwkt_replymsg(msg, error);
103     }
104 }
105
106 /*
107  * lwkt_domsg()
108  *
109  *      Request asynchronous completion and call lwkt_beginmsg().  The
110  *      target port can opt to execute the message synchronously or
111  *      asynchronously and this function will automatically queue the
112  *      response if the target executes the message synchronously.
113  */
114 int
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
116 {
117     int error;
118
119     KKASSERT(msg->ms_reply_port != NULL &&
120              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122     msg->ms_flags |= MSGF_SYNC;
123     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124         error = lwkt_waitmsg(msg, flags);
125     } else {
126         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
127     }
128     return(error);
129 }
130
131 /*
132  * lwkt_forwardmsg()
133  *
134  * Forward a message received on one port to another port.
135  */
136 int
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
138 {   
139     int error;
140
141     crit_enter();
142     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143     if ((error = port->mp_putport(port, msg)) != EASYNC)
144         lwkt_replymsg(msg, error);
145     crit_exit();
146     return(error);
147 }
148
149 /*
150  * lwkt_abortmsg()
151  *
152  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
153  * The caller must ensure that the message will not be both replied AND
154  * destroyed while the abort is in progress.
155  *
156  * This function issues a callback which might block!
157  */
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
160 {
161     /*
162      * A critical section protects us from reply IPIs on this cpu.
163      */
164     crit_enter();
165
166     /*
167      * Shortcut the operation if the message has already been returned.
168      * The callback typically constructs a lwkt_msg with the abort request,
169      * issues it synchronously, and waits for completion.  The callback
170      * is not required to actually abort the message and the target port,
171      * upon receiving an abort request message generated by the callback
172      * should check whether the original message has already completed or
173      * not.
174      */
175     if (msg->ms_flags & MSGF_ABORTABLE) {
176         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177             msg->ms_abortfn(msg);
178     }
179     crit_exit();
180 }
181
182 /************************************************************************
183  *                      PORT INITIALIZATION API                         *
184  ************************************************************************/
185
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
192
193 static void *lwkt_spin_getport(lwkt_port_t port);
194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198
199 static void *lwkt_serialize_getport(lwkt_port_t port);
200 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
201 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
202 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
203 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
204
205 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
206 static void *lwkt_panic_getport(lwkt_port_t port);
207 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
208 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
209 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
210 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
211 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
212
213 /*
214  * Core port initialization (internal)
215  */
216 static __inline
217 void
218 _lwkt_initport(lwkt_port_t port,
219                void *(*gportfn)(lwkt_port_t),
220                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
221                int (*wmsgfn)(lwkt_msg_t, int),
222                void *(*wportfn)(lwkt_port_t, int),
223                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
224                void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
225 {
226     bzero(port, sizeof(*port));
227     TAILQ_INIT(&port->mp_msgq);
228     TAILQ_INIT(&port->mp_msgq_prio);
229     port->mp_getport = gportfn;
230     port->mp_putport = pportfn;
231     port->mp_waitmsg =  wmsgfn;
232     port->mp_waitport =  wportfn;
233     port->mp_replyport = rportfn;
234     port->mp_dropmsg = dmsgfn;
235 }
236
237 /*
238  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
239  * we tell the scheduler not to reschedule if td is at a higher priority.
240  *
241  * This routine is called even if the thread is already scheduled.
242  */
243 static __inline
244 void
245 _lwkt_schedule_msg(thread_t td, int flags)
246 {
247     lwkt_schedule(td);
248 }
249
250 /*
251  * lwkt_initport_thread()
252  *
253  *      Initialize a port for use by a particular thread.  The port may
254  *      only be used by <td>.
255  */
256 void
257 lwkt_initport_thread(lwkt_port_t port, thread_t td)
258 {
259     _lwkt_initport(port,
260                    lwkt_thread_getport,
261                    lwkt_thread_putport,
262                    lwkt_thread_waitmsg,
263                    lwkt_thread_waitport,
264                    lwkt_thread_replyport,
265                    lwkt_thread_dropmsg);
266     port->mpu_td = td;
267 }
268
269 /*
270  * lwkt_initport_spin()
271  *
272  *      Initialize a port for use with descriptors that might be accessed
273  *      via multiple LWPs, processes, or threads.  Has somewhat more
274  *      overhead then thread ports.
275  */
276 void
277 lwkt_initport_spin(lwkt_port_t port)
278 {
279     _lwkt_initport(port,
280                    lwkt_spin_getport,
281                    lwkt_spin_putport,
282                    lwkt_spin_waitmsg,
283                    lwkt_spin_waitport,
284                    lwkt_spin_replyport,
285                    lwkt_panic_dropmsg);
286     spin_init(&port->mpu_spin);
287 }
288
289 /*
290  * lwkt_initport_serialize()
291  *
292  *      Initialize a port for use with descriptors that might be accessed
293  *      via multiple LWPs, processes, or threads.  Callers are assumed to
294  *      have held the serializer (slz).
295  */
296 void
297 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
298 {
299     _lwkt_initport(port,
300                    lwkt_serialize_getport,
301                    lwkt_serialize_putport,
302                    lwkt_serialize_waitmsg,
303                    lwkt_serialize_waitport,
304                    lwkt_serialize_replyport,
305                    lwkt_panic_dropmsg);
306     port->mpu_serialize = slz;
307 }
308
309 /*
310  * Similar to the standard initport, this function simply marks the message
311  * as being done and does not attempt to return it to an originating port.
312  */
313 void
314 lwkt_initport_replyonly_null(lwkt_port_t port)
315 {
316     _lwkt_initport(port,
317                    lwkt_panic_getport,
318                    lwkt_panic_putport,
319                    lwkt_panic_waitmsg,
320                    lwkt_panic_waitport,
321                    lwkt_null_replyport,
322                    lwkt_panic_dropmsg);
323 }
324
325 /*
326  * Initialize a reply-only port, typically used as a message sink.  Such
327  * ports can only be used as a reply port.
328  */
329 void
330 lwkt_initport_replyonly(lwkt_port_t port,
331                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
332 {
333     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
334                          lwkt_panic_waitmsg, lwkt_panic_waitport,
335                          rportfn, lwkt_panic_dropmsg);
336 }
337
338 void
339 lwkt_initport_putonly(lwkt_port_t port,
340                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
341 {
342     _lwkt_initport(port, lwkt_panic_getport, pportfn,
343                          lwkt_panic_waitmsg, lwkt_panic_waitport,
344                          lwkt_panic_replyport, lwkt_panic_dropmsg);
345 }
346
347 void
348 lwkt_initport_panic(lwkt_port_t port)
349 {
350     _lwkt_initport(port,
351                    lwkt_panic_getport, lwkt_panic_putport,
352                    lwkt_panic_waitmsg, lwkt_panic_waitport,
353                    lwkt_panic_replyport, lwkt_panic_dropmsg);
354 }
355
356 static __inline
357 void
358 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
359 {
360     lwkt_msg_queue *queue;
361
362     /*
363      * normal case, remove and return the message.
364      */
365     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
366         queue = &port->mp_msgq_prio;
367     else
368         queue = &port->mp_msgq;
369     TAILQ_REMOVE(queue, msg, ms_node);
370
371     /*
372      * atomic op needed for spin ports
373      */
374     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
375 }
376
377 static __inline
378 void
379 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
380 {
381     lwkt_msg_queue *queue;
382
383     /*
384      * atomic op needed for spin ports
385      */
386     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
387     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
388         queue = &port->mp_msgq_prio;
389     else
390         queue = &port->mp_msgq;
391     TAILQ_INSERT_TAIL(queue, msg, ms_node);
392 }
393
394 static __inline
395 lwkt_msg_t
396 _lwkt_pollmsg(lwkt_port_t port)
397 {
398     lwkt_msg_t msg;
399
400     msg = TAILQ_FIRST(&port->mp_msgq_prio);
401     if (__predict_false(msg != NULL))
402         return msg;
403
404     /*
405      * Priority queue has no message, fallback to non-priority queue.
406      */
407     return TAILQ_FIRST(&port->mp_msgq);
408 }
409
410 static __inline
411 void
412 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
413 {
414     /*
415      * atomic op needed for spin ports
416      */
417     _lwkt_pushmsg(port, msg);
418     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
419 }
420
421 /************************************************************************
422  *                      THREAD PORT BACKEND                             *
423  ************************************************************************
424  *
425  * This backend is used when the port a message is retrieved from is owned
426  * by a single thread (the calling thread).  Messages are IPId to the
427  * correct cpu before being enqueued to a port.  Note that this is fairly
428  * optimal since scheduling would have had to do an IPI anyway if the
429  * message were headed to a different cpu.
430  */
431
432 #ifdef SMP
433
434 /*
435  * This function completes reply processing for the default case in the
436  * context of the originating cpu.
437  */
438 static
439 void
440 lwkt_thread_replyport_remote(lwkt_msg_t msg)
441 {
442     lwkt_port_t port = msg->ms_reply_port;
443     int flags;
444
445     /*
446      * Chase any thread migration that occurs
447      */
448     if (port->mpu_td->td_gd != mycpu) {
449         lwkt_send_ipiq(port->mpu_td->td_gd,
450                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
451         return;
452     }
453
454     /*
455      * Cleanup
456      */
457 #ifdef INVARIANTS
458     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
459     msg->ms_flags &= ~MSGF_INTRANSIT;
460 #endif
461     flags = msg->ms_flags;
462     if (msg->ms_flags & MSGF_SYNC) {
463         cpu_sfence();
464         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
465     } else {
466         _lwkt_enqueue_reply(port, msg);
467     }
468     if (port->mp_flags & MSGPORTF_WAITING)
469         _lwkt_schedule_msg(port->mpu_td, flags);
470 }
471
472 #endif
473
474 /*
475  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
476  *
477  * Called with the reply port as an argument but in the context of the
478  * original target port.  Completion must occur on the target port's
479  * cpu.
480  *
481  * The critical section protects us from IPIs on the this CPU.
482  */
483 static
484 void
485 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
486 {
487     int flags;
488
489     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
490
491     if (msg->ms_flags & MSGF_SYNC) {
492         /*
493          * If a synchronous completion has been requested, just wakeup
494          * the message without bothering to queue it to the target port.
495          *
496          * Assume the target thread is non-preemptive, so no critical
497          * section is required.
498          */
499 #ifdef SMP
500         if (port->mpu_td->td_gd == mycpu) {
501 #endif
502             flags = msg->ms_flags;
503             cpu_sfence();
504             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
505             if (port->mp_flags & MSGPORTF_WAITING)
506                 _lwkt_schedule_msg(port->mpu_td, flags);
507 #ifdef SMP
508         } else {
509 #ifdef INVARIANTS
510             msg->ms_flags |= MSGF_INTRANSIT;
511 #endif
512             msg->ms_flags |= MSGF_REPLY;
513             lwkt_send_ipiq(port->mpu_td->td_gd,
514                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
515         }
516 #endif
517     } else {
518         /*
519          * If an asynchronous completion has been requested the message
520          * must be queued to the reply port.
521          *
522          * A critical section is required to interlock the port queue.
523          */
524 #ifdef SMP
525         if (port->mpu_td->td_gd == mycpu) {
526 #endif
527             crit_enter();
528             _lwkt_enqueue_reply(port, msg);
529             if (port->mp_flags & MSGPORTF_WAITING)
530                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
531             crit_exit();
532 #ifdef SMP
533         } else {
534 #ifdef INVARIANTS
535             msg->ms_flags |= MSGF_INTRANSIT;
536 #endif
537             msg->ms_flags |= MSGF_REPLY;
538             lwkt_send_ipiq(port->mpu_td->td_gd,
539                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
540         }
541 #endif
542     }
543 }
544
545 /*
546  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
547  *
548  * This function could _only_ be used when caller is in the same thread
549  * as the message's target port owner thread.
550  */
551 static void
552 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
553 {
554     KASSERT(port->mpu_td == curthread,
555             ("message could only be dropped in the same thread "
556              "as the message target port thread\n"));
557     crit_enter_quick(port->mpu_td);
558     _lwkt_pullmsg(port, msg);
559     msg->ms_flags |= MSGF_DONE;
560     crit_exit_quick(port->mpu_td);
561 }
562
563 /*
564  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
565  *
566  * Called with the target port as an argument but in the context of the
567  * reply port.  This function always implements an asynchronous put to
568  * the target message port, and thus returns EASYNC.
569  *
570  * The message must already have cleared MSGF_DONE and MSGF_REPLY
571  */
572
573 #ifdef SMP
574
575 static
576 void
577 lwkt_thread_putport_remote(lwkt_msg_t msg)
578 {
579     lwkt_port_t port = msg->ms_target_port;
580
581     /*
582      * Chase any thread migration that occurs
583      */
584     if (port->mpu_td->td_gd != mycpu) {
585         lwkt_send_ipiq(port->mpu_td->td_gd,
586                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
587         return;
588     }
589
590     /*
591      * Cleanup
592      */
593 #ifdef INVARIANTS
594     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
595     msg->ms_flags &= ~MSGF_INTRANSIT;
596 #endif
597     _lwkt_pushmsg(port, msg);
598     if (port->mp_flags & MSGPORTF_WAITING)
599         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
600 }
601
602 #endif
603
604 static
605 int
606 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
607 {
608     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
609
610     msg->ms_target_port = port;
611 #ifdef SMP
612     if (port->mpu_td->td_gd == mycpu) {
613 #endif
614         crit_enter();
615         _lwkt_pushmsg(port, msg);
616         if (port->mp_flags & MSGPORTF_WAITING)
617             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
618         crit_exit();
619 #ifdef SMP
620     } else {
621 #ifdef INVARIANTS
622         msg->ms_flags |= MSGF_INTRANSIT;
623 #endif
624         lwkt_send_ipiq(port->mpu_td->td_gd,
625                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
626     }
627 #endif
628     return (EASYNC);
629 }
630
631 /*
632  * lwkt_thread_getport()
633  *
634  *      Retrieve the next message from the port or NULL if no messages
635  *      are ready.
636  */
637 static
638 void *
639 lwkt_thread_getport(lwkt_port_t port)
640 {
641     lwkt_msg_t msg;
642
643     KKASSERT(port->mpu_td == curthread);
644
645     crit_enter_quick(port->mpu_td);
646     if ((msg = _lwkt_pollmsg(port)) != NULL)
647         _lwkt_pullmsg(port, msg);
648     crit_exit_quick(port->mpu_td);
649     return(msg);
650 }
651
652 /*
653  * lwkt_thread_waitmsg()
654  *
655  *      Wait for a particular message to be replied.  We must be the only
656  *      thread waiting on the message.  The port must be owned by the
657  *      caller.
658  */
659 static
660 int
661 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
662 {
663     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
664             ("can't wait dropable message\n"));
665
666     if ((msg->ms_flags & MSGF_DONE) == 0) {
667         /*
668          * If the done bit was not set we have to block until it is.
669          */
670         lwkt_port_t port = msg->ms_reply_port;
671         thread_t td = curthread;
672         int sentabort;
673
674         KKASSERT(port->mpu_td == td);
675         crit_enter_quick(td);
676         sentabort = 0;
677
678         while ((msg->ms_flags & MSGF_DONE) == 0) {
679             port->mp_flags |= MSGPORTF_WAITING;
680             if (sentabort == 0) {
681                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
682                     lwkt_abortmsg(msg);
683                 }
684             } else {
685                 lwkt_sleep("waitabt", 0);
686             }
687             port->mp_flags &= ~MSGPORTF_WAITING;
688         }
689         if (msg->ms_flags & MSGF_QUEUED)
690             _lwkt_pullmsg(port, msg);
691         crit_exit_quick(td);
692     } else {
693         /*
694          * If the done bit was set we only have to mess around with the
695          * message if it is queued on the reply port.
696          */
697         if (msg->ms_flags & MSGF_QUEUED) {
698             lwkt_port_t port = msg->ms_reply_port;
699             thread_t td = curthread;
700
701             KKASSERT(port->mpu_td == td);
702             crit_enter_quick(td);
703             _lwkt_pullmsg(port, msg);
704             crit_exit_quick(td);
705         }
706     }
707     return(msg->ms_error);
708 }
709
710 static
711 void *
712 lwkt_thread_waitport(lwkt_port_t port, int flags)
713 {
714     thread_t td = curthread;
715     lwkt_msg_t msg;
716     int error;
717
718     KKASSERT(port->mpu_td == td);
719     crit_enter_quick(td);
720     while ((msg = _lwkt_pollmsg(port)) == NULL) {
721         port->mp_flags |= MSGPORTF_WAITING;
722         error = lwkt_sleep("waitport", flags);
723         port->mp_flags &= ~MSGPORTF_WAITING;
724         if (error)
725             goto done;
726     }
727     _lwkt_pullmsg(port, msg);
728 done:
729     crit_exit_quick(td);
730     return(msg);
731 }
732
733 /************************************************************************
734  *                         SPIN PORT BACKEND                            *
735  ************************************************************************
736  *
737  * This backend uses spinlocks instead of making assumptions about which
738  * thread is accessing the port.  It must be used when a port is not owned
739  * by a particular thread.  This is less optimal then thread ports but
740  * you don't have a choice if there are multiple threads accessing the port.
741  *
742  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
743  * on the message port, it is the responsibility of the code doing the
744  * wakeup to clear this flag rather then the blocked threads.  Some
745  * superfluous wakeups may occur, which is ok.
746  *
747  * XXX synchronous message wakeups are not current optimized.
748  */
749
750 static
751 void *
752 lwkt_spin_getport(lwkt_port_t port)
753 {
754     lwkt_msg_t msg;
755
756     spin_lock(&port->mpu_spin);
757     if ((msg = _lwkt_pollmsg(port)) != NULL)
758         _lwkt_pullmsg(port, msg);
759     spin_unlock(&port->mpu_spin);
760     return(msg);
761 }
762
763 static
764 int
765 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
766 {
767     int dowakeup;
768
769     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
770
771     msg->ms_target_port = port;
772     spin_lock(&port->mpu_spin);
773     _lwkt_pushmsg(port, msg);
774     dowakeup = 0;
775     if (port->mp_flags & MSGPORTF_WAITING) {
776         port->mp_flags &= ~MSGPORTF_WAITING;
777         dowakeup = 1;
778     }
779     spin_unlock(&port->mpu_spin);
780     if (dowakeup)
781         wakeup(port);
782     return (EASYNC);
783 }
784
785 static
786 int
787 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
788 {
789     lwkt_port_t port;
790     int sentabort;
791     int error;
792
793     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
794             ("can't wait dropable message\n"));
795
796     if ((msg->ms_flags & MSGF_DONE) == 0) {
797         port = msg->ms_reply_port;
798         sentabort = 0;
799         spin_lock(&port->mpu_spin);
800         while ((msg->ms_flags & MSGF_DONE) == 0) {
801             void *won;
802
803             /*
804              * If message was sent synchronously from the beginning
805              * the wakeup will be on the message structure, else it
806              * will be on the port structure.
807              */
808             if (msg->ms_flags & MSGF_SYNC) {
809                 won = msg;
810                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
811             } else {
812                 won = port;
813                 port->mp_flags |= MSGPORTF_WAITING;
814             }
815
816             /*
817              * Only messages which support abort can be interrupted.
818              * We must still wait for message completion regardless.
819              */
820             if ((flags & PCATCH) && sentabort == 0) {
821                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
822                 if (error) {
823                     sentabort = error;
824                     spin_unlock(&port->mpu_spin);
825                     lwkt_abortmsg(msg);
826                     spin_lock(&port->mpu_spin);
827                 }
828             } else {
829                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
830             }
831             /* see note at the top on the MSGPORTF_WAITING flag */
832         }
833         /*
834          * Turn EINTR into ERESTART if the signal indicates.
835          */
836         if (sentabort && msg->ms_error == EINTR)
837             msg->ms_error = sentabort;
838         if (msg->ms_flags & MSGF_QUEUED)
839             _lwkt_pullmsg(port, msg);
840         spin_unlock(&port->mpu_spin);
841     } else {
842         if (msg->ms_flags & MSGF_QUEUED) {
843             port = msg->ms_reply_port;
844             spin_lock(&port->mpu_spin);
845             _lwkt_pullmsg(port, msg);
846             spin_unlock(&port->mpu_spin);
847         }
848     }
849     return(msg->ms_error);
850 }
851
852 static
853 void *
854 lwkt_spin_waitport(lwkt_port_t port, int flags)
855 {
856     lwkt_msg_t msg;
857     int error;
858
859     spin_lock(&port->mpu_spin);
860     while ((msg = _lwkt_pollmsg(port)) == NULL) {
861         port->mp_flags |= MSGPORTF_WAITING;
862         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
863         /* see note at the top on the MSGPORTF_WAITING flag */
864         if (error) {
865             spin_unlock(&port->mpu_spin);
866             return(NULL);
867         }
868     }
869     _lwkt_pullmsg(port, msg);
870     spin_unlock(&port->mpu_spin);
871     return(msg);
872 }
873
874 static
875 void
876 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
877 {
878     int dowakeup;
879
880     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
881
882     if (msg->ms_flags & MSGF_SYNC) {
883         /*
884          * If a synchronous completion has been requested, just wakeup
885          * the message without bothering to queue it to the target port.
886          */
887         spin_lock(&port->mpu_spin);
888         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
889         dowakeup = 0;
890         if (msg->ms_flags & MSGF_WAITING) {
891                 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
892                 dowakeup = 1;
893         }
894         spin_unlock(&port->mpu_spin);
895         if (dowakeup)
896                 wakeup(msg);
897     } else {
898         /*
899          * If an asynchronous completion has been requested the message
900          * must be queued to the reply port.
901          */
902         spin_lock(&port->mpu_spin);
903         _lwkt_enqueue_reply(port, msg);
904         dowakeup = 0;
905         if (port->mp_flags & MSGPORTF_WAITING) {
906             port->mp_flags &= ~MSGPORTF_WAITING;
907             dowakeup = 1;
908         }
909         spin_unlock(&port->mpu_spin);
910         if (dowakeup)
911             wakeup(port);
912     }
913 }
914
915 /************************************************************************
916  *                        SERIALIZER PORT BACKEND                       *
917  ************************************************************************
918  *
919  * This backend uses serializer to protect port accessing.  Callers are
920  * assumed to have serializer held.  This kind of port is usually created
921  * by network device driver along with _one_ lwkt thread to pipeline
922  * operations which may temporarily release serializer.
923  *
924  * Implementation is based on SPIN PORT BACKEND.
925  */
926
927 static
928 void *
929 lwkt_serialize_getport(lwkt_port_t port)
930 {
931     lwkt_msg_t msg;
932
933     ASSERT_SERIALIZED(port->mpu_serialize);
934
935     if ((msg = _lwkt_pollmsg(port)) != NULL)
936         _lwkt_pullmsg(port, msg);
937     return(msg);
938 }
939
940 static
941 int
942 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
943 {
944     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
945     ASSERT_SERIALIZED(port->mpu_serialize);
946
947     msg->ms_target_port = port;
948     _lwkt_pushmsg(port, msg);
949     if (port->mp_flags & MSGPORTF_WAITING) {
950         port->mp_flags &= ~MSGPORTF_WAITING;
951         wakeup(port);
952     }
953     return (EASYNC);
954 }
955
956 static
957 int
958 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
959 {
960     lwkt_port_t port;
961     int sentabort;
962     int error;
963
964     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
965             ("can't wait dropable message\n"));
966
967     if ((msg->ms_flags & MSGF_DONE) == 0) {
968         port = msg->ms_reply_port;
969
970         ASSERT_SERIALIZED(port->mpu_serialize);
971
972         sentabort = 0;
973         while ((msg->ms_flags & MSGF_DONE) == 0) {
974             void *won;
975
976             /*
977              * If message was sent synchronously from the beginning
978              * the wakeup will be on the message structure, else it
979              * will be on the port structure.
980              */
981             if (msg->ms_flags & MSGF_SYNC) {
982                 won = msg;
983             } else {
984                 won = port;
985                 port->mp_flags |= MSGPORTF_WAITING;
986             }
987
988             /*
989              * Only messages which support abort can be interrupted.
990              * We must still wait for message completion regardless.
991              */
992             if ((flags & PCATCH) && sentabort == 0) {
993                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
994                 if (error) {
995                     sentabort = error;
996                     lwkt_serialize_exit(port->mpu_serialize);
997                     lwkt_abortmsg(msg);
998                     lwkt_serialize_enter(port->mpu_serialize);
999                 }
1000             } else {
1001                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1002             }
1003             /* see note at the top on the MSGPORTF_WAITING flag */
1004         }
1005         /*
1006          * Turn EINTR into ERESTART if the signal indicates.
1007          */
1008         if (sentabort && msg->ms_error == EINTR)
1009             msg->ms_error = sentabort;
1010         if (msg->ms_flags & MSGF_QUEUED)
1011             _lwkt_pullmsg(port, msg);
1012     } else {
1013         if (msg->ms_flags & MSGF_QUEUED) {
1014             port = msg->ms_reply_port;
1015
1016             ASSERT_SERIALIZED(port->mpu_serialize);
1017             _lwkt_pullmsg(port, msg);
1018         }
1019     }
1020     return(msg->ms_error);
1021 }
1022
1023 static
1024 void *
1025 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1026 {
1027     lwkt_msg_t msg;
1028     int error;
1029
1030     ASSERT_SERIALIZED(port->mpu_serialize);
1031
1032     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1033         port->mp_flags |= MSGPORTF_WAITING;
1034         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1035         /* see note at the top on the MSGPORTF_WAITING flag */
1036         if (error)
1037             return(NULL);
1038     }
1039     _lwkt_pullmsg(port, msg);
1040     return(msg);
1041 }
1042
1043 static
1044 void
1045 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1046 {
1047     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1048     ASSERT_SERIALIZED(port->mpu_serialize);
1049
1050     if (msg->ms_flags & MSGF_SYNC) {
1051         /*
1052          * If a synchronous completion has been requested, just wakeup
1053          * the message without bothering to queue it to the target port.
1054          */
1055         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1056         wakeup(msg);
1057     } else {
1058         /*
1059          * If an asynchronous completion has been requested the message
1060          * must be queued to the reply port.
1061          */
1062         _lwkt_enqueue_reply(port, msg);
1063         if (port->mp_flags & MSGPORTF_WAITING) {
1064             port->mp_flags &= ~MSGPORTF_WAITING;
1065             wakeup(port);
1066         }
1067     }
1068 }
1069
1070 /************************************************************************
1071  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1072  ************************************************************************/
1073
1074 /*
1075  * You can point a port's reply vector at this function if you just want
1076  * the message marked done, without any queueing or signaling.  This is
1077  * often used for structure-embedded messages.
1078  */
1079 static
1080 void
1081 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1082 {
1083     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1084 }
1085
1086 static
1087 void *
1088 lwkt_panic_getport(lwkt_port_t port)
1089 {
1090     panic("lwkt_getport() illegal on port %p", port);
1091 }
1092
1093 static
1094 int
1095 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1096 {
1097     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1098 }
1099
1100 static
1101 int
1102 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1103 {
1104     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1105 }
1106
1107 static
1108 void *
1109 lwkt_panic_waitport(lwkt_port_t port, int flags)
1110 {
1111     panic("port %p cannot be waited on", port);
1112 }
1113
1114 static
1115 void
1116 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1117 {
1118     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1119 }
1120
1121 static
1122 void
1123 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1124 {
1125     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1126 }