kernel: Add a missing vn_unlock().
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72
73 /************************************************************************
74  *                              MESSAGE FUNCTIONS                       *
75  ************************************************************************/
76
77 static __inline int
78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
79 {
80     return port->mp_putport(port, msg);
81 }
82
83 static __inline int
84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
85 {
86     return port->mp_putport_oncpu(port, msg);
87 }
88
89 static __inline void
90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
91 {
92     KKASSERT(msg->ms_reply_port != NULL &&
93              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
94     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
95 }
96
97 static __inline void
98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
99 {
100     int error;
101
102     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
103         /*
104          * Target port opted to execute the message synchronously so
105          * queue the response.
106          */
107         lwkt_replymsg(msg, error);
108     }
109 }
110
111 static __inline void
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
113 {
114     int error;
115
116     if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) {
117         /*
118          * Target port opted to execute the message synchronously so
119          * queue the response.
120          */
121         lwkt_replymsg(msg, error);
122     }
123 }
124
125 /*
126  * lwkt_sendmsg()
127  *
128  *      Request asynchronous completion and call lwkt_beginmsg().  The
129  *      target port can opt to execute the message synchronously or
130  *      asynchronously and this function will automatically queue the
131  *      response if the target executes the message synchronously.
132  *
133  *      NOTE: The message is in an indeterminant state until this call
134  *      returns.  The caller should not mess with it (e.g. try to abort it)
135  *      until then.
136  *
137  *      NOTE: Do not use this function to forward a message as we might
138  *      clobber ms_flags in a SMP race.
139  */
140 void
141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
142 {
143     _lwkt_sendmsg_prepare(port, msg);
144     _lwkt_sendmsg_start(port, msg);
145 }
146
147 void
148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
149 {
150     _lwkt_sendmsg_prepare(port, msg);
151     _lwkt_sendmsg_start_oncpu(port, msg);
152 }
153
154 void
155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
156 {
157     _lwkt_sendmsg_prepare(port, msg);
158 }
159
160 void
161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
162 {
163     _lwkt_sendmsg_start(port, msg);
164 }
165
166 /*
167  * lwkt_domsg()
168  *
169  *      Request synchronous completion and call lwkt_beginmsg().  The
170  *      target port can opt to execute the message synchronously or
171  *      asynchronously and this function will automatically block and
172  *      wait for a response if the target executes the message
173  *      asynchronously.
174  *
175  *      NOTE: Do not use this function to forward a message as we might
176  *      clobber ms_flags in a SMP race.
177  */
178 int
179 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
180 {
181     int error;
182
183     KKASSERT(msg->ms_reply_port != NULL &&
184              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
185     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
186     msg->ms_flags |= MSGF_SYNC;
187     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
188         /*
189          * Target port opted to execute the message asynchronously so
190          * block and wait for a reply.
191          */
192         error = lwkt_waitmsg(msg, flags);
193     } else {
194         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
195     }
196     return(error);
197 }
198
199 /*
200  * lwkt_forwardmsg()
201  *
202  * Forward a message received on one port to another port.
203  */
204 int
205 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
206 {   
207     int error;
208
209     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
210     if ((error = port->mp_putport(port, msg)) != EASYNC)
211         lwkt_replymsg(msg, error);
212     return(error);
213 }
214
215 /*
216  * lwkt_abortmsg()
217  *
218  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
219  * The caller must ensure that the message will not be both replied AND
220  * destroyed while the abort is in progress.
221  *
222  * This function issues a callback which might block!
223  */
224 void
225 lwkt_abortmsg(lwkt_msg_t msg)
226 {
227     /*
228      * A critical section protects us from reply IPIs on this cpu.
229      */
230     crit_enter();
231
232     /*
233      * Shortcut the operation if the message has already been returned.
234      * The callback typically constructs a lwkt_msg with the abort request,
235      * issues it synchronously, and waits for completion.  The callback
236      * is not required to actually abort the message and the target port,
237      * upon receiving an abort request message generated by the callback
238      * should check whether the original message has already completed or
239      * not.
240      */
241     if (msg->ms_flags & MSGF_ABORTABLE) {
242         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
243             msg->ms_abortfn(msg);
244     }
245     crit_exit();
246 }
247
248 /************************************************************************
249  *                      PORT INITIALIZATION API                         *
250  ************************************************************************/
251
252 static void *lwkt_thread_getport(lwkt_port_t port);
253 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
254 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
255 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
256 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
257 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
258
259 static void *lwkt_spin_getport(lwkt_port_t port);
260 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
261 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
262 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
263 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
264 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
265 static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
266
267 static void *lwkt_serialize_getport(lwkt_port_t port);
268 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
269 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
270 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
271 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
272
273 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
274 static void *lwkt_panic_getport(lwkt_port_t port);
275 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
276 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
277 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
278 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
279 static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
280 static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
281
282 /*
283  * Core port initialization (internal)
284  */
285 static __inline
286 void
287 _lwkt_initport(lwkt_port_t port,
288                void *(*gportfn)(lwkt_port_t),
289                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
290                int (*wmsgfn)(lwkt_msg_t, int),
291                void *(*wportfn)(lwkt_port_t, int),
292                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
293                int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
294                int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
295 {
296     bzero(port, sizeof(*port));
297     port->mp_cpuid = -1;
298     TAILQ_INIT(&port->mp_msgq);
299     TAILQ_INIT(&port->mp_msgq_prio);
300     port->mp_getport = gportfn;
301     port->mp_putport = pportfn;
302     port->mp_waitmsg = wmsgfn;
303     port->mp_waitport = wportfn;
304     port->mp_replyport = rportfn;
305     port->mp_dropmsg = dmsgfn;
306     port->mp_putport_oncpu = pportfn_oncpu;
307 }
308
309 /*
310  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
311  * we tell the scheduler not to reschedule if td is at a higher priority.
312  *
313  * This routine is called even if the thread is already scheduled.
314  */
315 static __inline
316 void
317 _lwkt_schedule_msg(thread_t td, int flags)
318 {
319     lwkt_schedule(td);
320 }
321
322 /*
323  * lwkt_initport_thread()
324  *
325  *      Initialize a port for use by a particular thread.  The port may
326  *      only be used by <td>.
327  */
328 void
329 lwkt_initport_thread(lwkt_port_t port, thread_t td)
330 {
331     _lwkt_initport(port,
332                    lwkt_thread_getport,
333                    lwkt_thread_putport,
334                    lwkt_thread_waitmsg,
335                    lwkt_thread_waitport,
336                    lwkt_thread_replyport,
337                    lwkt_thread_dropmsg,
338                    lwkt_thread_putport);
339     port->mpu_td = td;
340 }
341
342 /*
343  * lwkt_initport_spin()
344  *
345  *      Initialize a port for use with descriptors that might be accessed
346  *      via multiple LWPs, processes, or threads.  Has somewhat more
347  *      overhead then thread ports.
348  */
349 void
350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid)
351 {
352     int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
353     int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t);
354
355     if (td == NULL)
356         dmsgfn = lwkt_panic_dropmsg;
357     else
358         dmsgfn = lwkt_spin_dropmsg;
359
360     if (fixed_cpuid)
361         pportfn_oncpu = lwkt_spin_putport_oncpu;
362     else
363         pportfn_oncpu = lwkt_panic_putport_oncpu;
364
365     _lwkt_initport(port,
366                    lwkt_spin_getport,
367                    lwkt_spin_putport,
368                    lwkt_spin_waitmsg,
369                    lwkt_spin_waitport,
370                    lwkt_spin_replyport,
371                    dmsgfn,
372                    pportfn_oncpu);
373     spin_init(&port->mpu_spin);
374     port->mpu_td = td;
375     if (fixed_cpuid)
376         port->mp_cpuid = td->td_gd->gd_cpuid;
377 }
378
379 /*
380  * lwkt_initport_serialize()
381  *
382  *      Initialize a port for use with descriptors that might be accessed
383  *      via multiple LWPs, processes, or threads.  Callers are assumed to
384  *      have held the serializer (slz).
385  */
386 void
387 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
388 {
389     _lwkt_initport(port,
390                    lwkt_serialize_getport,
391                    lwkt_serialize_putport,
392                    lwkt_serialize_waitmsg,
393                    lwkt_serialize_waitport,
394                    lwkt_serialize_replyport,
395                    lwkt_panic_dropmsg,
396                    lwkt_panic_putport_oncpu);
397     port->mpu_serialize = slz;
398 }
399
400 /*
401  * Similar to the standard initport, this function simply marks the message
402  * as being done and does not attempt to return it to an originating port.
403  */
404 void
405 lwkt_initport_replyonly_null(lwkt_port_t port)
406 {
407     _lwkt_initport(port,
408                    lwkt_panic_getport,
409                    lwkt_panic_putport,
410                    lwkt_panic_waitmsg,
411                    lwkt_panic_waitport,
412                    lwkt_null_replyport,
413                    lwkt_panic_dropmsg,
414                    lwkt_panic_putport_oncpu);
415 }
416
417 /*
418  * Initialize a reply-only port, typically used as a message sink.  Such
419  * ports can only be used as a reply port.
420  */
421 void
422 lwkt_initport_replyonly(lwkt_port_t port,
423                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
424 {
425     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
426                          lwkt_panic_waitmsg, lwkt_panic_waitport,
427                          rportfn, lwkt_panic_dropmsg,
428                          lwkt_panic_putport_oncpu);
429 }
430
431 void
432 lwkt_initport_putonly(lwkt_port_t port,
433                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
434 {
435     _lwkt_initport(port, lwkt_panic_getport, pportfn,
436                          lwkt_panic_waitmsg, lwkt_panic_waitport,
437                          lwkt_panic_replyport, lwkt_panic_dropmsg,
438                          lwkt_panic_putport_oncpu);
439 }
440
441 void
442 lwkt_initport_panic(lwkt_port_t port)
443 {
444     _lwkt_initport(port,
445                    lwkt_panic_getport, lwkt_panic_putport,
446                    lwkt_panic_waitmsg, lwkt_panic_waitport,
447                    lwkt_panic_replyport, lwkt_panic_dropmsg,
448                    lwkt_panic_putport_oncpu);
449 }
450
451 static __inline
452 void
453 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
454 {
455     lwkt_msg_queue *queue;
456
457     /*
458      * normal case, remove and return the message.
459      */
460     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
461         queue = &port->mp_msgq_prio;
462     else
463         queue = &port->mp_msgq;
464     TAILQ_REMOVE(queue, msg, ms_node);
465
466     /*
467      * atomic op needed for spin ports
468      */
469     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
470 }
471
472 static __inline
473 void
474 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
475 {
476     lwkt_msg_queue *queue;
477
478     /*
479      * atomic op needed for spin ports
480      */
481     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
482     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
483         queue = &port->mp_msgq_prio;
484     else
485         queue = &port->mp_msgq;
486     TAILQ_INSERT_TAIL(queue, msg, ms_node);
487 }
488
489 static __inline
490 lwkt_msg_t
491 _lwkt_pollmsg(lwkt_port_t port)
492 {
493     lwkt_msg_t msg;
494
495     msg = TAILQ_FIRST(&port->mp_msgq_prio);
496     if (__predict_false(msg != NULL))
497         return msg;
498
499     /*
500      * Priority queue has no message, fallback to non-priority queue.
501      */
502     return TAILQ_FIRST(&port->mp_msgq);
503 }
504
505 static __inline
506 void
507 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
508 {
509     /*
510      * atomic op needed for spin ports
511      */
512     _lwkt_pushmsg(port, msg);
513     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
514 }
515
516 /************************************************************************
517  *                      THREAD PORT BACKEND                             *
518  ************************************************************************
519  *
520  * This backend is used when the port a message is retrieved from is owned
521  * by a single thread (the calling thread).  Messages are IPId to the
522  * correct cpu before being enqueued to a port.  Note that this is fairly
523  * optimal since scheduling would have had to do an IPI anyway if the
524  * message were headed to a different cpu.
525  */
526
527 /*
528  * This function completes reply processing for the default case in the
529  * context of the originating cpu.
530  */
531 static
532 void
533 lwkt_thread_replyport_remote(lwkt_msg_t msg)
534 {
535     lwkt_port_t port = msg->ms_reply_port;
536     int flags;
537
538     /*
539      * Chase any thread migration that occurs
540      */
541     if (port->mpu_td->td_gd != mycpu) {
542         lwkt_send_ipiq(port->mpu_td->td_gd,
543                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
544         return;
545     }
546
547     /*
548      * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
549      */
550 #ifdef INVARIANTS
551     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
552     msg->ms_flags &= ~MSGF_INTRANSIT;
553 #endif
554     flags = msg->ms_flags;
555     if (msg->ms_flags & MSGF_SYNC) {
556         cpu_sfence();
557         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
558     } else {
559         _lwkt_enqueue_reply(port, msg);
560     }
561     if (port->mp_flags & MSGPORTF_WAITING)
562         _lwkt_schedule_msg(port->mpu_td, flags);
563 }
564
565 /*
566  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
567  *
568  * Called with the reply port as an argument but in the context of the
569  * original target port.  Completion must occur on the target port's
570  * cpu.
571  *
572  * The critical section protects us from IPIs on the this CPU.
573  */
574 static
575 void
576 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
577 {
578     int flags;
579
580     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
581
582     if (msg->ms_flags & MSGF_SYNC) {
583         /*
584          * If a synchronous completion has been requested, just wakeup
585          * the message without bothering to queue it to the target port.
586          *
587          * Assume the target thread is non-preemptive, so no critical
588          * section is required.
589          */
590         if (port->mpu_td->td_gd == mycpu) {
591             crit_enter();
592             flags = msg->ms_flags;
593             cpu_sfence();
594             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
595             if (port->mp_flags & MSGPORTF_WAITING)
596                 _lwkt_schedule_msg(port->mpu_td, flags);
597             crit_exit();
598         } else {
599 #ifdef INVARIANTS
600             atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
601 #endif
602             atomic_set_int(&msg->ms_flags, MSGF_REPLY);
603             lwkt_send_ipiq(port->mpu_td->td_gd,
604                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
605         }
606     } else {
607         /*
608          * If an asynchronous completion has been requested the message
609          * must be queued to the reply port.
610          *
611          * A critical section is required to interlock the port queue.
612          */
613         if (port->mpu_td->td_gd == mycpu) {
614             crit_enter();
615             _lwkt_enqueue_reply(port, msg);
616             if (port->mp_flags & MSGPORTF_WAITING)
617                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
618             crit_exit();
619         } else {
620 #ifdef INVARIANTS
621             atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
622 #endif
623             atomic_set_int(&msg->ms_flags, MSGF_REPLY);
624             lwkt_send_ipiq(port->mpu_td->td_gd,
625                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
626         }
627     }
628 }
629
630 /*
631  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
632  *
633  * This function could _only_ be used when caller is in the same thread
634  * as the message's target port owner thread.
635  */
636 static int
637 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
638 {
639     int error;
640
641     KASSERT(port->mpu_td == curthread,
642             ("message could only be dropped in the same thread "
643              "as the message target port thread"));
644     crit_enter_quick(port->mpu_td);
645     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
646             _lwkt_pullmsg(port, msg);
647             atomic_set_int(&msg->ms_flags, MSGF_DONE);
648             error = 0;
649     } else {
650             error = ENOENT;
651     }
652     crit_exit_quick(port->mpu_td);
653
654     return (error);
655 }
656
657 /*
658  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
659  *
660  * Called with the target port as an argument but in the context of the
661  * reply port.  This function always implements an asynchronous put to
662  * the target message port, and thus returns EASYNC.
663  *
664  * The message must already have cleared MSGF_DONE and MSGF_REPLY
665  */
666 static
667 void
668 lwkt_thread_putport_remote(lwkt_msg_t msg)
669 {
670     lwkt_port_t port = msg->ms_target_port;
671
672     /*
673      * Chase any thread migration that occurs
674      */
675     if (port->mpu_td->td_gd != mycpu) {
676         lwkt_send_ipiq(port->mpu_td->td_gd,
677                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
678         return;
679     }
680
681     /*
682      * An atomic op is needed on ms_flags vs originator.  Also
683      * note that the originator might be using a different type
684      * of msgport.
685      */
686 #ifdef INVARIANTS
687     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
688     atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
689 #endif
690     _lwkt_pushmsg(port, msg);
691     if (port->mp_flags & MSGPORTF_WAITING)
692         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
693 }
694
695 static
696 int
697 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
698 {
699     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
700
701     msg->ms_target_port = port;
702     if (port->mpu_td->td_gd == mycpu) {
703         crit_enter();
704         _lwkt_pushmsg(port, msg);
705         if (port->mp_flags & MSGPORTF_WAITING)
706             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
707         crit_exit();
708     } else {
709 #ifdef INVARIANTS
710         /*
711          * Cleanup.
712          *
713          * An atomic op is needed on ms_flags vs originator.  Also
714          * note that the originator might be using a different type
715          * of msgport.
716          */
717         atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
718 #endif
719         lwkt_send_ipiq(port->mpu_td->td_gd,
720                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
721     }
722     return (EASYNC);
723 }
724
725 /*
726  * lwkt_thread_getport()
727  *
728  *      Retrieve the next message from the port or NULL if no messages
729  *      are ready.
730  */
731 static
732 void *
733 lwkt_thread_getport(lwkt_port_t port)
734 {
735     lwkt_msg_t msg;
736
737     KKASSERT(port->mpu_td == curthread);
738
739     crit_enter_quick(port->mpu_td);
740     if ((msg = _lwkt_pollmsg(port)) != NULL)
741         _lwkt_pullmsg(port, msg);
742     crit_exit_quick(port->mpu_td);
743     return(msg);
744 }
745
746 /*
747  * lwkt_thread_waitmsg()
748  *
749  *      Wait for a particular message to be replied.  We must be the only
750  *      thread waiting on the message.  The port must be owned by the
751  *      caller.
752  */
753 static
754 int
755 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
756 {
757     thread_t td = curthread;
758
759     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
760             ("can't wait dropable message"));
761
762     if ((msg->ms_flags & MSGF_DONE) == 0) {
763         /*
764          * If the done bit was not set we have to block until it is.
765          */
766         lwkt_port_t port = msg->ms_reply_port;
767         int sentabort;
768
769         KKASSERT(port->mpu_td == td);
770         crit_enter_quick(td);
771         sentabort = 0;
772
773         while ((msg->ms_flags & MSGF_DONE) == 0) {
774             port->mp_flags |= MSGPORTF_WAITING; /* same cpu */
775             if (sentabort == 0) {
776                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
777                     lwkt_abortmsg(msg);
778                 }
779             } else {
780                 lwkt_sleep("waitabt", 0);
781             }
782             port->mp_flags &= ~MSGPORTF_WAITING;
783         }
784         if (msg->ms_flags & MSGF_QUEUED)
785             _lwkt_pullmsg(port, msg);
786         crit_exit_quick(td);
787     } else {
788         /*
789          * If the done bit was set we only have to mess around with the
790          * message if it is queued on the reply port.
791          */
792         crit_enter_quick(td);
793         if (msg->ms_flags & MSGF_QUEUED) {
794             lwkt_port_t port = msg->ms_reply_port;
795             thread_t td __debugvar = curthread;
796
797             KKASSERT(port->mpu_td == td);
798             _lwkt_pullmsg(port, msg);
799         }
800         crit_exit_quick(td);
801     }
802     return(msg->ms_error);
803 }
804
805 /*
806  * lwkt_thread_waitport()
807  *
808  *      Wait for a new message to be available on the port.  We must be the
809  *      the only thread waiting on the port.  The port must be owned by caller.
810  */
811 static
812 void *
813 lwkt_thread_waitport(lwkt_port_t port, int flags)
814 {
815     thread_t td = curthread;
816     lwkt_msg_t msg;
817     int error;
818
819     KKASSERT(port->mpu_td == td);
820     crit_enter_quick(td);
821     while ((msg = _lwkt_pollmsg(port)) == NULL) {
822         port->mp_flags |= MSGPORTF_WAITING;
823         error = lwkt_sleep("waitport", flags);
824         port->mp_flags &= ~MSGPORTF_WAITING;
825         if (error)
826             goto done;
827     }
828     _lwkt_pullmsg(port, msg);
829 done:
830     crit_exit_quick(td);
831     return(msg);
832 }
833
834 /************************************************************************
835  *                         SPIN PORT BACKEND                            *
836  ************************************************************************
837  *
838  * This backend uses spinlocks instead of making assumptions about which
839  * thread is accessing the port.  It must be used when a port is not owned
840  * by a particular thread.  This is less optimal then thread ports but
841  * you don't have a choice if there are multiple threads accessing the port.
842  *
843  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
844  * on the message port, it is the responsibility of the code doing the
845  * wakeup to clear this flag rather then the blocked threads.  Some
846  * superfluous wakeups may occur, which is ok.
847  *
848  * XXX synchronous message wakeups are not current optimized.
849  */
850
851 static
852 void *
853 lwkt_spin_getport(lwkt_port_t port)
854 {
855     lwkt_msg_t msg;
856
857     spin_lock(&port->mpu_spin);
858     if ((msg = _lwkt_pollmsg(port)) != NULL)
859         _lwkt_pullmsg(port, msg);
860     spin_unlock(&port->mpu_spin);
861     return(msg);
862 }
863
864 static __inline int
865 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg)
866 {
867     int dowakeup;
868
869     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
870
871     msg->ms_target_port = port;
872     spin_lock(&port->mpu_spin);
873     _lwkt_pushmsg(port, msg);
874     dowakeup = 0;
875     if (port->mp_flags & MSGPORTF_WAITING) {
876         port->mp_flags &= ~MSGPORTF_WAITING;
877         dowakeup = 1;
878     }
879     spin_unlock(&port->mpu_spin);
880
881     return dowakeup;
882 }
883
884 static
885 int
886 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
887 {
888     if (lwkt_spin_putport_only(port, msg))
889         wakeup(port);
890     return (EASYNC);
891 }
892
893 static
894 int
895 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
896 {
897     KASSERT(port->mp_cpuid == mycpuid,
898         ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
899          port->mp_cpuid, mycpuid));
900     if (lwkt_spin_putport_only(port, msg))
901         wakeup_mycpu(port);
902     return (EASYNC);
903 }
904
905 static
906 int
907 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
908 {
909     lwkt_port_t port;
910     int sentabort;
911     int error;
912
913     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
914             ("can't wait dropable message"));
915     port = msg->ms_reply_port;
916
917     if ((msg->ms_flags & MSGF_DONE) == 0) {
918         sentabort = 0;
919         spin_lock(&port->mpu_spin);
920         while ((msg->ms_flags & MSGF_DONE) == 0) {
921             void *won;
922
923             /*
924              * If message was sent synchronously from the beginning
925              * the wakeup will be on the message structure, else it
926              * will be on the port structure.
927              *
928              * ms_flags needs atomic op originator vs target MSGF_QUEUED
929              */
930             if (msg->ms_flags & MSGF_SYNC) {
931                 won = msg;
932                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
933             } else {
934                 won = port;
935                 port->mp_flags |= MSGPORTF_WAITING;
936             }
937
938             /*
939              * Only messages which support abort can be interrupted.
940              * We must still wait for message completion regardless.
941              */
942             if ((flags & PCATCH) && sentabort == 0) {
943                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
944                 if (error) {
945                     sentabort = error;
946                     spin_unlock(&port->mpu_spin);
947                     lwkt_abortmsg(msg);
948                     spin_lock(&port->mpu_spin);
949                 }
950             } else {
951                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
952             }
953             /* see note at the top on the MSGPORTF_WAITING flag */
954         }
955         /*
956          * Turn EINTR into ERESTART if the signal indicates.
957          */
958         if (sentabort && msg->ms_error == EINTR)
959             msg->ms_error = sentabort;
960         if (msg->ms_flags & MSGF_QUEUED)
961             _lwkt_pullmsg(port, msg);
962         spin_unlock(&port->mpu_spin);
963     } else {
964         spin_lock(&port->mpu_spin);
965         if (msg->ms_flags & MSGF_QUEUED) {
966             _lwkt_pullmsg(port, msg);
967         }
968         spin_unlock(&port->mpu_spin);
969     }
970     return(msg->ms_error);
971 }
972
973 static
974 void *
975 lwkt_spin_waitport(lwkt_port_t port, int flags)
976 {
977     lwkt_msg_t msg;
978     int error;
979
980     spin_lock(&port->mpu_spin);
981     while ((msg = _lwkt_pollmsg(port)) == NULL) {
982         port->mp_flags |= MSGPORTF_WAITING;
983         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
984         /* see note at the top on the MSGPORTF_WAITING flag */
985         if (error) {
986             spin_unlock(&port->mpu_spin);
987             return(NULL);
988         }
989     }
990     _lwkt_pullmsg(port, msg);
991     spin_unlock(&port->mpu_spin);
992     return(msg);
993 }
994
995 static
996 void
997 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
998 {
999     int dowakeup;
1000
1001     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1002
1003     if (msg->ms_flags & MSGF_SYNC) {
1004         /*
1005          * If a synchronous completion has been requested, just wakeup
1006          * the message without bothering to queue it to the target port.
1007          *
1008          * ms_flags protected by reply port spinlock
1009          */
1010         spin_lock(&port->mpu_spin);
1011         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1012         dowakeup = 0;
1013         if (msg->ms_flags & MSGF_WAITING) {
1014                 msg->ms_flags &= ~MSGF_WAITING;
1015                 dowakeup = 1;
1016         }
1017         spin_unlock(&port->mpu_spin);
1018         if (dowakeup)
1019                 wakeup(msg);
1020     } else {
1021         /*
1022          * If an asynchronous completion has been requested the message
1023          * must be queued to the reply port.
1024          */
1025         spin_lock(&port->mpu_spin);
1026         _lwkt_enqueue_reply(port, msg);
1027         dowakeup = 0;
1028         if (port->mp_flags & MSGPORTF_WAITING) {
1029             port->mp_flags &= ~MSGPORTF_WAITING;
1030             dowakeup = 1;
1031         }
1032         spin_unlock(&port->mpu_spin);
1033         if (dowakeup)
1034             wakeup(port);
1035     }
1036 }
1037
1038 /*
1039  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1040  *
1041  * This function could _only_ be used when caller is in the same thread
1042  * as the message's target port owner thread.
1043  */
1044 static int
1045 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1046 {
1047     int error;
1048
1049     KASSERT(port->mpu_td == curthread,
1050             ("message could only be dropped in the same thread "
1051              "as the message target port thread\n"));
1052     spin_lock(&port->mpu_spin);
1053     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
1054             _lwkt_pullmsg(port, msg);
1055             msg->ms_flags |= MSGF_DONE;
1056             error = 0;
1057     } else {
1058             error = ENOENT;
1059     }
1060     spin_unlock(&port->mpu_spin);
1061
1062     return (error);
1063 }
1064
1065 /************************************************************************
1066  *                        SERIALIZER PORT BACKEND                       *
1067  ************************************************************************
1068  *
1069  * This backend uses serializer to protect port accessing.  Callers are
1070  * assumed to have serializer held.  This kind of port is usually created
1071  * by network device driver along with _one_ lwkt thread to pipeline
1072  * operations which may temporarily release serializer.
1073  *
1074  * Implementation is based on SPIN PORT BACKEND.
1075  */
1076
1077 static
1078 void *
1079 lwkt_serialize_getport(lwkt_port_t port)
1080 {
1081     lwkt_msg_t msg;
1082
1083     ASSERT_SERIALIZED(port->mpu_serialize);
1084
1085     if ((msg = _lwkt_pollmsg(port)) != NULL)
1086         _lwkt_pullmsg(port, msg);
1087     return(msg);
1088 }
1089
1090 static
1091 int
1092 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1093 {
1094     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1095     ASSERT_SERIALIZED(port->mpu_serialize);
1096
1097     msg->ms_target_port = port;
1098     _lwkt_pushmsg(port, msg);
1099     if (port->mp_flags & MSGPORTF_WAITING) {
1100         port->mp_flags &= ~MSGPORTF_WAITING;
1101         wakeup(port);
1102     }
1103     return (EASYNC);
1104 }
1105
1106 static
1107 int
1108 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1109 {
1110     lwkt_port_t port;
1111     int sentabort;
1112     int error;
1113
1114     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1115             ("can't wait dropable message"));
1116
1117     if ((msg->ms_flags & MSGF_DONE) == 0) {
1118         port = msg->ms_reply_port;
1119
1120         ASSERT_SERIALIZED(port->mpu_serialize);
1121
1122         sentabort = 0;
1123         while ((msg->ms_flags & MSGF_DONE) == 0) {
1124             void *won;
1125
1126             /*
1127              * If message was sent synchronously from the beginning
1128              * the wakeup will be on the message structure, else it
1129              * will be on the port structure.
1130              */
1131             if (msg->ms_flags & MSGF_SYNC) {
1132                 won = msg;
1133             } else {
1134                 won = port;
1135                 port->mp_flags |= MSGPORTF_WAITING;
1136             }
1137
1138             /*
1139              * Only messages which support abort can be interrupted.
1140              * We must still wait for message completion regardless.
1141              */
1142             if ((flags & PCATCH) && sentabort == 0) {
1143                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1144                 if (error) {
1145                     sentabort = error;
1146                     lwkt_serialize_exit(port->mpu_serialize);
1147                     lwkt_abortmsg(msg);
1148                     lwkt_serialize_enter(port->mpu_serialize);
1149                 }
1150             } else {
1151                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1152             }
1153             /* see note at the top on the MSGPORTF_WAITING flag */
1154         }
1155         /*
1156          * Turn EINTR into ERESTART if the signal indicates.
1157          */
1158         if (sentabort && msg->ms_error == EINTR)
1159             msg->ms_error = sentabort;
1160         if (msg->ms_flags & MSGF_QUEUED)
1161             _lwkt_pullmsg(port, msg);
1162     } else {
1163         if (msg->ms_flags & MSGF_QUEUED) {
1164             port = msg->ms_reply_port;
1165
1166             ASSERT_SERIALIZED(port->mpu_serialize);
1167             _lwkt_pullmsg(port, msg);
1168         }
1169     }
1170     return(msg->ms_error);
1171 }
1172
1173 static
1174 void *
1175 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1176 {
1177     lwkt_msg_t msg;
1178     int error;
1179
1180     ASSERT_SERIALIZED(port->mpu_serialize);
1181
1182     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1183         port->mp_flags |= MSGPORTF_WAITING;
1184         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1185         /* see note at the top on the MSGPORTF_WAITING flag */
1186         if (error)
1187             return(NULL);
1188     }
1189     _lwkt_pullmsg(port, msg);
1190     return(msg);
1191 }
1192
1193 static
1194 void
1195 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1196 {
1197     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1198     ASSERT_SERIALIZED(port->mpu_serialize);
1199
1200     if (msg->ms_flags & MSGF_SYNC) {
1201         /*
1202          * If a synchronous completion has been requested, just wakeup
1203          * the message without bothering to queue it to the target port.
1204          *
1205          * (both sides synchronized via serialized reply port)
1206          */
1207         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1208         wakeup(msg);
1209     } else {
1210         /*
1211          * If an asynchronous completion has been requested the message
1212          * must be queued to the reply port.
1213          */
1214         _lwkt_enqueue_reply(port, msg);
1215         if (port->mp_flags & MSGPORTF_WAITING) {
1216             port->mp_flags &= ~MSGPORTF_WAITING;
1217             wakeup(port);
1218         }
1219     }
1220 }
1221
1222 /************************************************************************
1223  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1224  ************************************************************************/
1225
1226 /*
1227  * You can point a port's reply vector at this function if you just want
1228  * the message marked done, without any queueing or signaling.  This is
1229  * often used for structure-embedded messages.
1230  */
1231 static
1232 void
1233 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1234 {
1235     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1236 }
1237
1238 static
1239 void *
1240 lwkt_panic_getport(lwkt_port_t port)
1241 {
1242     panic("lwkt_getport() illegal on port %p", port);
1243 }
1244
1245 static
1246 int
1247 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1248 {
1249     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1250 }
1251
1252 static
1253 int
1254 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1255 {
1256     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1257 }
1258
1259 static
1260 void *
1261 lwkt_panic_waitport(lwkt_port_t port, int flags)
1262 {
1263     panic("port %p cannot be waited on", port);
1264 }
1265
1266 static
1267 void
1268 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1269 {
1270     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1271 }
1272
1273 static
1274 int
1275 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1276 {
1277     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1278     /* NOT REACHED */
1279     return (ENOENT);
1280 }
1281
1282 static
1283 int
1284 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
1285 {
1286     panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",
1287         port, msg);
1288     /* NOT REACHED */
1289     return (ENOENT);
1290 }