kernel: Remove newlines from the panic messages that have one.
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #ifdef SMP
69 #include <machine/smp.h>
70 #endif
71
72 #include <sys/malloc.h>
73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
74
75 /************************************************************************
76  *                              MESSAGE FUNCTIONS                       *
77  ************************************************************************/
78
79 /*
80  * lwkt_sendmsg()
81  *
82  *      Request asynchronous completion and call lwkt_beginmsg().  The
83  *      target port can opt to execute the message synchronously or
84  *      asynchronously and this function will automatically queue the
85  *      response if the target executes the message synchronously.
86  *
87  *      NOTE: The message is in an indeterminant state until this call
88  *      returns.  The caller should not mess with it (e.g. try to abort it)
89  *      until then.
90  */
91 void
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
93 {
94     int error;
95
96     KKASSERT(msg->ms_reply_port != NULL &&
97              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
98     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
100         lwkt_replymsg(msg, error);
101     }
102 }
103
104 /*
105  * lwkt_domsg()
106  *
107  *      Request asynchronous completion and call lwkt_beginmsg().  The
108  *      target port can opt to execute the message synchronously or
109  *      asynchronously and this function will automatically queue the
110  *      response if the target executes the message synchronously.
111  */
112 int
113 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
114 {
115     int error;
116
117     KKASSERT(msg->ms_reply_port != NULL &&
118              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
119     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
120     msg->ms_flags |= MSGF_SYNC;
121     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
122         error = lwkt_waitmsg(msg, flags);
123     } else {
124         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
125     }
126     return(error);
127 }
128
129 /*
130  * lwkt_forwardmsg()
131  *
132  * Forward a message received on one port to another port.
133  */
134 int
135 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
136 {   
137     int error;
138
139     crit_enter();
140     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
141     if ((error = port->mp_putport(port, msg)) != EASYNC)
142         lwkt_replymsg(msg, error);
143     crit_exit();
144     return(error);
145 }
146
147 /*
148  * lwkt_abortmsg()
149  *
150  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
151  * The caller must ensure that the message will not be both replied AND
152  * destroyed while the abort is in progress.
153  *
154  * This function issues a callback which might block!
155  */
156 void
157 lwkt_abortmsg(lwkt_msg_t msg)
158 {
159     /*
160      * A critical section protects us from reply IPIs on this cpu.
161      */
162     crit_enter();
163
164     /*
165      * Shortcut the operation if the message has already been returned.
166      * The callback typically constructs a lwkt_msg with the abort request,
167      * issues it synchronously, and waits for completion.  The callback
168      * is not required to actually abort the message and the target port,
169      * upon receiving an abort request message generated by the callback
170      * should check whether the original message has already completed or
171      * not.
172      */
173     if (msg->ms_flags & MSGF_ABORTABLE) {
174         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
175             msg->ms_abortfn(msg);
176     }
177     crit_exit();
178 }
179
180 /************************************************************************
181  *                      PORT INITIALIZATION API                         *
182  ************************************************************************/
183
184 static void *lwkt_thread_getport(lwkt_port_t port);
185 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
186 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
187 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
188 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
189 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
190
191 static void *lwkt_spin_getport(lwkt_port_t port);
192 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
193 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
194 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
195 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
196 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
197
198 static void *lwkt_serialize_getport(lwkt_port_t port);
199 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
200 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
201 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
202 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
203
204 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void *lwkt_panic_getport(lwkt_port_t port);
206 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
207 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
208 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
209 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
210 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
211
212 /*
213  * Core port initialization (internal)
214  */
215 static __inline
216 void
217 _lwkt_initport(lwkt_port_t port,
218                void *(*gportfn)(lwkt_port_t),
219                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
220                int (*wmsgfn)(lwkt_msg_t, int),
221                void *(*wportfn)(lwkt_port_t, int),
222                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
223                void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
224 {
225     bzero(port, sizeof(*port));
226     TAILQ_INIT(&port->mp_msgq);
227     TAILQ_INIT(&port->mp_msgq_prio);
228     port->mp_getport = gportfn;
229     port->mp_putport = pportfn;
230     port->mp_waitmsg =  wmsgfn;
231     port->mp_waitport =  wportfn;
232     port->mp_replyport = rportfn;
233     port->mp_dropmsg = dmsgfn;
234 }
235
236 /*
237  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
238  * we tell the scheduler not to reschedule if td is at a higher priority.
239  *
240  * This routine is called even if the thread is already scheduled.
241  */
242 static __inline
243 void
244 _lwkt_schedule_msg(thread_t td, int flags)
245 {
246     lwkt_schedule(td);
247 }
248
249 /*
250  * lwkt_initport_thread()
251  *
252  *      Initialize a port for use by a particular thread.  The port may
253  *      only be used by <td>.
254  */
255 void
256 lwkt_initport_thread(lwkt_port_t port, thread_t td)
257 {
258     _lwkt_initport(port,
259                    lwkt_thread_getport,
260                    lwkt_thread_putport,
261                    lwkt_thread_waitmsg,
262                    lwkt_thread_waitport,
263                    lwkt_thread_replyport,
264                    lwkt_thread_dropmsg);
265     port->mpu_td = td;
266 }
267
268 /*
269  * lwkt_initport_spin()
270  *
271  *      Initialize a port for use with descriptors that might be accessed
272  *      via multiple LWPs, processes, or threads.  Has somewhat more
273  *      overhead then thread ports.
274  */
275 void
276 lwkt_initport_spin(lwkt_port_t port)
277 {
278     _lwkt_initport(port,
279                    lwkt_spin_getport,
280                    lwkt_spin_putport,
281                    lwkt_spin_waitmsg,
282                    lwkt_spin_waitport,
283                    lwkt_spin_replyport,
284                    lwkt_spin_dropmsg);
285     spin_init(&port->mpu_spin);
286 }
287
288 /*
289  * lwkt_initport_serialize()
290  *
291  *      Initialize a port for use with descriptors that might be accessed
292  *      via multiple LWPs, processes, or threads.  Callers are assumed to
293  *      have held the serializer (slz).
294  */
295 void
296 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
297 {
298     _lwkt_initport(port,
299                    lwkt_serialize_getport,
300                    lwkt_serialize_putport,
301                    lwkt_serialize_waitmsg,
302                    lwkt_serialize_waitport,
303                    lwkt_serialize_replyport,
304                    lwkt_panic_dropmsg);
305     port->mpu_serialize = slz;
306 }
307
308 /*
309  * Similar to the standard initport, this function simply marks the message
310  * as being done and does not attempt to return it to an originating port.
311  */
312 void
313 lwkt_initport_replyonly_null(lwkt_port_t port)
314 {
315     _lwkt_initport(port,
316                    lwkt_panic_getport,
317                    lwkt_panic_putport,
318                    lwkt_panic_waitmsg,
319                    lwkt_panic_waitport,
320                    lwkt_null_replyport,
321                    lwkt_panic_dropmsg);
322 }
323
324 /*
325  * Initialize a reply-only port, typically used as a message sink.  Such
326  * ports can only be used as a reply port.
327  */
328 void
329 lwkt_initport_replyonly(lwkt_port_t port,
330                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
331 {
332     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
333                          lwkt_panic_waitmsg, lwkt_panic_waitport,
334                          rportfn, lwkt_panic_dropmsg);
335 }
336
337 void
338 lwkt_initport_putonly(lwkt_port_t port,
339                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
340 {
341     _lwkt_initport(port, lwkt_panic_getport, pportfn,
342                          lwkt_panic_waitmsg, lwkt_panic_waitport,
343                          lwkt_panic_replyport, lwkt_panic_dropmsg);
344 }
345
346 void
347 lwkt_initport_panic(lwkt_port_t port)
348 {
349     _lwkt_initport(port,
350                    lwkt_panic_getport, lwkt_panic_putport,
351                    lwkt_panic_waitmsg, lwkt_panic_waitport,
352                    lwkt_panic_replyport, lwkt_panic_dropmsg);
353 }
354
355 static __inline
356 void
357 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
358 {
359     lwkt_msg_queue *queue;
360
361     /*
362      * normal case, remove and return the message.
363      */
364     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
365         queue = &port->mp_msgq_prio;
366     else
367         queue = &port->mp_msgq;
368     TAILQ_REMOVE(queue, msg, ms_node);
369
370     /*
371      * atomic op needed for spin ports
372      */
373     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
374 }
375
376 static __inline
377 void
378 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
379 {
380     lwkt_msg_queue *queue;
381
382     /*
383      * atomic op needed for spin ports
384      */
385     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
386     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
387         queue = &port->mp_msgq_prio;
388     else
389         queue = &port->mp_msgq;
390     TAILQ_INSERT_TAIL(queue, msg, ms_node);
391 }
392
393 static __inline
394 lwkt_msg_t
395 _lwkt_pollmsg(lwkt_port_t port)
396 {
397     lwkt_msg_t msg;
398
399     msg = TAILQ_FIRST(&port->mp_msgq_prio);
400     if (__predict_false(msg != NULL))
401         return msg;
402
403     /*
404      * Priority queue has no message, fallback to non-priority queue.
405      */
406     return TAILQ_FIRST(&port->mp_msgq);
407 }
408
409 static __inline
410 void
411 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
412 {
413     /*
414      * atomic op needed for spin ports
415      */
416     _lwkt_pushmsg(port, msg);
417     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
418 }
419
420 /************************************************************************
421  *                      THREAD PORT BACKEND                             *
422  ************************************************************************
423  *
424  * This backend is used when the port a message is retrieved from is owned
425  * by a single thread (the calling thread).  Messages are IPId to the
426  * correct cpu before being enqueued to a port.  Note that this is fairly
427  * optimal since scheduling would have had to do an IPI anyway if the
428  * message were headed to a different cpu.
429  */
430
431 #ifdef SMP
432
433 /*
434  * This function completes reply processing for the default case in the
435  * context of the originating cpu.
436  */
437 static
438 void
439 lwkt_thread_replyport_remote(lwkt_msg_t msg)
440 {
441     lwkt_port_t port = msg->ms_reply_port;
442     int flags;
443
444     /*
445      * Chase any thread migration that occurs
446      */
447     if (port->mpu_td->td_gd != mycpu) {
448         lwkt_send_ipiq(port->mpu_td->td_gd,
449                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
450         return;
451     }
452
453     /*
454      * Cleanup
455      */
456 #ifdef INVARIANTS
457     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
458     msg->ms_flags &= ~MSGF_INTRANSIT;
459 #endif
460     flags = msg->ms_flags;
461     if (msg->ms_flags & MSGF_SYNC) {
462         cpu_sfence();
463         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
464     } else {
465         _lwkt_enqueue_reply(port, msg);
466     }
467     if (port->mp_flags & MSGPORTF_WAITING)
468         _lwkt_schedule_msg(port->mpu_td, flags);
469 }
470
471 #endif
472
473 /*
474  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
475  *
476  * Called with the reply port as an argument but in the context of the
477  * original target port.  Completion must occur on the target port's
478  * cpu.
479  *
480  * The critical section protects us from IPIs on the this CPU.
481  */
482 static
483 void
484 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
485 {
486     int flags;
487
488     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
489
490     if (msg->ms_flags & MSGF_SYNC) {
491         /*
492          * If a synchronous completion has been requested, just wakeup
493          * the message without bothering to queue it to the target port.
494          *
495          * Assume the target thread is non-preemptive, so no critical
496          * section is required.
497          */
498 #ifdef SMP
499         if (port->mpu_td->td_gd == mycpu) {
500 #endif
501             flags = msg->ms_flags;
502             cpu_sfence();
503             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
504             if (port->mp_flags & MSGPORTF_WAITING)
505                 _lwkt_schedule_msg(port->mpu_td, flags);
506 #ifdef SMP
507         } else {
508 #ifdef INVARIANTS
509             msg->ms_flags |= MSGF_INTRANSIT;
510 #endif
511             msg->ms_flags |= MSGF_REPLY;
512             lwkt_send_ipiq(port->mpu_td->td_gd,
513                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
514         }
515 #endif
516     } else {
517         /*
518          * If an asynchronous completion has been requested the message
519          * must be queued to the reply port.
520          *
521          * A critical section is required to interlock the port queue.
522          */
523 #ifdef SMP
524         if (port->mpu_td->td_gd == mycpu) {
525 #endif
526             crit_enter();
527             _lwkt_enqueue_reply(port, msg);
528             if (port->mp_flags & MSGPORTF_WAITING)
529                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
530             crit_exit();
531 #ifdef SMP
532         } else {
533 #ifdef INVARIANTS
534             msg->ms_flags |= MSGF_INTRANSIT;
535 #endif
536             msg->ms_flags |= MSGF_REPLY;
537             lwkt_send_ipiq(port->mpu_td->td_gd,
538                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
539         }
540 #endif
541     }
542 }
543
544 /*
545  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
546  *
547  * This function could _only_ be used when caller is in the same thread
548  * as the message's target port owner thread.
549  */
550 static void
551 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
552 {
553     KASSERT(port->mpu_td == curthread,
554             ("message could only be dropped in the same thread "
555              "as the message target port thread"));
556     crit_enter_quick(port->mpu_td);
557     _lwkt_pullmsg(port, msg);
558     msg->ms_flags |= MSGF_DONE;
559     crit_exit_quick(port->mpu_td);
560 }
561
562 /*
563  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
564  *
565  * Called with the target port as an argument but in the context of the
566  * reply port.  This function always implements an asynchronous put to
567  * the target message port, and thus returns EASYNC.
568  *
569  * The message must already have cleared MSGF_DONE and MSGF_REPLY
570  */
571
572 #ifdef SMP
573
574 static
575 void
576 lwkt_thread_putport_remote(lwkt_msg_t msg)
577 {
578     lwkt_port_t port = msg->ms_target_port;
579
580     /*
581      * Chase any thread migration that occurs
582      */
583     if (port->mpu_td->td_gd != mycpu) {
584         lwkt_send_ipiq(port->mpu_td->td_gd,
585                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
586         return;
587     }
588
589     /*
590      * Cleanup
591      */
592 #ifdef INVARIANTS
593     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
594     msg->ms_flags &= ~MSGF_INTRANSIT;
595 #endif
596     _lwkt_pushmsg(port, msg);
597     if (port->mp_flags & MSGPORTF_WAITING)
598         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
599 }
600
601 #endif
602
603 static
604 int
605 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
606 {
607     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
608
609     msg->ms_target_port = port;
610 #ifdef SMP
611     if (port->mpu_td->td_gd == mycpu) {
612 #endif
613         crit_enter();
614         _lwkt_pushmsg(port, msg);
615         if (port->mp_flags & MSGPORTF_WAITING)
616             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
617         crit_exit();
618 #ifdef SMP
619     } else {
620 #ifdef INVARIANTS
621         msg->ms_flags |= MSGF_INTRANSIT;
622 #endif
623         lwkt_send_ipiq(port->mpu_td->td_gd,
624                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
625     }
626 #endif
627     return (EASYNC);
628 }
629
630 /*
631  * lwkt_thread_getport()
632  *
633  *      Retrieve the next message from the port or NULL if no messages
634  *      are ready.
635  */
636 static
637 void *
638 lwkt_thread_getport(lwkt_port_t port)
639 {
640     lwkt_msg_t msg;
641
642     KKASSERT(port->mpu_td == curthread);
643
644     crit_enter_quick(port->mpu_td);
645     if ((msg = _lwkt_pollmsg(port)) != NULL)
646         _lwkt_pullmsg(port, msg);
647     crit_exit_quick(port->mpu_td);
648     return(msg);
649 }
650
651 /*
652  * lwkt_thread_waitmsg()
653  *
654  *      Wait for a particular message to be replied.  We must be the only
655  *      thread waiting on the message.  The port must be owned by the
656  *      caller.
657  */
658 static
659 int
660 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
661 {
662     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
663             ("can't wait dropable message"));
664
665     if ((msg->ms_flags & MSGF_DONE) == 0) {
666         /*
667          * If the done bit was not set we have to block until it is.
668          */
669         lwkt_port_t port = msg->ms_reply_port;
670         thread_t td = curthread;
671         int sentabort;
672
673         KKASSERT(port->mpu_td == td);
674         crit_enter_quick(td);
675         sentabort = 0;
676
677         while ((msg->ms_flags & MSGF_DONE) == 0) {
678             port->mp_flags |= MSGPORTF_WAITING;
679             if (sentabort == 0) {
680                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
681                     lwkt_abortmsg(msg);
682                 }
683             } else {
684                 lwkt_sleep("waitabt", 0);
685             }
686             port->mp_flags &= ~MSGPORTF_WAITING;
687         }
688         if (msg->ms_flags & MSGF_QUEUED)
689             _lwkt_pullmsg(port, msg);
690         crit_exit_quick(td);
691     } else {
692         /*
693          * If the done bit was set we only have to mess around with the
694          * message if it is queued on the reply port.
695          */
696         if (msg->ms_flags & MSGF_QUEUED) {
697             lwkt_port_t port = msg->ms_reply_port;
698             thread_t td = curthread;
699
700             KKASSERT(port->mpu_td == td);
701             crit_enter_quick(td);
702             _lwkt_pullmsg(port, msg);
703             crit_exit_quick(td);
704         }
705     }
706     return(msg->ms_error);
707 }
708
709 static
710 void *
711 lwkt_thread_waitport(lwkt_port_t port, int flags)
712 {
713     thread_t td = curthread;
714     lwkt_msg_t msg;
715     int error;
716
717     KKASSERT(port->mpu_td == td);
718     crit_enter_quick(td);
719     while ((msg = _lwkt_pollmsg(port)) == NULL) {
720         port->mp_flags |= MSGPORTF_WAITING;
721         error = lwkt_sleep("waitport", flags);
722         port->mp_flags &= ~MSGPORTF_WAITING;
723         if (error)
724             goto done;
725     }
726     _lwkt_pullmsg(port, msg);
727 done:
728     crit_exit_quick(td);
729     return(msg);
730 }
731
732 /************************************************************************
733  *                         SPIN PORT BACKEND                            *
734  ************************************************************************
735  *
736  * This backend uses spinlocks instead of making assumptions about which
737  * thread is accessing the port.  It must be used when a port is not owned
738  * by a particular thread.  This is less optimal then thread ports but
739  * you don't have a choice if there are multiple threads accessing the port.
740  *
741  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
742  * on the message port, it is the responsibility of the code doing the
743  * wakeup to clear this flag rather then the blocked threads.  Some
744  * superfluous wakeups may occur, which is ok.
745  *
746  * XXX synchronous message wakeups are not current optimized.
747  */
748
749 static
750 void *
751 lwkt_spin_getport(lwkt_port_t port)
752 {
753     lwkt_msg_t msg;
754
755     spin_lock(&port->mpu_spin);
756     if ((msg = _lwkt_pollmsg(port)) != NULL)
757         _lwkt_pullmsg(port, msg);
758     spin_unlock(&port->mpu_spin);
759     return(msg);
760 }
761
762 static
763 int
764 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
765 {
766     int dowakeup;
767
768     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
769
770     msg->ms_target_port = port;
771     spin_lock(&port->mpu_spin);
772     _lwkt_pushmsg(port, msg);
773     dowakeup = 0;
774     if (port->mp_flags & MSGPORTF_WAITING) {
775         port->mp_flags &= ~MSGPORTF_WAITING;
776         dowakeup = 1;
777     }
778     spin_unlock(&port->mpu_spin);
779     if (dowakeup)
780         wakeup(port);
781     return (EASYNC);
782 }
783
784 static
785 int
786 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
787 {
788     lwkt_port_t port;
789     int sentabort;
790     int error;
791
792     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
793             ("can't wait dropable message"));
794
795     if ((msg->ms_flags & MSGF_DONE) == 0) {
796         port = msg->ms_reply_port;
797         sentabort = 0;
798         spin_lock(&port->mpu_spin);
799         while ((msg->ms_flags & MSGF_DONE) == 0) {
800             void *won;
801
802             /*
803              * If message was sent synchronously from the beginning
804              * the wakeup will be on the message structure, else it
805              * will be on the port structure.
806              */
807             if (msg->ms_flags & MSGF_SYNC) {
808                 won = msg;
809                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
810             } else {
811                 won = port;
812                 port->mp_flags |= MSGPORTF_WAITING;
813             }
814
815             /*
816              * Only messages which support abort can be interrupted.
817              * We must still wait for message completion regardless.
818              */
819             if ((flags & PCATCH) && sentabort == 0) {
820                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
821                 if (error) {
822                     sentabort = error;
823                     spin_unlock(&port->mpu_spin);
824                     lwkt_abortmsg(msg);
825                     spin_lock(&port->mpu_spin);
826                 }
827             } else {
828                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
829             }
830             /* see note at the top on the MSGPORTF_WAITING flag */
831         }
832         /*
833          * Turn EINTR into ERESTART if the signal indicates.
834          */
835         if (sentabort && msg->ms_error == EINTR)
836             msg->ms_error = sentabort;
837         if (msg->ms_flags & MSGF_QUEUED)
838             _lwkt_pullmsg(port, msg);
839         spin_unlock(&port->mpu_spin);
840     } else {
841         if (msg->ms_flags & MSGF_QUEUED) {
842             port = msg->ms_reply_port;
843             spin_lock(&port->mpu_spin);
844             _lwkt_pullmsg(port, msg);
845             spin_unlock(&port->mpu_spin);
846         }
847     }
848     return(msg->ms_error);
849 }
850
851 static
852 void *
853 lwkt_spin_waitport(lwkt_port_t port, int flags)
854 {
855     lwkt_msg_t msg;
856     int error;
857
858     spin_lock(&port->mpu_spin);
859     while ((msg = _lwkt_pollmsg(port)) == NULL) {
860         port->mp_flags |= MSGPORTF_WAITING;
861         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
862         /* see note at the top on the MSGPORTF_WAITING flag */
863         if (error) {
864             spin_unlock(&port->mpu_spin);
865             return(NULL);
866         }
867     }
868     _lwkt_pullmsg(port, msg);
869     spin_unlock(&port->mpu_spin);
870     return(msg);
871 }
872
873 static
874 void
875 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
876 {
877     int dowakeup;
878
879     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
880
881     if (msg->ms_flags & MSGF_SYNC) {
882         /*
883          * If a synchronous completion has been requested, just wakeup
884          * the message without bothering to queue it to the target port.
885          */
886         spin_lock(&port->mpu_spin);
887         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
888         dowakeup = 0;
889         if (msg->ms_flags & MSGF_WAITING) {
890                 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
891                 dowakeup = 1;
892         }
893         spin_unlock(&port->mpu_spin);
894         if (dowakeup)
895                 wakeup(msg);
896     } else {
897         /*
898          * If an asynchronous completion has been requested the message
899          * must be queued to the reply port.
900          */
901         spin_lock(&port->mpu_spin);
902         _lwkt_enqueue_reply(port, msg);
903         dowakeup = 0;
904         if (port->mp_flags & MSGPORTF_WAITING) {
905             port->mp_flags &= ~MSGPORTF_WAITING;
906             dowakeup = 1;
907         }
908         spin_unlock(&port->mpu_spin);
909         if (dowakeup)
910             wakeup(port);
911     }
912 }
913
914 /*
915  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
916  *
917  * This function could _only_ be used when caller is in the same thread
918  * as the message's target port owner thread.
919  */
920 static void
921 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
922 {
923     KASSERT(port->mpu_td == curthread,
924             ("message could only be dropped in the same thread "
925              "as the message target port thread\n"));
926     spin_lock(&port->mpu_spin);
927     _lwkt_pullmsg(port, msg);
928     msg->ms_flags |= MSGF_DONE;
929     spin_unlock(&port->mpu_spin);
930 }
931
932 /************************************************************************
933  *                        SERIALIZER PORT BACKEND                       *
934  ************************************************************************
935  *
936  * This backend uses serializer to protect port accessing.  Callers are
937  * assumed to have serializer held.  This kind of port is usually created
938  * by network device driver along with _one_ lwkt thread to pipeline
939  * operations which may temporarily release serializer.
940  *
941  * Implementation is based on SPIN PORT BACKEND.
942  */
943
944 static
945 void *
946 lwkt_serialize_getport(lwkt_port_t port)
947 {
948     lwkt_msg_t msg;
949
950     ASSERT_SERIALIZED(port->mpu_serialize);
951
952     if ((msg = _lwkt_pollmsg(port)) != NULL)
953         _lwkt_pullmsg(port, msg);
954     return(msg);
955 }
956
957 static
958 int
959 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
960 {
961     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
962     ASSERT_SERIALIZED(port->mpu_serialize);
963
964     msg->ms_target_port = port;
965     _lwkt_pushmsg(port, msg);
966     if (port->mp_flags & MSGPORTF_WAITING) {
967         port->mp_flags &= ~MSGPORTF_WAITING;
968         wakeup(port);
969     }
970     return (EASYNC);
971 }
972
973 static
974 int
975 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
976 {
977     lwkt_port_t port;
978     int sentabort;
979     int error;
980
981     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
982             ("can't wait dropable message"));
983
984     if ((msg->ms_flags & MSGF_DONE) == 0) {
985         port = msg->ms_reply_port;
986
987         ASSERT_SERIALIZED(port->mpu_serialize);
988
989         sentabort = 0;
990         while ((msg->ms_flags & MSGF_DONE) == 0) {
991             void *won;
992
993             /*
994              * If message was sent synchronously from the beginning
995              * the wakeup will be on the message structure, else it
996              * will be on the port structure.
997              */
998             if (msg->ms_flags & MSGF_SYNC) {
999                 won = msg;
1000             } else {
1001                 won = port;
1002                 port->mp_flags |= MSGPORTF_WAITING;
1003             }
1004
1005             /*
1006              * Only messages which support abort can be interrupted.
1007              * We must still wait for message completion regardless.
1008              */
1009             if ((flags & PCATCH) && sentabort == 0) {
1010                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1011                 if (error) {
1012                     sentabort = error;
1013                     lwkt_serialize_exit(port->mpu_serialize);
1014                     lwkt_abortmsg(msg);
1015                     lwkt_serialize_enter(port->mpu_serialize);
1016                 }
1017             } else {
1018                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1019             }
1020             /* see note at the top on the MSGPORTF_WAITING flag */
1021         }
1022         /*
1023          * Turn EINTR into ERESTART if the signal indicates.
1024          */
1025         if (sentabort && msg->ms_error == EINTR)
1026             msg->ms_error = sentabort;
1027         if (msg->ms_flags & MSGF_QUEUED)
1028             _lwkt_pullmsg(port, msg);
1029     } else {
1030         if (msg->ms_flags & MSGF_QUEUED) {
1031             port = msg->ms_reply_port;
1032
1033             ASSERT_SERIALIZED(port->mpu_serialize);
1034             _lwkt_pullmsg(port, msg);
1035         }
1036     }
1037     return(msg->ms_error);
1038 }
1039
1040 static
1041 void *
1042 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1043 {
1044     lwkt_msg_t msg;
1045     int error;
1046
1047     ASSERT_SERIALIZED(port->mpu_serialize);
1048
1049     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1050         port->mp_flags |= MSGPORTF_WAITING;
1051         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1052         /* see note at the top on the MSGPORTF_WAITING flag */
1053         if (error)
1054             return(NULL);
1055     }
1056     _lwkt_pullmsg(port, msg);
1057     return(msg);
1058 }
1059
1060 static
1061 void
1062 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1063 {
1064     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1065     ASSERT_SERIALIZED(port->mpu_serialize);
1066
1067     if (msg->ms_flags & MSGF_SYNC) {
1068         /*
1069          * If a synchronous completion has been requested, just wakeup
1070          * the message without bothering to queue it to the target port.
1071          */
1072         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1073         wakeup(msg);
1074     } else {
1075         /*
1076          * If an asynchronous completion has been requested the message
1077          * must be queued to the reply port.
1078          */
1079         _lwkt_enqueue_reply(port, msg);
1080         if (port->mp_flags & MSGPORTF_WAITING) {
1081             port->mp_flags &= ~MSGPORTF_WAITING;
1082             wakeup(port);
1083         }
1084     }
1085 }
1086
1087 /************************************************************************
1088  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1089  ************************************************************************/
1090
1091 /*
1092  * You can point a port's reply vector at this function if you just want
1093  * the message marked done, without any queueing or signaling.  This is
1094  * often used for structure-embedded messages.
1095  */
1096 static
1097 void
1098 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1099 {
1100     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1101 }
1102
1103 static
1104 void *
1105 lwkt_panic_getport(lwkt_port_t port)
1106 {
1107     panic("lwkt_getport() illegal on port %p", port);
1108 }
1109
1110 static
1111 int
1112 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1113 {
1114     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1115 }
1116
1117 static
1118 int
1119 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1120 {
1121     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1122 }
1123
1124 static
1125 void *
1126 lwkt_panic_waitport(lwkt_port_t port, int flags)
1127 {
1128     panic("port %p cannot be waited on", port);
1129 }
1130
1131 static
1132 void
1133 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1134 {
1135     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1136 }
1137
1138 static
1139 void
1140 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1141 {
1142     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1143 }