Merge branch 'vendor/OPENSSH'
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $
38  */
39
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
52
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
62
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
67
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
73
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
76
77 /************************************************************************
78  *                              MESSAGE FUNCTIONS                       *
79  ************************************************************************/
80
81 /*
82  * lwkt_sendmsg()
83  *
84  *      Request asynchronous completion and call lwkt_beginmsg().  The
85  *      target port can opt to execute the message synchronously or
86  *      asynchronously and this function will automatically queue the
87  *      response if the target executes the message synchronously.
88  *
89  *      NOTE: The message is in an indeterminant state until this call
90  *      returns.  The caller should not mess with it (e.g. try to abort it)
91  *      until then.
92  */
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
95 {
96     int error;
97
98     KKASSERT(msg->ms_reply_port != NULL &&
99              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102         lwkt_replymsg(msg, error);
103     }
104 }
105
106 /*
107  * lwkt_domsg()
108  *
109  *      Request asynchronous completion and call lwkt_beginmsg().  The
110  *      target port can opt to execute the message synchronously or
111  *      asynchronously and this function will automatically queue the
112  *      response if the target executes the message synchronously.
113  */
114 int
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
116 {
117     int error;
118
119     KKASSERT(msg->ms_reply_port != NULL &&
120              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122     msg->ms_flags |= MSGF_SYNC;
123     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124         error = lwkt_waitmsg(msg, flags);
125     } else {
126         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
127     }
128     return(error);
129 }
130
131 /*
132  * lwkt_forwardmsg()
133  *
134  * Forward a message received on one port to another port.
135  */
136 int
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
138 {   
139     int error;
140
141     crit_enter();
142     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143     if ((error = port->mp_putport(port, msg)) != EASYNC)
144         lwkt_replymsg(msg, error);
145     crit_exit();
146     return(error);
147 }
148
149 /*
150  * lwkt_abortmsg()
151  *
152  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
153  * The caller must ensure that the message will not be both replied AND
154  * destroyed while the abort is in progress.
155  *
156  * This function issues a callback which might block!
157  */
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
160 {
161     /*
162      * A critical section protects us from reply IPIs on this cpu.
163      */
164     crit_enter();
165
166     /*
167      * Shortcut the operation if the message has already been returned.
168      * The callback typically constructs a lwkt_msg with the abort request,
169      * issues it synchronously, and waits for completion.  The callback
170      * is not required to actually abort the message and the target port,
171      * upon receiving an abort request message generated by the callback
172      * should check whether the original message has already completed or
173      * not.
174      */
175     if (msg->ms_flags & MSGF_ABORTABLE) {
176         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177             msg->ms_abortfn(msg);
178     }
179     crit_exit();
180 }
181
182 /************************************************************************
183  *                      PORT INITIALIZATION API                         *
184  ************************************************************************/
185
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
192
193 static void *lwkt_spin_getport(lwkt_port_t port);
194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198
199 static void *lwkt_serialize_getport(lwkt_port_t port);
200 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
201 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
202 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
203 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
204
205 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
206 static void *lwkt_panic_getport(lwkt_port_t port);
207 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
208 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
209 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
210 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
211 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
212
213 /*
214  * Core port initialization (internal)
215  */
216 static __inline
217 void
218 _lwkt_initport(lwkt_port_t port,
219                void *(*gportfn)(lwkt_port_t),
220                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
221                int (*wmsgfn)(lwkt_msg_t, int),
222                void *(*wportfn)(lwkt_port_t, int),
223                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
224                void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
225 {
226     bzero(port, sizeof(*port));
227     TAILQ_INIT(&port->mp_msgq);
228     TAILQ_INIT(&port->mp_msgq_prio);
229     port->mp_getport = gportfn;
230     port->mp_putport = pportfn;
231     port->mp_waitmsg =  wmsgfn;
232     port->mp_waitport =  wportfn;
233     port->mp_replyport = rportfn;
234     port->mp_dropmsg = dmsgfn;
235 }
236
237 /*
238  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
239  * we tell the scheduler not to reschedule if td is at a higher priority.
240  *
241  * This routine is called even if the thread is already scheduled so messages
242  * without NORESCHED will cause the target thread to be rescheduled even if
243  * prior messages did not.
244  */
245 static __inline
246 void
247 _lwkt_schedule_msg(thread_t td, int flags)
248 {
249     if (flags & MSGF_NORESCHED)
250         lwkt_schedule_noresched(td);
251     else
252         lwkt_schedule(td);
253 }
254
255 /*
256  * lwkt_initport_thread()
257  *
258  *      Initialize a port for use by a particular thread.  The port may
259  *      only be used by <td>.
260  */
261 void
262 lwkt_initport_thread(lwkt_port_t port, thread_t td)
263 {
264     _lwkt_initport(port,
265                    lwkt_thread_getport,
266                    lwkt_thread_putport,
267                    lwkt_thread_waitmsg,
268                    lwkt_thread_waitport,
269                    lwkt_thread_replyport,
270                    lwkt_thread_dropmsg);
271     port->mpu_td = td;
272 }
273
274 /*
275  * lwkt_initport_spin()
276  *
277  *      Initialize a port for use with descriptors that might be accessed
278  *      via multiple LWPs, processes, or threads.  Has somewhat more
279  *      overhead then thread ports.
280  */
281 void
282 lwkt_initport_spin(lwkt_port_t port)
283 {
284     _lwkt_initport(port,
285                    lwkt_spin_getport,
286                    lwkt_spin_putport,
287                    lwkt_spin_waitmsg,
288                    lwkt_spin_waitport,
289                    lwkt_spin_replyport,
290                    lwkt_panic_dropmsg);
291     spin_init(&port->mpu_spin);
292 }
293
294 /*
295  * lwkt_initport_serialize()
296  *
297  *      Initialize a port for use with descriptors that might be accessed
298  *      via multiple LWPs, processes, or threads.  Callers are assumed to
299  *      have held the serializer (slz).
300  */
301 void
302 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
303 {
304     _lwkt_initport(port,
305                    lwkt_serialize_getport,
306                    lwkt_serialize_putport,
307                    lwkt_serialize_waitmsg,
308                    lwkt_serialize_waitport,
309                    lwkt_serialize_replyport,
310                    lwkt_panic_dropmsg);
311     port->mpu_serialize = slz;
312 }
313
314 /*
315  * Similar to the standard initport, this function simply marks the message
316  * as being done and does not attempt to return it to an originating port.
317  */
318 void
319 lwkt_initport_replyonly_null(lwkt_port_t port)
320 {
321     _lwkt_initport(port,
322                    lwkt_panic_getport,
323                    lwkt_panic_putport,
324                    lwkt_panic_waitmsg,
325                    lwkt_panic_waitport,
326                    lwkt_null_replyport,
327                    lwkt_panic_dropmsg);
328 }
329
330 /*
331  * Initialize a reply-only port, typically used as a message sink.  Such
332  * ports can only be used as a reply port.
333  */
334 void
335 lwkt_initport_replyonly(lwkt_port_t port,
336                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
337 {
338     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
339                          lwkt_panic_waitmsg, lwkt_panic_waitport,
340                          rportfn, lwkt_panic_dropmsg);
341 }
342
343 void
344 lwkt_initport_putonly(lwkt_port_t port,
345                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
346 {
347     _lwkt_initport(port, lwkt_panic_getport, pportfn,
348                          lwkt_panic_waitmsg, lwkt_panic_waitport,
349                          lwkt_panic_replyport, lwkt_panic_dropmsg);
350 }
351
352 void
353 lwkt_initport_panic(lwkt_port_t port)
354 {
355     _lwkt_initport(port,
356                    lwkt_panic_getport, lwkt_panic_putport,
357                    lwkt_panic_waitmsg, lwkt_panic_waitport,
358                    lwkt_panic_replyport, lwkt_panic_dropmsg);
359 }
360
361 static __inline
362 void
363 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
364 {
365     lwkt_msg_queue *queue;
366
367     /*
368      * normal case, remove and return the message.
369      */
370     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
371         queue = &port->mp_msgq_prio;
372     else
373         queue = &port->mp_msgq;
374     TAILQ_REMOVE(queue, msg, ms_node);
375     msg->ms_flags &= ~MSGF_QUEUED;
376 }
377
378 static __inline
379 void
380 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
381 {
382     lwkt_msg_queue *queue;
383
384     msg->ms_flags |= MSGF_QUEUED;
385     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
386         queue = &port->mp_msgq_prio;
387     else
388         queue = &port->mp_msgq;
389     TAILQ_INSERT_TAIL(queue, msg, ms_node);
390 }
391
392 static __inline
393 lwkt_msg_t
394 _lwkt_pollmsg(lwkt_port_t port)
395 {
396     lwkt_msg_t msg;
397
398     msg = TAILQ_FIRST(&port->mp_msgq_prio);
399     if (__predict_false(msg != NULL))
400         return msg;
401
402     /*
403      * Priority queue has no message, fallback to non-priority queue.
404      */
405     return TAILQ_FIRST(&port->mp_msgq);
406 }
407
408 static __inline
409 void
410 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
411 {
412     _lwkt_pushmsg(port, msg);
413     msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
414 }
415
416 /************************************************************************
417  *                      THREAD PORT BACKEND                             *
418  ************************************************************************
419  *
420  * This backend is used when the port a message is retrieved from is owned
421  * by a single thread (the calling thread).  Messages are IPId to the
422  * correct cpu before being enqueued to a port.  Note that this is fairly
423  * optimal since scheduling would have had to do an IPI anyway if the
424  * message were headed to a different cpu.
425  */
426
427 #ifdef SMP
428
429 /*
430  * This function completes reply processing for the default case in the
431  * context of the originating cpu.
432  */
433 static
434 void
435 lwkt_thread_replyport_remote(lwkt_msg_t msg)
436 {
437     lwkt_port_t port = msg->ms_reply_port;
438     int flags;
439
440     /*
441      * Chase any thread migration that occurs
442      */
443     if (port->mpu_td->td_gd != mycpu) {
444         lwkt_send_ipiq(port->mpu_td->td_gd,
445                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
446         return;
447     }
448
449     /*
450      * Cleanup
451      */
452 #ifdef INVARIANTS
453     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
454     msg->ms_flags &= ~MSGF_INTRANSIT;
455 #endif
456     flags = msg->ms_flags;
457     if (msg->ms_flags & MSGF_SYNC) {
458         cpu_sfence();
459         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
460     } else {
461         _lwkt_enqueue_reply(port, msg);
462     }
463     if (port->mp_flags & MSGPORTF_WAITING)
464         _lwkt_schedule_msg(port->mpu_td, flags);
465 }
466
467 #endif
468
469 /*
470  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
471  *
472  * Called with the reply port as an argument but in the context of the
473  * original target port.  Completion must occur on the target port's
474  * cpu.
475  *
476  * The critical section protects us from IPIs on the this CPU.
477  */
478 static
479 void
480 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
481 {
482     int flags;
483
484     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
485
486     if (msg->ms_flags & MSGF_SYNC) {
487         /*
488          * If a synchronous completion has been requested, just wakeup
489          * the message without bothering to queue it to the target port.
490          *
491          * Assume the target thread is non-preemptive, so no critical
492          * section is required.
493          */
494 #ifdef SMP
495         if (port->mpu_td->td_gd == mycpu) {
496 #endif
497             flags = msg->ms_flags;
498             cpu_sfence();
499             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
500             if (port->mp_flags & MSGPORTF_WAITING)
501                 _lwkt_schedule_msg(port->mpu_td, flags);
502 #ifdef SMP
503         } else {
504 #ifdef INVARIANTS
505             msg->ms_flags |= MSGF_INTRANSIT;
506 #endif
507             msg->ms_flags |= MSGF_REPLY;
508             lwkt_send_ipiq(port->mpu_td->td_gd,
509                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
510         }
511 #endif
512     } else {
513         /*
514          * If an asynchronous completion has been requested the message
515          * must be queued to the reply port.
516          *
517          * A critical section is required to interlock the port queue.
518          */
519 #ifdef SMP
520         if (port->mpu_td->td_gd == mycpu) {
521 #endif
522             crit_enter();
523             _lwkt_enqueue_reply(port, msg);
524             if (port->mp_flags & MSGPORTF_WAITING)
525                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
526             crit_exit();
527 #ifdef SMP
528         } else {
529 #ifdef INVARIANTS
530             msg->ms_flags |= MSGF_INTRANSIT;
531 #endif
532             msg->ms_flags |= MSGF_REPLY;
533             lwkt_send_ipiq(port->mpu_td->td_gd,
534                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
535         }
536 #endif
537     }
538 }
539
540 /*
541  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
542  *
543  * This function could _only_ be used when caller is in the same thread
544  * as the message's target port owner thread.
545  */
546 static void
547 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
548 {
549     KASSERT(port->mpu_td == curthread,
550             ("message could only be dropped in the same thread "
551              "as the message target port thread\n"));
552     crit_enter_quick(port->mpu_td);
553     _lwkt_pullmsg(port, msg);
554     msg->ms_flags |= MSGF_DONE;
555     crit_exit_quick(port->mpu_td);
556 }
557
558 /*
559  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
560  *
561  * Called with the target port as an argument but in the context of the
562  * reply port.  This function always implements an asynchronous put to
563  * the target message port, and thus returns EASYNC.
564  *
565  * The message must already have cleared MSGF_DONE and MSGF_REPLY
566  */
567
568 #ifdef SMP
569
570 static
571 void
572 lwkt_thread_putport_remote(lwkt_msg_t msg)
573 {
574     lwkt_port_t port = msg->ms_target_port;
575
576     /*
577      * Chase any thread migration that occurs
578      */
579     if (port->mpu_td->td_gd != mycpu) {
580         lwkt_send_ipiq(port->mpu_td->td_gd,
581                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
582         return;
583     }
584
585     /*
586      * Cleanup
587      */
588 #ifdef INVARIANTS
589     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
590     msg->ms_flags &= ~MSGF_INTRANSIT;
591 #endif
592     _lwkt_pushmsg(port, msg);
593     if (port->mp_flags & MSGPORTF_WAITING)
594         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
595 }
596
597 #endif
598
599 static
600 int
601 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
602 {
603     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
604
605     msg->ms_target_port = port;
606 #ifdef SMP
607     if (port->mpu_td->td_gd == mycpu) {
608 #endif
609         crit_enter();
610         _lwkt_pushmsg(port, msg);
611         if (port->mp_flags & MSGPORTF_WAITING)
612             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
613         crit_exit();
614 #ifdef SMP
615     } else {
616 #ifdef INVARIANTS
617         msg->ms_flags |= MSGF_INTRANSIT;
618 #endif
619         lwkt_send_ipiq(port->mpu_td->td_gd,
620                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
621     }
622 #endif
623     return (EASYNC);
624 }
625
626 /*
627  * lwkt_thread_getport()
628  *
629  *      Retrieve the next message from the port or NULL if no messages
630  *      are ready.
631  */
632 static
633 void *
634 lwkt_thread_getport(lwkt_port_t port)
635 {
636     lwkt_msg_t msg;
637
638     KKASSERT(port->mpu_td == curthread);
639
640     crit_enter_quick(port->mpu_td);
641     if ((msg = _lwkt_pollmsg(port)) != NULL)
642         _lwkt_pullmsg(port, msg);
643     crit_exit_quick(port->mpu_td);
644     return(msg);
645 }
646
647 /*
648  * lwkt_thread_waitmsg()
649  *
650  *      Wait for a particular message to be replied.  We must be the only
651  *      thread waiting on the message.  The port must be owned by the
652  *      caller.
653  */
654 static
655 int
656 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
657 {
658     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
659             ("can't wait dropable message\n"));
660
661     if ((msg->ms_flags & MSGF_DONE) == 0) {
662         /*
663          * If the done bit was not set we have to block until it is.
664          */
665         lwkt_port_t port = msg->ms_reply_port;
666         thread_t td = curthread;
667         int sentabort;
668
669         KKASSERT(port->mpu_td == td);
670         crit_enter_quick(td);
671         sentabort = 0;
672
673         while ((msg->ms_flags & MSGF_DONE) == 0) {
674             port->mp_flags |= MSGPORTF_WAITING;
675             if (sentabort == 0) {
676                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
677                     lwkt_abortmsg(msg);
678                 }
679             } else {
680                 lwkt_sleep("waitabt", 0);
681             }
682             port->mp_flags &= ~MSGPORTF_WAITING;
683         }
684         if (msg->ms_flags & MSGF_QUEUED)
685             _lwkt_pullmsg(port, msg);
686         crit_exit_quick(td);
687     } else {
688         /*
689          * If the done bit was set we only have to mess around with the
690          * message if it is queued on the reply port.
691          */
692         if (msg->ms_flags & MSGF_QUEUED) {
693             lwkt_port_t port = msg->ms_reply_port;
694             thread_t td = curthread;
695
696             KKASSERT(port->mpu_td == td);
697             crit_enter_quick(td);
698             _lwkt_pullmsg(port, msg);
699             crit_exit_quick(td);
700         }
701     }
702     return(msg->ms_error);
703 }
704
705 static
706 void *
707 lwkt_thread_waitport(lwkt_port_t port, int flags)
708 {
709     thread_t td = curthread;
710     lwkt_msg_t msg;
711     int error;
712
713     KKASSERT(port->mpu_td == td);
714     crit_enter_quick(td);
715     while ((msg = _lwkt_pollmsg(port)) == NULL) {
716         port->mp_flags |= MSGPORTF_WAITING;
717         error = lwkt_sleep("waitport", flags);
718         port->mp_flags &= ~MSGPORTF_WAITING;
719         if (error)
720             goto done;
721     }
722     _lwkt_pullmsg(port, msg);
723 done:
724     crit_exit_quick(td);
725     return(msg);
726 }
727
728 /************************************************************************
729  *                         SPIN PORT BACKEND                            *
730  ************************************************************************
731  *
732  * This backend uses spinlocks instead of making assumptions about which
733  * thread is accessing the port.  It must be used when a port is not owned
734  * by a particular thread.  This is less optimal then thread ports but
735  * you don't have a choice if there are multiple threads accessing the port.
736  *
737  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
738  * on the message port, it is the responsibility of the code doing the
739  * wakeup to clear this flag rather then the blocked threads.  Some
740  * superfluous wakeups may occur, which is ok.
741  *
742  * XXX synchronous message wakeups are not current optimized.
743  */
744
745 static
746 void *
747 lwkt_spin_getport(lwkt_port_t port)
748 {
749     lwkt_msg_t msg;
750
751     spin_lock_wr(&port->mpu_spin);
752     if ((msg = _lwkt_pollmsg(port)) != NULL)
753         _lwkt_pullmsg(port, msg);
754     spin_unlock_wr(&port->mpu_spin);
755     return(msg);
756 }
757
758 static
759 int
760 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
761 {
762     int dowakeup;
763
764     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
765
766     msg->ms_target_port = port;
767     spin_lock_wr(&port->mpu_spin);
768     _lwkt_pushmsg(port, msg);
769     dowakeup = 0;
770     if (port->mp_flags & MSGPORTF_WAITING) {
771         port->mp_flags &= ~MSGPORTF_WAITING;
772         dowakeup = 1;
773     }
774     spin_unlock_wr(&port->mpu_spin);
775     if (dowakeup)
776         wakeup(port);
777     return (EASYNC);
778 }
779
780 static
781 int
782 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
783 {
784     lwkt_port_t port;
785     int sentabort;
786     int error;
787
788     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
789             ("can't wait dropable message\n"));
790
791     if ((msg->ms_flags & MSGF_DONE) == 0) {
792         port = msg->ms_reply_port;
793         sentabort = 0;
794         spin_lock_wr(&port->mpu_spin);
795         while ((msg->ms_flags & MSGF_DONE) == 0) {
796             void *won;
797
798             /*
799              * If message was sent synchronously from the beginning
800              * the wakeup will be on the message structure, else it
801              * will be on the port structure.
802              */
803             if (msg->ms_flags & MSGF_SYNC) {
804                 won = msg;
805             } else {
806                 won = port;
807                 port->mp_flags |= MSGPORTF_WAITING;
808             }
809
810             /*
811              * Only messages which support abort can be interrupted.
812              * We must still wait for message completion regardless.
813              */
814             if ((flags & PCATCH) && sentabort == 0) {
815                 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
816                 if (error) {
817                     sentabort = error;
818                     spin_unlock_wr(&port->mpu_spin);
819                     lwkt_abortmsg(msg);
820                     spin_lock_wr(&port->mpu_spin);
821                 }
822             } else {
823                 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
824             }
825             /* see note at the top on the MSGPORTF_WAITING flag */
826         }
827         /*
828          * Turn EINTR into ERESTART if the signal indicates.
829          */
830         if (sentabort && msg->ms_error == EINTR)
831             msg->ms_error = sentabort;
832         if (msg->ms_flags & MSGF_QUEUED)
833             _lwkt_pullmsg(port, msg);
834         spin_unlock_wr(&port->mpu_spin);
835     } else {
836         if (msg->ms_flags & MSGF_QUEUED) {
837             port = msg->ms_reply_port;
838             spin_lock_wr(&port->mpu_spin);
839             _lwkt_pullmsg(port, msg);
840             spin_unlock_wr(&port->mpu_spin);
841         }
842     }
843     return(msg->ms_error);
844 }
845
846 static
847 void *
848 lwkt_spin_waitport(lwkt_port_t port, int flags)
849 {
850     lwkt_msg_t msg;
851     int error;
852
853     spin_lock_wr(&port->mpu_spin);
854     while ((msg = _lwkt_pollmsg(port)) == NULL) {
855         port->mp_flags |= MSGPORTF_WAITING;
856         error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
857         /* see note at the top on the MSGPORTF_WAITING flag */
858         if (error) {
859             spin_unlock_wr(&port->mpu_spin);
860             return(NULL);
861         }
862     }
863     _lwkt_pullmsg(port, msg);
864     spin_unlock_wr(&port->mpu_spin);
865     return(msg);
866 }
867
868 static
869 void
870 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
871 {
872     int dowakeup;
873
874     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
875
876     if (msg->ms_flags & MSGF_SYNC) {
877         /*
878          * If a synchronous completion has been requested, just wakeup
879          * the message without bothering to queue it to the target port.
880          */
881         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
882         wakeup(msg);
883     } else {
884         /*
885          * If an asynchronous completion has been requested the message
886          * must be queued to the reply port.
887          */
888         spin_lock_wr(&port->mpu_spin);
889         _lwkt_enqueue_reply(port, msg);
890         dowakeup = 0;
891         if (port->mp_flags & MSGPORTF_WAITING) {
892             port->mp_flags &= ~MSGPORTF_WAITING;
893             dowakeup = 1;
894         }
895         spin_unlock_wr(&port->mpu_spin);
896         if (dowakeup)
897             wakeup(port);
898     }
899 }
900
901 /************************************************************************
902  *                        SERIALIZER PORT BACKEND                       *
903  ************************************************************************
904  *
905  * This backend uses serializer to protect port accessing.  Callers are
906  * assumed to have serializer held.  This kind of port is usually created
907  * by network device driver along with _one_ lwkt thread to pipeline
908  * operations which may temporarily release serializer.
909  *
910  * Implementation is based on SPIN PORT BACKEND.
911  */
912
913 static
914 void *
915 lwkt_serialize_getport(lwkt_port_t port)
916 {
917     lwkt_msg_t msg;
918
919     ASSERT_SERIALIZED(port->mpu_serialize);
920
921     if ((msg = _lwkt_pollmsg(port)) != NULL)
922         _lwkt_pullmsg(port, msg);
923     return(msg);
924 }
925
926 static
927 int
928 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
929 {
930     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
931     ASSERT_SERIALIZED(port->mpu_serialize);
932
933     msg->ms_target_port = port;
934     _lwkt_pushmsg(port, msg);
935     if (port->mp_flags & MSGPORTF_WAITING) {
936         port->mp_flags &= ~MSGPORTF_WAITING;
937         wakeup(port);
938     }
939     return (EASYNC);
940 }
941
942 static
943 int
944 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
945 {
946     lwkt_port_t port;
947     int sentabort;
948     int error;
949
950     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
951             ("can't wait dropable message\n"));
952
953     if ((msg->ms_flags & MSGF_DONE) == 0) {
954         port = msg->ms_reply_port;
955
956         ASSERT_SERIALIZED(port->mpu_serialize);
957
958         sentabort = 0;
959         while ((msg->ms_flags & MSGF_DONE) == 0) {
960             void *won;
961
962             /*
963              * If message was sent synchronously from the beginning
964              * the wakeup will be on the message structure, else it
965              * will be on the port structure.
966              */
967             if (msg->ms_flags & MSGF_SYNC) {
968                 won = msg;
969             } else {
970                 won = port;
971                 port->mp_flags |= MSGPORTF_WAITING;
972             }
973
974             /*
975              * Only messages which support abort can be interrupted.
976              * We must still wait for message completion regardless.
977              */
978             if ((flags & PCATCH) && sentabort == 0) {
979                 error = serialize_sleep(won, port->mpu_serialize, PCATCH,
980                                         "waitmsg", 0);
981                 if (error) {
982                     sentabort = error;
983                     lwkt_serialize_exit(port->mpu_serialize);
984                     lwkt_abortmsg(msg);
985                     lwkt_serialize_enter(port->mpu_serialize);
986                 }
987             } else {
988                 error = serialize_sleep(won, port->mpu_serialize, 0,
989                                         "waitmsg", 0);
990             }
991             /* see note at the top on the MSGPORTF_WAITING flag */
992         }
993         /*
994          * Turn EINTR into ERESTART if the signal indicates.
995          */
996         if (sentabort && msg->ms_error == EINTR)
997             msg->ms_error = sentabort;
998         if (msg->ms_flags & MSGF_QUEUED)
999             _lwkt_pullmsg(port, msg);
1000     } else {
1001         if (msg->ms_flags & MSGF_QUEUED) {
1002             port = msg->ms_reply_port;
1003
1004             ASSERT_SERIALIZED(port->mpu_serialize);
1005             _lwkt_pullmsg(port, msg);
1006         }
1007     }
1008     return(msg->ms_error);
1009 }
1010
1011 static
1012 void *
1013 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1014 {
1015     lwkt_msg_t msg;
1016     int error;
1017
1018     ASSERT_SERIALIZED(port->mpu_serialize);
1019
1020     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1021         port->mp_flags |= MSGPORTF_WAITING;
1022         error = serialize_sleep(port, port->mpu_serialize, flags,
1023                                 "waitport", 0);
1024         /* see note at the top on the MSGPORTF_WAITING flag */
1025         if (error)
1026             return(NULL);
1027     }
1028     _lwkt_pullmsg(port, msg);
1029     return(msg);
1030 }
1031
1032 static
1033 void
1034 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1035 {
1036     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1037     ASSERT_SERIALIZED(port->mpu_serialize);
1038
1039     if (msg->ms_flags & MSGF_SYNC) {
1040         /*
1041          * If a synchronous completion has been requested, just wakeup
1042          * the message without bothering to queue it to the target port.
1043          */
1044         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1045         wakeup(msg);
1046     } else {
1047         /*
1048          * If an asynchronous completion has been requested the message
1049          * must be queued to the reply port.
1050          */
1051         _lwkt_enqueue_reply(port, msg);
1052         if (port->mp_flags & MSGPORTF_WAITING) {
1053             port->mp_flags &= ~MSGPORTF_WAITING;
1054             wakeup(port);
1055         }
1056     }
1057 }
1058
1059 /************************************************************************
1060  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1061  ************************************************************************/
1062
1063 /*
1064  * You can point a port's reply vector at this function if you just want
1065  * the message marked done, without any queueing or signaling.  This is
1066  * often used for structure-embedded messages.
1067  */
1068 static
1069 void
1070 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1071 {
1072     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1073 }
1074
1075 static
1076 void *
1077 lwkt_panic_getport(lwkt_port_t port)
1078 {
1079     panic("lwkt_getport() illegal on port %p", port);
1080 }
1081
1082 static
1083 int
1084 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1085 {
1086     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1087 }
1088
1089 static
1090 int
1091 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1092 {
1093     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1094 }
1095
1096 static
1097 void *
1098 lwkt_panic_waitport(lwkt_port_t port, int flags)
1099 {
1100     panic("port %p cannot be waited on", port);
1101 }
1102
1103 static
1104 void
1105 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1106 {
1107     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1108 }
1109
1110 static
1111 void
1112 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1113 {
1114     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1115 }