kernel - Add reapctl() system call for managing sub-processes
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72
73 /************************************************************************
74  *                              MESSAGE FUNCTIONS                       *
75  ************************************************************************/
76
77 static __inline int
78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
79 {
80     return port->mp_putport(port, msg);
81 }
82
83 static __inline int
84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
85 {
86     return port->mp_putport_oncpu(port, msg);
87 }
88
89 static __inline void
90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
91 {
92     KKASSERT(msg->ms_reply_port != NULL &&
93              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
94     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
95 }
96
97 static __inline void
98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
99 {
100     int error;
101
102     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
103         /*
104          * Target port opted to execute the message synchronously so
105          * queue the response.
106          */
107         lwkt_replymsg(msg, error);
108     }
109 }
110
111 static __inline void
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
113 {
114     int error;
115
116     if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) {
117         /*
118          * Target port opted to execute the message synchronously so
119          * queue the response.
120          */
121         lwkt_replymsg(msg, error);
122     }
123 }
124
125 /*
126  * lwkt_sendmsg()
127  *
128  *      Request asynchronous completion and call lwkt_beginmsg().  The
129  *      target port can opt to execute the message synchronously or
130  *      asynchronously and this function will automatically queue the
131  *      response if the target executes the message synchronously.
132  *
133  *      NOTE: The message is in an indeterminant state until this call
134  *      returns.  The caller should not mess with it (e.g. try to abort it)
135  *      until then.
136  *
137  *      NOTE: Do not use this function to forward a message as we might
138  *      clobber ms_flags in a SMP race.
139  */
140 void
141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
142 {
143     _lwkt_sendmsg_prepare(port, msg);
144     _lwkt_sendmsg_start(port, msg);
145 }
146
147 void
148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
149 {
150     _lwkt_sendmsg_prepare(port, msg);
151     _lwkt_sendmsg_start_oncpu(port, msg);
152 }
153
154 void
155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
156 {
157     _lwkt_sendmsg_prepare(port, msg);
158 }
159
160 void
161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
162 {
163     _lwkt_sendmsg_start(port, msg);
164 }
165
166 /*
167  * lwkt_domsg()
168  *
169  *      Request synchronous completion and call lwkt_beginmsg().  The
170  *      target port can opt to execute the message synchronously or
171  *      asynchronously and this function will automatically block and
172  *      wait for a response if the target executes the message
173  *      asynchronously.
174  *
175  *      NOTE: Do not use this function to forward a message as we might
176  *      clobber ms_flags in a SMP race.
177  */
178 int
179 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
180 {
181     int error;
182
183     KKASSERT(msg->ms_reply_port != NULL &&
184              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
185     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
186     msg->ms_flags |= MSGF_SYNC;
187     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
188         /*
189          * Target port opted to execute the message asynchronously so
190          * block and wait for a reply.
191          */
192         error = lwkt_waitmsg(msg, flags);
193     } else {
194         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
195     }
196     return(error);
197 }
198
199 /*
200  * lwkt_forwardmsg()
201  *
202  * Forward a message received on one port to another port.
203  */
204 int
205 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
206 {   
207     int error;
208
209     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
210     if ((error = port->mp_putport(port, msg)) != EASYNC)
211         lwkt_replymsg(msg, error);
212     return(error);
213 }
214
215 /*
216  * lwkt_abortmsg()
217  *
218  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
219  * The caller must ensure that the message will not be both replied AND
220  * destroyed while the abort is in progress.
221  *
222  * This function issues a callback which might block!
223  */
224 void
225 lwkt_abortmsg(lwkt_msg_t msg)
226 {
227     /*
228      * A critical section protects us from reply IPIs on this cpu.
229      */
230     crit_enter();
231
232     /*
233      * Shortcut the operation if the message has already been returned.
234      * The callback typically constructs a lwkt_msg with the abort request,
235      * issues it synchronously, and waits for completion.  The callback
236      * is not required to actually abort the message and the target port,
237      * upon receiving an abort request message generated by the callback
238      * should check whether the original message has already completed or
239      * not.
240      */
241     if (msg->ms_flags & MSGF_ABORTABLE) {
242         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
243             msg->ms_abortfn(msg);
244     }
245     crit_exit();
246 }
247
248 /************************************************************************
249  *                      PORT INITIALIZATION API                         *
250  ************************************************************************/
251
252 static void *lwkt_thread_getport(lwkt_port_t port);
253 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
254 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
255 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
256 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
257 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
258
259 static void *lwkt_spin_getport(lwkt_port_t port);
260 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
261 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
262 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
263 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
264 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
265 static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
266
267 static void *lwkt_serialize_getport(lwkt_port_t port);
268 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
269 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
270 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
271 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
272
273 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
274 static void *lwkt_panic_getport(lwkt_port_t port);
275 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
276 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
277 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
278 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
279 static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
280 static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
281
282 /*
283  * Core port initialization (internal)
284  */
285 static __inline
286 void
287 _lwkt_initport(lwkt_port_t port,
288                void *(*gportfn)(lwkt_port_t),
289                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
290                int (*wmsgfn)(lwkt_msg_t, int),
291                void *(*wportfn)(lwkt_port_t, int),
292                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
293                int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
294                int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
295 {
296     bzero(port, sizeof(*port));
297     port->mp_cpuid = -1;
298     TAILQ_INIT(&port->mp_msgq);
299     TAILQ_INIT(&port->mp_msgq_prio);
300     port->mp_getport = gportfn;
301     port->mp_putport = pportfn;
302     port->mp_waitmsg = wmsgfn;
303     port->mp_waitport = wportfn;
304     port->mp_replyport = rportfn;
305     port->mp_dropmsg = dmsgfn;
306     port->mp_putport_oncpu = pportfn_oncpu;
307 }
308
309 /*
310  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
311  * we tell the scheduler not to reschedule if td is at a higher priority.
312  *
313  * This routine is called even if the thread is already scheduled.
314  */
315 static __inline
316 void
317 _lwkt_schedule_msg(thread_t td, int flags)
318 {
319     lwkt_schedule(td);
320 }
321
322 /*
323  * lwkt_initport_thread()
324  *
325  *      Initialize a port for use by a particular thread.  The port may
326  *      only be used by <td>.
327  */
328 void
329 lwkt_initport_thread(lwkt_port_t port, thread_t td)
330 {
331     _lwkt_initport(port,
332                    lwkt_thread_getport,
333                    lwkt_thread_putport,
334                    lwkt_thread_waitmsg,
335                    lwkt_thread_waitport,
336                    lwkt_thread_replyport,
337                    lwkt_thread_dropmsg,
338                    lwkt_thread_putport);
339     port->mpu_td = td;
340 }
341
342 /*
343  * lwkt_initport_spin()
344  *
345  *      Initialize a port for use with descriptors that might be accessed
346  *      via multiple LWPs, processes, or threads.  Has somewhat more
347  *      overhead then thread ports.
348  */
349 void
350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid)
351 {
352     int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
353     int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t);
354
355     if (td == NULL)
356         dmsgfn = lwkt_panic_dropmsg;
357     else
358         dmsgfn = lwkt_spin_dropmsg;
359
360     if (fixed_cpuid)
361         pportfn_oncpu = lwkt_spin_putport_oncpu;
362     else
363         pportfn_oncpu = lwkt_panic_putport_oncpu;
364
365     _lwkt_initport(port,
366                    lwkt_spin_getport,
367                    lwkt_spin_putport,
368                    lwkt_spin_waitmsg,
369                    lwkt_spin_waitport,
370                    lwkt_spin_replyport,
371                    dmsgfn,
372                    pportfn_oncpu);
373     spin_init(&port->mpu_spin, "lwktinitport");
374     port->mpu_td = td;
375     if (fixed_cpuid)
376         port->mp_cpuid = td->td_gd->gd_cpuid;
377 }
378
379 /*
380  * lwkt_initport_serialize()
381  *
382  *      Initialize a port for use with descriptors that might be accessed
383  *      via multiple LWPs, processes, or threads.  Callers are assumed to
384  *      have held the serializer (slz).
385  */
386 void
387 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
388 {
389     _lwkt_initport(port,
390                    lwkt_serialize_getport,
391                    lwkt_serialize_putport,
392                    lwkt_serialize_waitmsg,
393                    lwkt_serialize_waitport,
394                    lwkt_serialize_replyport,
395                    lwkt_panic_dropmsg,
396                    lwkt_panic_putport_oncpu);
397     port->mpu_serialize = slz;
398 }
399
400 /*
401  * Similar to the standard initport, this function simply marks the message
402  * as being done and does not attempt to return it to an originating port.
403  */
404 void
405 lwkt_initport_replyonly_null(lwkt_port_t port)
406 {
407     _lwkt_initport(port,
408                    lwkt_panic_getport,
409                    lwkt_panic_putport,
410                    lwkt_panic_waitmsg,
411                    lwkt_panic_waitport,
412                    lwkt_null_replyport,
413                    lwkt_panic_dropmsg,
414                    lwkt_panic_putport_oncpu);
415 }
416
417 /*
418  * Initialize a reply-only port, typically used as a message sink.  Such
419  * ports can only be used as a reply port.
420  */
421 void
422 lwkt_initport_replyonly(lwkt_port_t port,
423                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
424 {
425     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
426                          lwkt_panic_waitmsg, lwkt_panic_waitport,
427                          rportfn, lwkt_panic_dropmsg,
428                          lwkt_panic_putport_oncpu);
429 }
430
431 void
432 lwkt_initport_putonly(lwkt_port_t port,
433                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
434 {
435     _lwkt_initport(port, lwkt_panic_getport, pportfn,
436                          lwkt_panic_waitmsg, lwkt_panic_waitport,
437                          lwkt_panic_replyport, lwkt_panic_dropmsg,
438                          lwkt_panic_putport_oncpu);
439 }
440
441 void
442 lwkt_initport_panic(lwkt_port_t port)
443 {
444     _lwkt_initport(port,
445                    lwkt_panic_getport, lwkt_panic_putport,
446                    lwkt_panic_waitmsg, lwkt_panic_waitport,
447                    lwkt_panic_replyport, lwkt_panic_dropmsg,
448                    lwkt_panic_putport_oncpu);
449 }
450
451 static __inline
452 void
453 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
454 {
455     lwkt_msg_queue *queue;
456
457     /*
458      * normal case, remove and return the message.
459      */
460     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
461         queue = &port->mp_msgq_prio;
462     else
463         queue = &port->mp_msgq;
464     TAILQ_REMOVE(queue, msg, ms_node);
465
466     /*
467      * atomic op needed for spin ports
468      */
469     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
470 }
471
472 static __inline
473 void
474 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
475 {
476     lwkt_msg_queue *queue;
477
478     /*
479      * atomic op needed for spin ports
480      */
481     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
482     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
483         queue = &port->mp_msgq_prio;
484     else
485         queue = &port->mp_msgq;
486     TAILQ_INSERT_TAIL(queue, msg, ms_node);
487
488     if (msg->ms_flags & MSGF_RECEIPT) {
489         /*
490          * In case this message is forwarded later, the receipt
491          * flag was cleared once the receipt function is called.
492          */
493         atomic_clear_int(&msg->ms_flags, MSGF_RECEIPT);
494         msg->ms_receiptfn(msg, port);
495     }
496 }
497
498 static __inline
499 lwkt_msg_t
500 _lwkt_pollmsg(lwkt_port_t port)
501 {
502     lwkt_msg_t msg;
503
504     msg = TAILQ_FIRST(&port->mp_msgq_prio);
505     if (__predict_false(msg != NULL))
506         return msg;
507
508     /*
509      * Priority queue has no message, fallback to non-priority queue.
510      */
511     return TAILQ_FIRST(&port->mp_msgq);
512 }
513
514 static __inline
515 void
516 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
517 {
518     /*
519      * atomic op needed for spin ports
520      */
521     _lwkt_pushmsg(port, msg);
522     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
523 }
524
525 /************************************************************************
526  *                      THREAD PORT BACKEND                             *
527  ************************************************************************
528  *
529  * This backend is used when the port a message is retrieved from is owned
530  * by a single thread (the calling thread).  Messages are IPId to the
531  * correct cpu before being enqueued to a port.  Note that this is fairly
532  * optimal since scheduling would have had to do an IPI anyway if the
533  * message were headed to a different cpu.
534  */
535
536 /*
537  * This function completes reply processing for the default case in the
538  * context of the originating cpu.
539  */
540 static
541 void
542 lwkt_thread_replyport_remote(lwkt_msg_t msg)
543 {
544     lwkt_port_t port = msg->ms_reply_port;
545     int flags;
546
547     /*
548      * Chase any thread migration that occurs
549      */
550     if (port->mpu_td->td_gd != mycpu) {
551         lwkt_send_ipiq(port->mpu_td->td_gd,
552                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
553         return;
554     }
555
556     /*
557      * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
558      */
559 #ifdef INVARIANTS
560     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
561     msg->ms_flags &= ~MSGF_INTRANSIT;
562 #endif
563     flags = msg->ms_flags;
564     if (msg->ms_flags & MSGF_SYNC) {
565         cpu_sfence();
566         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
567     } else {
568         _lwkt_enqueue_reply(port, msg);
569     }
570     if (port->mp_flags & MSGPORTF_WAITING)
571         _lwkt_schedule_msg(port->mpu_td, flags);
572 }
573
574 /*
575  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
576  *
577  * Called with the reply port as an argument but in the context of the
578  * original target port.  Completion must occur on the target port's
579  * cpu.
580  *
581  * The critical section protects us from IPIs on the this CPU.
582  */
583 static
584 void
585 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
586 {
587     int flags;
588
589     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
590
591     if (msg->ms_flags & MSGF_SYNC) {
592         /*
593          * If a synchronous completion has been requested, just wakeup
594          * the message without bothering to queue it to the target port.
595          *
596          * Assume the target thread is non-preemptive, so no critical
597          * section is required.
598          */
599         if (port->mpu_td->td_gd == mycpu) {
600             crit_enter();
601             flags = msg->ms_flags;
602             cpu_sfence();
603             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
604             if (port->mp_flags & MSGPORTF_WAITING)
605                 _lwkt_schedule_msg(port->mpu_td, flags);
606             crit_exit();
607         } else {
608 #ifdef INVARIANTS
609             atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
610 #endif
611             atomic_set_int(&msg->ms_flags, MSGF_REPLY);
612             lwkt_send_ipiq(port->mpu_td->td_gd,
613                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
614         }
615     } else {
616         /*
617          * If an asynchronous completion has been requested the message
618          * must be queued to the reply port.
619          *
620          * A critical section is required to interlock the port queue.
621          */
622         if (port->mpu_td->td_gd == mycpu) {
623             crit_enter();
624             _lwkt_enqueue_reply(port, msg);
625             if (port->mp_flags & MSGPORTF_WAITING)
626                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
627             crit_exit();
628         } else {
629 #ifdef INVARIANTS
630             atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
631 #endif
632             atomic_set_int(&msg->ms_flags, MSGF_REPLY);
633             lwkt_send_ipiq(port->mpu_td->td_gd,
634                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
635         }
636     }
637 }
638
639 /*
640  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
641  *
642  * This function could _only_ be used when caller is in the same thread
643  * as the message's target port owner thread.
644  */
645 static int
646 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
647 {
648     int error;
649
650     KASSERT(port->mpu_td == curthread,
651             ("message could only be dropped in the same thread "
652              "as the message target port thread"));
653     crit_enter_quick(port->mpu_td);
654     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
655             _lwkt_pullmsg(port, msg);
656             atomic_set_int(&msg->ms_flags, MSGF_DONE);
657             error = 0;
658     } else {
659             error = ENOENT;
660     }
661     crit_exit_quick(port->mpu_td);
662
663     return (error);
664 }
665
666 /*
667  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
668  *
669  * Called with the target port as an argument but in the context of the
670  * reply port.  This function always implements an asynchronous put to
671  * the target message port, and thus returns EASYNC.
672  *
673  * The message must already have cleared MSGF_DONE and MSGF_REPLY
674  */
675 static
676 void
677 lwkt_thread_putport_remote(lwkt_msg_t msg)
678 {
679     lwkt_port_t port = msg->ms_target_port;
680
681     /*
682      * Chase any thread migration that occurs
683      */
684     if (port->mpu_td->td_gd != mycpu) {
685         lwkt_send_ipiq(port->mpu_td->td_gd,
686                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
687         return;
688     }
689
690     /*
691      * An atomic op is needed on ms_flags vs originator.  Also
692      * note that the originator might be using a different type
693      * of msgport.
694      */
695 #ifdef INVARIANTS
696     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
697     atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
698 #endif
699     _lwkt_pushmsg(port, msg);
700     if (port->mp_flags & MSGPORTF_WAITING)
701         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
702 }
703
704 static
705 int
706 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
707 {
708     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
709
710     msg->ms_target_port = port;
711     if (port->mpu_td->td_gd == mycpu) {
712         crit_enter();
713         _lwkt_pushmsg(port, msg);
714         if (port->mp_flags & MSGPORTF_WAITING)
715             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
716         crit_exit();
717     } else {
718 #ifdef INVARIANTS
719         /*
720          * Cleanup.
721          *
722          * An atomic op is needed on ms_flags vs originator.  Also
723          * note that the originator might be using a different type
724          * of msgport.
725          */
726         atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
727 #endif
728         lwkt_send_ipiq(port->mpu_td->td_gd,
729                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
730     }
731     return (EASYNC);
732 }
733
734 /*
735  * lwkt_thread_getport()
736  *
737  *      Retrieve the next message from the port or NULL if no messages
738  *      are ready.
739  */
740 static
741 void *
742 lwkt_thread_getport(lwkt_port_t port)
743 {
744     lwkt_msg_t msg;
745
746     KKASSERT(port->mpu_td == curthread);
747
748     crit_enter_quick(port->mpu_td);
749     if ((msg = _lwkt_pollmsg(port)) != NULL)
750         _lwkt_pullmsg(port, msg);
751     crit_exit_quick(port->mpu_td);
752     return(msg);
753 }
754
755 /*
756  * lwkt_thread_waitmsg()
757  *
758  *      Wait for a particular message to be replied.  We must be the only
759  *      thread waiting on the message.  The port must be owned by the
760  *      caller.
761  */
762 static
763 int
764 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
765 {
766     thread_t td = curthread;
767
768     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
769             ("can't wait dropable message"));
770
771     if ((msg->ms_flags & MSGF_DONE) == 0) {
772         /*
773          * If the done bit was not set we have to block until it is.
774          */
775         lwkt_port_t port = msg->ms_reply_port;
776         int sentabort;
777
778         KKASSERT(port->mpu_td == td);
779         crit_enter_quick(td);
780         sentabort = 0;
781
782         while ((msg->ms_flags & MSGF_DONE) == 0) {
783             port->mp_flags |= MSGPORTF_WAITING; /* same cpu */
784             if (sentabort == 0) {
785                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
786                     lwkt_abortmsg(msg);
787                 }
788             } else {
789                 lwkt_sleep("waitabt", 0);
790             }
791             port->mp_flags &= ~MSGPORTF_WAITING;
792         }
793         if (msg->ms_flags & MSGF_QUEUED)
794             _lwkt_pullmsg(port, msg);
795         crit_exit_quick(td);
796     } else {
797         /*
798          * If the done bit was set we only have to mess around with the
799          * message if it is queued on the reply port.
800          */
801         crit_enter_quick(td);
802         if (msg->ms_flags & MSGF_QUEUED) {
803             lwkt_port_t port = msg->ms_reply_port;
804             thread_t td __debugvar = curthread;
805
806             KKASSERT(port->mpu_td == td);
807             _lwkt_pullmsg(port, msg);
808         }
809         crit_exit_quick(td);
810     }
811     return(msg->ms_error);
812 }
813
814 /*
815  * lwkt_thread_waitport()
816  *
817  *      Wait for a new message to be available on the port.  We must be the
818  *      the only thread waiting on the port.  The port must be owned by caller.
819  */
820 static
821 void *
822 lwkt_thread_waitport(lwkt_port_t port, int flags)
823 {
824     thread_t td = curthread;
825     lwkt_msg_t msg;
826     int error;
827
828     KKASSERT(port->mpu_td == td);
829     crit_enter_quick(td);
830     while ((msg = _lwkt_pollmsg(port)) == NULL) {
831         port->mp_flags |= MSGPORTF_WAITING;
832         error = lwkt_sleep("waitport", flags);
833         port->mp_flags &= ~MSGPORTF_WAITING;
834         if (error)
835             goto done;
836     }
837     _lwkt_pullmsg(port, msg);
838 done:
839     crit_exit_quick(td);
840     return(msg);
841 }
842
843 /************************************************************************
844  *                         SPIN PORT BACKEND                            *
845  ************************************************************************
846  *
847  * This backend uses spinlocks instead of making assumptions about which
848  * thread is accessing the port.  It must be used when a port is not owned
849  * by a particular thread.  This is less optimal then thread ports but
850  * you don't have a choice if there are multiple threads accessing the port.
851  *
852  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
853  * on the message port, it is the responsibility of the code doing the
854  * wakeup to clear this flag rather then the blocked threads.  Some
855  * superfluous wakeups may occur, which is ok.
856  *
857  * XXX synchronous message wakeups are not current optimized.
858  */
859
860 static
861 void *
862 lwkt_spin_getport(lwkt_port_t port)
863 {
864     lwkt_msg_t msg;
865
866     spin_lock(&port->mpu_spin);
867     if ((msg = _lwkt_pollmsg(port)) != NULL)
868         _lwkt_pullmsg(port, msg);
869     spin_unlock(&port->mpu_spin);
870     return(msg);
871 }
872
873 static __inline int
874 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg)
875 {
876     int dowakeup;
877
878     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
879
880     msg->ms_target_port = port;
881     spin_lock(&port->mpu_spin);
882     _lwkt_pushmsg(port, msg);
883     dowakeup = 0;
884     if (port->mp_flags & MSGPORTF_WAITING) {
885         port->mp_flags &= ~MSGPORTF_WAITING;
886         dowakeup = 1;
887     }
888     spin_unlock(&port->mpu_spin);
889
890     return dowakeup;
891 }
892
893 static
894 int
895 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
896 {
897     if (lwkt_spin_putport_only(port, msg))
898         wakeup(port);
899     return (EASYNC);
900 }
901
902 static
903 int
904 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
905 {
906     KASSERT(port->mp_cpuid == mycpuid,
907         ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
908          port->mp_cpuid, mycpuid));
909     if (lwkt_spin_putport_only(port, msg))
910         wakeup_mycpu(port);
911     return (EASYNC);
912 }
913
914 static
915 int
916 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
917 {
918     lwkt_port_t port;
919     int sentabort;
920     int error;
921
922     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
923             ("can't wait dropable message"));
924     port = msg->ms_reply_port;
925
926     if ((msg->ms_flags & MSGF_DONE) == 0) {
927         sentabort = 0;
928         spin_lock(&port->mpu_spin);
929         while ((msg->ms_flags & MSGF_DONE) == 0) {
930             void *won;
931
932             /*
933              * If message was sent synchronously from the beginning
934              * the wakeup will be on the message structure, else it
935              * will be on the port structure.
936              *
937              * ms_flags needs atomic op originator vs target MSGF_QUEUED
938              */
939             if (msg->ms_flags & MSGF_SYNC) {
940                 won = msg;
941                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
942             } else {
943                 won = port;
944                 port->mp_flags |= MSGPORTF_WAITING;
945             }
946
947             /*
948              * Only messages which support abort can be interrupted.
949              * We must still wait for message completion regardless.
950              */
951             if ((flags & PCATCH) && sentabort == 0) {
952                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
953                 if (error) {
954                     sentabort = error;
955                     spin_unlock(&port->mpu_spin);
956                     lwkt_abortmsg(msg);
957                     spin_lock(&port->mpu_spin);
958                 }
959             } else {
960                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
961             }
962             /* see note at the top on the MSGPORTF_WAITING flag */
963         }
964         /*
965          * Turn EINTR into ERESTART if the signal indicates.
966          */
967         if (sentabort && msg->ms_error == EINTR)
968             msg->ms_error = sentabort;
969         if (msg->ms_flags & MSGF_QUEUED)
970             _lwkt_pullmsg(port, msg);
971         spin_unlock(&port->mpu_spin);
972     } else {
973         spin_lock(&port->mpu_spin);
974         if (msg->ms_flags & MSGF_QUEUED) {
975             _lwkt_pullmsg(port, msg);
976         }
977         spin_unlock(&port->mpu_spin);
978     }
979     return(msg->ms_error);
980 }
981
982 static
983 void *
984 lwkt_spin_waitport(lwkt_port_t port, int flags)
985 {
986     lwkt_msg_t msg;
987     int error;
988
989     spin_lock(&port->mpu_spin);
990     while ((msg = _lwkt_pollmsg(port)) == NULL) {
991         port->mp_flags |= MSGPORTF_WAITING;
992         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
993         /* see note at the top on the MSGPORTF_WAITING flag */
994         if (error) {
995             spin_unlock(&port->mpu_spin);
996             return(NULL);
997         }
998     }
999     _lwkt_pullmsg(port, msg);
1000     spin_unlock(&port->mpu_spin);
1001     return(msg);
1002 }
1003
1004 static
1005 void
1006 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
1007 {
1008     int dowakeup;
1009
1010     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1011
1012     if (msg->ms_flags & MSGF_SYNC) {
1013         /*
1014          * If a synchronous completion has been requested, just wakeup
1015          * the message without bothering to queue it to the target port.
1016          *
1017          * ms_flags protected by reply port spinlock
1018          */
1019         spin_lock(&port->mpu_spin);
1020         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1021         dowakeup = 0;
1022         if (msg->ms_flags & MSGF_WAITING) {
1023                 msg->ms_flags &= ~MSGF_WAITING;
1024                 dowakeup = 1;
1025         }
1026         spin_unlock(&port->mpu_spin);
1027         if (dowakeup)
1028                 wakeup(msg);
1029     } else {
1030         /*
1031          * If an asynchronous completion has been requested the message
1032          * must be queued to the reply port.
1033          */
1034         spin_lock(&port->mpu_spin);
1035         _lwkt_enqueue_reply(port, msg);
1036         dowakeup = 0;
1037         if (port->mp_flags & MSGPORTF_WAITING) {
1038             port->mp_flags &= ~MSGPORTF_WAITING;
1039             dowakeup = 1;
1040         }
1041         spin_unlock(&port->mpu_spin);
1042         if (dowakeup)
1043             wakeup(port);
1044     }
1045 }
1046
1047 /*
1048  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1049  *
1050  * This function could _only_ be used when caller is in the same thread
1051  * as the message's target port owner thread.
1052  */
1053 static int
1054 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1055 {
1056     int error;
1057
1058     KASSERT(port->mpu_td == curthread,
1059             ("message could only be dropped in the same thread "
1060              "as the message target port thread\n"));
1061     spin_lock(&port->mpu_spin);
1062     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
1063             _lwkt_pullmsg(port, msg);
1064             msg->ms_flags |= MSGF_DONE;
1065             error = 0;
1066     } else {
1067             error = ENOENT;
1068     }
1069     spin_unlock(&port->mpu_spin);
1070
1071     return (error);
1072 }
1073
1074 /************************************************************************
1075  *                        SERIALIZER PORT BACKEND                       *
1076  ************************************************************************
1077  *
1078  * This backend uses serializer to protect port accessing.  Callers are
1079  * assumed to have serializer held.  This kind of port is usually created
1080  * by network device driver along with _one_ lwkt thread to pipeline
1081  * operations which may temporarily release serializer.
1082  *
1083  * Implementation is based on SPIN PORT BACKEND.
1084  */
1085
1086 static
1087 void *
1088 lwkt_serialize_getport(lwkt_port_t port)
1089 {
1090     lwkt_msg_t msg;
1091
1092     ASSERT_SERIALIZED(port->mpu_serialize);
1093
1094     if ((msg = _lwkt_pollmsg(port)) != NULL)
1095         _lwkt_pullmsg(port, msg);
1096     return(msg);
1097 }
1098
1099 static
1100 int
1101 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1102 {
1103     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1104     ASSERT_SERIALIZED(port->mpu_serialize);
1105
1106     msg->ms_target_port = port;
1107     _lwkt_pushmsg(port, msg);
1108     if (port->mp_flags & MSGPORTF_WAITING) {
1109         port->mp_flags &= ~MSGPORTF_WAITING;
1110         wakeup(port);
1111     }
1112     return (EASYNC);
1113 }
1114
1115 static
1116 int
1117 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1118 {
1119     lwkt_port_t port;
1120     int sentabort;
1121     int error;
1122
1123     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1124             ("can't wait dropable message"));
1125
1126     if ((msg->ms_flags & MSGF_DONE) == 0) {
1127         port = msg->ms_reply_port;
1128
1129         ASSERT_SERIALIZED(port->mpu_serialize);
1130
1131         sentabort = 0;
1132         while ((msg->ms_flags & MSGF_DONE) == 0) {
1133             void *won;
1134
1135             /*
1136              * If message was sent synchronously from the beginning
1137              * the wakeup will be on the message structure, else it
1138              * will be on the port structure.
1139              */
1140             if (msg->ms_flags & MSGF_SYNC) {
1141                 won = msg;
1142             } else {
1143                 won = port;
1144                 port->mp_flags |= MSGPORTF_WAITING;
1145             }
1146
1147             /*
1148              * Only messages which support abort can be interrupted.
1149              * We must still wait for message completion regardless.
1150              */
1151             if ((flags & PCATCH) && sentabort == 0) {
1152                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1153                 if (error) {
1154                     sentabort = error;
1155                     lwkt_serialize_exit(port->mpu_serialize);
1156                     lwkt_abortmsg(msg);
1157                     lwkt_serialize_enter(port->mpu_serialize);
1158                 }
1159             } else {
1160                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1161             }
1162             /* see note at the top on the MSGPORTF_WAITING flag */
1163         }
1164         /*
1165          * Turn EINTR into ERESTART if the signal indicates.
1166          */
1167         if (sentabort && msg->ms_error == EINTR)
1168             msg->ms_error = sentabort;
1169         if (msg->ms_flags & MSGF_QUEUED)
1170             _lwkt_pullmsg(port, msg);
1171     } else {
1172         if (msg->ms_flags & MSGF_QUEUED) {
1173             port = msg->ms_reply_port;
1174
1175             ASSERT_SERIALIZED(port->mpu_serialize);
1176             _lwkt_pullmsg(port, msg);
1177         }
1178     }
1179     return(msg->ms_error);
1180 }
1181
1182 static
1183 void *
1184 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1185 {
1186     lwkt_msg_t msg;
1187     int error;
1188
1189     ASSERT_SERIALIZED(port->mpu_serialize);
1190
1191     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1192         port->mp_flags |= MSGPORTF_WAITING;
1193         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1194         /* see note at the top on the MSGPORTF_WAITING flag */
1195         if (error)
1196             return(NULL);
1197     }
1198     _lwkt_pullmsg(port, msg);
1199     return(msg);
1200 }
1201
1202 static
1203 void
1204 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1205 {
1206     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1207     ASSERT_SERIALIZED(port->mpu_serialize);
1208
1209     if (msg->ms_flags & MSGF_SYNC) {
1210         /*
1211          * If a synchronous completion has been requested, just wakeup
1212          * the message without bothering to queue it to the target port.
1213          *
1214          * (both sides synchronized via serialized reply port)
1215          */
1216         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1217         wakeup(msg);
1218     } else {
1219         /*
1220          * If an asynchronous completion has been requested the message
1221          * must be queued to the reply port.
1222          */
1223         _lwkt_enqueue_reply(port, msg);
1224         if (port->mp_flags & MSGPORTF_WAITING) {
1225             port->mp_flags &= ~MSGPORTF_WAITING;
1226             wakeup(port);
1227         }
1228     }
1229 }
1230
1231 /************************************************************************
1232  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1233  ************************************************************************/
1234
1235 /*
1236  * You can point a port's reply vector at this function if you just want
1237  * the message marked done, without any queueing or signaling.  This is
1238  * often used for structure-embedded messages.
1239  */
1240 static
1241 void
1242 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1243 {
1244     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1245 }
1246
1247 static
1248 void *
1249 lwkt_panic_getport(lwkt_port_t port)
1250 {
1251     panic("lwkt_getport() illegal on port %p", port);
1252 }
1253
1254 static
1255 int
1256 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1257 {
1258     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1259 }
1260
1261 static
1262 int
1263 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1264 {
1265     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1266 }
1267
1268 static
1269 void *
1270 lwkt_panic_waitport(lwkt_port_t port, int flags)
1271 {
1272     panic("port %p cannot be waited on", port);
1273 }
1274
1275 static
1276 void
1277 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1278 {
1279     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1280 }
1281
1282 static
1283 int
1284 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1285 {
1286     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1287     /* NOT REACHED */
1288     return (ENOENT);
1289 }
1290
1291 static
1292 int
1293 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
1294 {
1295     panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",
1296         port, msg);
1297     /* NOT REACHED */
1298     return (ENOENT);
1299 }