36c17b7e65f1a3321ee5ba4572f7af75f6c480cd
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #ifdef SMP
69 #include <machine/smp.h>
70 #endif
71
72 #include <sys/malloc.h>
73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
74
75 /************************************************************************
76  *                              MESSAGE FUNCTIONS                       *
77  ************************************************************************/
78
79 /*
80  * lwkt_sendmsg()
81  *
82  *      Request asynchronous completion and call lwkt_beginmsg().  The
83  *      target port can opt to execute the message synchronously or
84  *      asynchronously and this function will automatically queue the
85  *      response if the target executes the message synchronously.
86  *
87  *      NOTE: The message is in an indeterminant state until this call
88  *      returns.  The caller should not mess with it (e.g. try to abort it)
89  *      until then.
90  */
91 void
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
93 {
94     int error;
95
96     KKASSERT(msg->ms_reply_port != NULL &&
97              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
98     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
100         /*
101          * Target port opted to execute the message synchronously so
102          * queue the response.
103          */
104         lwkt_replymsg(msg, error);
105     }
106 }
107
108 /*
109  * lwkt_domsg()
110  *
111  *      Request synchronous completion and call lwkt_beginmsg().  The
112  *      target port can opt to execute the message synchronously or
113  *      asynchronously and this function will automatically block and
114  *      wait for a response if the target executes the message
115  *      asynchronously.
116  */
117 int
118 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
119 {
120     int error;
121
122     KKASSERT(msg->ms_reply_port != NULL &&
123              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
124     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
125     msg->ms_flags |= MSGF_SYNC;
126     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
127         /*
128          * Target port opted to execute the message asynchronously so
129          * block and wait for a reply.
130          */
131         error = lwkt_waitmsg(msg, flags);
132     } else {
133         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
134     }
135     return(error);
136 }
137
138 /*
139  * lwkt_forwardmsg()
140  *
141  * Forward a message received on one port to another port.
142  */
143 int
144 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
145 {   
146     int error;
147
148     crit_enter();
149     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
150     if ((error = port->mp_putport(port, msg)) != EASYNC)
151         lwkt_replymsg(msg, error);
152     crit_exit();
153     return(error);
154 }
155
156 /*
157  * lwkt_abortmsg()
158  *
159  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
160  * The caller must ensure that the message will not be both replied AND
161  * destroyed while the abort is in progress.
162  *
163  * This function issues a callback which might block!
164  */
165 void
166 lwkt_abortmsg(lwkt_msg_t msg)
167 {
168     /*
169      * A critical section protects us from reply IPIs on this cpu.
170      */
171     crit_enter();
172
173     /*
174      * Shortcut the operation if the message has already been returned.
175      * The callback typically constructs a lwkt_msg with the abort request,
176      * issues it synchronously, and waits for completion.  The callback
177      * is not required to actually abort the message and the target port,
178      * upon receiving an abort request message generated by the callback
179      * should check whether the original message has already completed or
180      * not.
181      */
182     if (msg->ms_flags & MSGF_ABORTABLE) {
183         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
184             msg->ms_abortfn(msg);
185     }
186     crit_exit();
187 }
188
189 /************************************************************************
190  *                      PORT INITIALIZATION API                         *
191  ************************************************************************/
192
193 static void *lwkt_thread_getport(lwkt_port_t port);
194 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
197 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
199
200 static void *lwkt_spin_getport(lwkt_port_t port);
201 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
202 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
203 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
204 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
206
207 static void *lwkt_serialize_getport(lwkt_port_t port);
208 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
209 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
210 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
211 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
212
213 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
214 static void *lwkt_panic_getport(lwkt_port_t port);
215 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
216 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
217 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
218 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
219 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
220
221 /*
222  * Core port initialization (internal)
223  */
224 static __inline
225 void
226 _lwkt_initport(lwkt_port_t port,
227                void *(*gportfn)(lwkt_port_t),
228                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
229                int (*wmsgfn)(lwkt_msg_t, int),
230                void *(*wportfn)(lwkt_port_t, int),
231                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
232                void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
233 {
234     bzero(port, sizeof(*port));
235     TAILQ_INIT(&port->mp_msgq);
236     TAILQ_INIT(&port->mp_msgq_prio);
237     port->mp_getport = gportfn;
238     port->mp_putport = pportfn;
239     port->mp_waitmsg =  wmsgfn;
240     port->mp_waitport =  wportfn;
241     port->mp_replyport = rportfn;
242     port->mp_dropmsg = dmsgfn;
243 }
244
245 /*
246  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
247  * we tell the scheduler not to reschedule if td is at a higher priority.
248  *
249  * This routine is called even if the thread is already scheduled.
250  */
251 static __inline
252 void
253 _lwkt_schedule_msg(thread_t td, int flags)
254 {
255     lwkt_schedule(td);
256 }
257
258 /*
259  * lwkt_initport_thread()
260  *
261  *      Initialize a port for use by a particular thread.  The port may
262  *      only be used by <td>.
263  */
264 void
265 lwkt_initport_thread(lwkt_port_t port, thread_t td)
266 {
267     _lwkt_initport(port,
268                    lwkt_thread_getport,
269                    lwkt_thread_putport,
270                    lwkt_thread_waitmsg,
271                    lwkt_thread_waitport,
272                    lwkt_thread_replyport,
273                    lwkt_thread_dropmsg);
274     port->mpu_td = td;
275 }
276
277 /*
278  * lwkt_initport_spin()
279  *
280  *      Initialize a port for use with descriptors that might be accessed
281  *      via multiple LWPs, processes, or threads.  Has somewhat more
282  *      overhead then thread ports.
283  */
284 void
285 lwkt_initport_spin(lwkt_port_t port, thread_t td)
286 {
287     void (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
288
289     if (td == NULL)
290         dmsgfn = lwkt_panic_dropmsg;
291     else
292         dmsgfn = lwkt_spin_dropmsg;
293
294     _lwkt_initport(port,
295                    lwkt_spin_getport,
296                    lwkt_spin_putport,
297                    lwkt_spin_waitmsg,
298                    lwkt_spin_waitport,
299                    lwkt_spin_replyport,
300                    dmsgfn);
301     spin_init(&port->mpu_spin);
302     port->mpu_td = td;
303 }
304
305 /*
306  * lwkt_initport_serialize()
307  *
308  *      Initialize a port for use with descriptors that might be accessed
309  *      via multiple LWPs, processes, or threads.  Callers are assumed to
310  *      have held the serializer (slz).
311  */
312 void
313 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
314 {
315     _lwkt_initport(port,
316                    lwkt_serialize_getport,
317                    lwkt_serialize_putport,
318                    lwkt_serialize_waitmsg,
319                    lwkt_serialize_waitport,
320                    lwkt_serialize_replyport,
321                    lwkt_panic_dropmsg);
322     port->mpu_serialize = slz;
323 }
324
325 /*
326  * Similar to the standard initport, this function simply marks the message
327  * as being done and does not attempt to return it to an originating port.
328  */
329 void
330 lwkt_initport_replyonly_null(lwkt_port_t port)
331 {
332     _lwkt_initport(port,
333                    lwkt_panic_getport,
334                    lwkt_panic_putport,
335                    lwkt_panic_waitmsg,
336                    lwkt_panic_waitport,
337                    lwkt_null_replyport,
338                    lwkt_panic_dropmsg);
339 }
340
341 /*
342  * Initialize a reply-only port, typically used as a message sink.  Such
343  * ports can only be used as a reply port.
344  */
345 void
346 lwkt_initport_replyonly(lwkt_port_t port,
347                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
348 {
349     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
350                          lwkt_panic_waitmsg, lwkt_panic_waitport,
351                          rportfn, lwkt_panic_dropmsg);
352 }
353
354 void
355 lwkt_initport_putonly(lwkt_port_t port,
356                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
357 {
358     _lwkt_initport(port, lwkt_panic_getport, pportfn,
359                          lwkt_panic_waitmsg, lwkt_panic_waitport,
360                          lwkt_panic_replyport, lwkt_panic_dropmsg);
361 }
362
363 void
364 lwkt_initport_panic(lwkt_port_t port)
365 {
366     _lwkt_initport(port,
367                    lwkt_panic_getport, lwkt_panic_putport,
368                    lwkt_panic_waitmsg, lwkt_panic_waitport,
369                    lwkt_panic_replyport, lwkt_panic_dropmsg);
370 }
371
372 static __inline
373 void
374 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
375 {
376     lwkt_msg_queue *queue;
377
378     /*
379      * normal case, remove and return the message.
380      */
381     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
382         queue = &port->mp_msgq_prio;
383     else
384         queue = &port->mp_msgq;
385     TAILQ_REMOVE(queue, msg, ms_node);
386
387     /*
388      * atomic op needed for spin ports
389      */
390     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
391 }
392
393 static __inline
394 void
395 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
396 {
397     lwkt_msg_queue *queue;
398
399     /*
400      * atomic op needed for spin ports
401      */
402     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
403     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
404         queue = &port->mp_msgq_prio;
405     else
406         queue = &port->mp_msgq;
407     TAILQ_INSERT_TAIL(queue, msg, ms_node);
408 }
409
410 static __inline
411 lwkt_msg_t
412 _lwkt_pollmsg(lwkt_port_t port)
413 {
414     lwkt_msg_t msg;
415
416     msg = TAILQ_FIRST(&port->mp_msgq_prio);
417     if (__predict_false(msg != NULL))
418         return msg;
419
420     /*
421      * Priority queue has no message, fallback to non-priority queue.
422      */
423     return TAILQ_FIRST(&port->mp_msgq);
424 }
425
426 static __inline
427 void
428 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
429 {
430     /*
431      * atomic op needed for spin ports
432      */
433     _lwkt_pushmsg(port, msg);
434     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
435 }
436
437 /************************************************************************
438  *                      THREAD PORT BACKEND                             *
439  ************************************************************************
440  *
441  * This backend is used when the port a message is retrieved from is owned
442  * by a single thread (the calling thread).  Messages are IPId to the
443  * correct cpu before being enqueued to a port.  Note that this is fairly
444  * optimal since scheduling would have had to do an IPI anyway if the
445  * message were headed to a different cpu.
446  */
447
448 #ifdef SMP
449
450 /*
451  * This function completes reply processing for the default case in the
452  * context of the originating cpu.
453  */
454 static
455 void
456 lwkt_thread_replyport_remote(lwkt_msg_t msg)
457 {
458     lwkt_port_t port = msg->ms_reply_port;
459     int flags;
460
461     /*
462      * Chase any thread migration that occurs
463      */
464     if (port->mpu_td->td_gd != mycpu) {
465         lwkt_send_ipiq(port->mpu_td->td_gd,
466                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
467         return;
468     }
469
470     /*
471      * Cleanup
472      */
473 #ifdef INVARIANTS
474     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
475     msg->ms_flags &= ~MSGF_INTRANSIT;
476 #endif
477     flags = msg->ms_flags;
478     if (msg->ms_flags & MSGF_SYNC) {
479         cpu_sfence();
480         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
481     } else {
482         _lwkt_enqueue_reply(port, msg);
483     }
484     if (port->mp_flags & MSGPORTF_WAITING)
485         _lwkt_schedule_msg(port->mpu_td, flags);
486 }
487
488 #endif
489
490 /*
491  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
492  *
493  * Called with the reply port as an argument but in the context of the
494  * original target port.  Completion must occur on the target port's
495  * cpu.
496  *
497  * The critical section protects us from IPIs on the this CPU.
498  */
499 static
500 void
501 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
502 {
503     int flags;
504
505     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
506
507     if (msg->ms_flags & MSGF_SYNC) {
508         /*
509          * If a synchronous completion has been requested, just wakeup
510          * the message without bothering to queue it to the target port.
511          *
512          * Assume the target thread is non-preemptive, so no critical
513          * section is required.
514          */
515 #ifdef SMP
516         if (port->mpu_td->td_gd == mycpu) {
517 #endif
518             flags = msg->ms_flags;
519             cpu_sfence();
520             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
521             if (port->mp_flags & MSGPORTF_WAITING)
522                 _lwkt_schedule_msg(port->mpu_td, flags);
523 #ifdef SMP
524         } else {
525 #ifdef INVARIANTS
526             msg->ms_flags |= MSGF_INTRANSIT;
527 #endif
528             msg->ms_flags |= MSGF_REPLY;
529             lwkt_send_ipiq(port->mpu_td->td_gd,
530                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
531         }
532 #endif
533     } else {
534         /*
535          * If an asynchronous completion has been requested the message
536          * must be queued to the reply port.
537          *
538          * A critical section is required to interlock the port queue.
539          */
540 #ifdef SMP
541         if (port->mpu_td->td_gd == mycpu) {
542 #endif
543             crit_enter();
544             _lwkt_enqueue_reply(port, msg);
545             if (port->mp_flags & MSGPORTF_WAITING)
546                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
547             crit_exit();
548 #ifdef SMP
549         } else {
550 #ifdef INVARIANTS
551             msg->ms_flags |= MSGF_INTRANSIT;
552 #endif
553             msg->ms_flags |= MSGF_REPLY;
554             lwkt_send_ipiq(port->mpu_td->td_gd,
555                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
556         }
557 #endif
558     }
559 }
560
561 /*
562  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
563  *
564  * This function could _only_ be used when caller is in the same thread
565  * as the message's target port owner thread.
566  */
567 static void
568 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
569 {
570     KASSERT(port->mpu_td == curthread,
571             ("message could only be dropped in the same thread "
572              "as the message target port thread"));
573     crit_enter_quick(port->mpu_td);
574     _lwkt_pullmsg(port, msg);
575     msg->ms_flags |= MSGF_DONE;
576     crit_exit_quick(port->mpu_td);
577 }
578
579 /*
580  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
581  *
582  * Called with the target port as an argument but in the context of the
583  * reply port.  This function always implements an asynchronous put to
584  * the target message port, and thus returns EASYNC.
585  *
586  * The message must already have cleared MSGF_DONE and MSGF_REPLY
587  */
588
589 #ifdef SMP
590
591 static
592 void
593 lwkt_thread_putport_remote(lwkt_msg_t msg)
594 {
595     lwkt_port_t port = msg->ms_target_port;
596
597     /*
598      * Chase any thread migration that occurs
599      */
600     if (port->mpu_td->td_gd != mycpu) {
601         lwkt_send_ipiq(port->mpu_td->td_gd,
602                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
603         return;
604     }
605
606     /*
607      * Cleanup
608      */
609 #ifdef INVARIANTS
610     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
611     msg->ms_flags &= ~MSGF_INTRANSIT;
612 #endif
613     _lwkt_pushmsg(port, msg);
614     if (port->mp_flags & MSGPORTF_WAITING)
615         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
616 }
617
618 #endif
619
620 static
621 int
622 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
623 {
624     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
625
626     msg->ms_target_port = port;
627 #ifdef SMP
628     if (port->mpu_td->td_gd == mycpu) {
629 #endif
630         crit_enter();
631         _lwkt_pushmsg(port, msg);
632         if (port->mp_flags & MSGPORTF_WAITING)
633             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
634         crit_exit();
635 #ifdef SMP
636     } else {
637 #ifdef INVARIANTS
638         msg->ms_flags |= MSGF_INTRANSIT;
639 #endif
640         lwkt_send_ipiq(port->mpu_td->td_gd,
641                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
642     }
643 #endif
644     return (EASYNC);
645 }
646
647 /*
648  * lwkt_thread_getport()
649  *
650  *      Retrieve the next message from the port or NULL if no messages
651  *      are ready.
652  */
653 static
654 void *
655 lwkt_thread_getport(lwkt_port_t port)
656 {
657     lwkt_msg_t msg;
658
659     KKASSERT(port->mpu_td == curthread);
660
661     crit_enter_quick(port->mpu_td);
662     if ((msg = _lwkt_pollmsg(port)) != NULL)
663         _lwkt_pullmsg(port, msg);
664     crit_exit_quick(port->mpu_td);
665     return(msg);
666 }
667
668 /*
669  * lwkt_thread_waitmsg()
670  *
671  *      Wait for a particular message to be replied.  We must be the only
672  *      thread waiting on the message.  The port must be owned by the
673  *      caller.
674  */
675 static
676 int
677 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
678 {
679     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
680             ("can't wait dropable message"));
681
682     if ((msg->ms_flags & MSGF_DONE) == 0) {
683         /*
684          * If the done bit was not set we have to block until it is.
685          */
686         lwkt_port_t port = msg->ms_reply_port;
687         thread_t td = curthread;
688         int sentabort;
689
690         KKASSERT(port->mpu_td == td);
691         crit_enter_quick(td);
692         sentabort = 0;
693
694         while ((msg->ms_flags & MSGF_DONE) == 0) {
695             port->mp_flags |= MSGPORTF_WAITING;
696             if (sentabort == 0) {
697                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
698                     lwkt_abortmsg(msg);
699                 }
700             } else {
701                 lwkt_sleep("waitabt", 0);
702             }
703             port->mp_flags &= ~MSGPORTF_WAITING;
704         }
705         if (msg->ms_flags & MSGF_QUEUED)
706             _lwkt_pullmsg(port, msg);
707         crit_exit_quick(td);
708     } else {
709         /*
710          * If the done bit was set we only have to mess around with the
711          * message if it is queued on the reply port.
712          */
713         if (msg->ms_flags & MSGF_QUEUED) {
714             lwkt_port_t port = msg->ms_reply_port;
715             thread_t td = curthread;
716
717             KKASSERT(port->mpu_td == td);
718             crit_enter_quick(td);
719             _lwkt_pullmsg(port, msg);
720             crit_exit_quick(td);
721         }
722     }
723     return(msg->ms_error);
724 }
725
726 /*
727  * lwkt_thread_waitport()
728  *
729  *      Wait for a new message to be available on the port.  We must be the
730  *      the only thread waiting on the port.  The port must be owned by caller.
731  */
732 static
733 void *
734 lwkt_thread_waitport(lwkt_port_t port, int flags)
735 {
736     thread_t td = curthread;
737     lwkt_msg_t msg;
738     int error;
739
740     KKASSERT(port->mpu_td == td);
741     crit_enter_quick(td);
742     while ((msg = _lwkt_pollmsg(port)) == NULL) {
743         port->mp_flags |= MSGPORTF_WAITING;
744         error = lwkt_sleep("waitport", flags);
745         port->mp_flags &= ~MSGPORTF_WAITING;
746         if (error)
747             goto done;
748     }
749     _lwkt_pullmsg(port, msg);
750 done:
751     crit_exit_quick(td);
752     return(msg);
753 }
754
755 /************************************************************************
756  *                         SPIN PORT BACKEND                            *
757  ************************************************************************
758  *
759  * This backend uses spinlocks instead of making assumptions about which
760  * thread is accessing the port.  It must be used when a port is not owned
761  * by a particular thread.  This is less optimal then thread ports but
762  * you don't have a choice if there are multiple threads accessing the port.
763  *
764  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
765  * on the message port, it is the responsibility of the code doing the
766  * wakeup to clear this flag rather then the blocked threads.  Some
767  * superfluous wakeups may occur, which is ok.
768  *
769  * XXX synchronous message wakeups are not current optimized.
770  */
771
772 static
773 void *
774 lwkt_spin_getport(lwkt_port_t port)
775 {
776     lwkt_msg_t msg;
777
778     spin_lock(&port->mpu_spin);
779     if ((msg = _lwkt_pollmsg(port)) != NULL)
780         _lwkt_pullmsg(port, msg);
781     spin_unlock(&port->mpu_spin);
782     return(msg);
783 }
784
785 static
786 int
787 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
788 {
789     int dowakeup;
790
791     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
792
793     msg->ms_target_port = port;
794     spin_lock(&port->mpu_spin);
795     _lwkt_pushmsg(port, msg);
796     dowakeup = 0;
797     if (port->mp_flags & MSGPORTF_WAITING) {
798         port->mp_flags &= ~MSGPORTF_WAITING;
799         dowakeup = 1;
800     }
801     spin_unlock(&port->mpu_spin);
802     if (dowakeup)
803         wakeup(port);
804     return (EASYNC);
805 }
806
807 static
808 int
809 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
810 {
811     lwkt_port_t port;
812     int sentabort;
813     int error;
814
815     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
816             ("can't wait dropable message"));
817
818     if ((msg->ms_flags & MSGF_DONE) == 0) {
819         port = msg->ms_reply_port;
820         sentabort = 0;
821         spin_lock(&port->mpu_spin);
822         while ((msg->ms_flags & MSGF_DONE) == 0) {
823             void *won;
824
825             /*
826              * If message was sent synchronously from the beginning
827              * the wakeup will be on the message structure, else it
828              * will be on the port structure.
829              */
830             if (msg->ms_flags & MSGF_SYNC) {
831                 won = msg;
832                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
833             } else {
834                 won = port;
835                 port->mp_flags |= MSGPORTF_WAITING;
836             }
837
838             /*
839              * Only messages which support abort can be interrupted.
840              * We must still wait for message completion regardless.
841              */
842             if ((flags & PCATCH) && sentabort == 0) {
843                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
844                 if (error) {
845                     sentabort = error;
846                     spin_unlock(&port->mpu_spin);
847                     lwkt_abortmsg(msg);
848                     spin_lock(&port->mpu_spin);
849                 }
850             } else {
851                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
852             }
853             /* see note at the top on the MSGPORTF_WAITING flag */
854         }
855         /*
856          * Turn EINTR into ERESTART if the signal indicates.
857          */
858         if (sentabort && msg->ms_error == EINTR)
859             msg->ms_error = sentabort;
860         if (msg->ms_flags & MSGF_QUEUED)
861             _lwkt_pullmsg(port, msg);
862         spin_unlock(&port->mpu_spin);
863     } else {
864         if (msg->ms_flags & MSGF_QUEUED) {
865             port = msg->ms_reply_port;
866             spin_lock(&port->mpu_spin);
867             _lwkt_pullmsg(port, msg);
868             spin_unlock(&port->mpu_spin);
869         }
870     }
871     return(msg->ms_error);
872 }
873
874 static
875 void *
876 lwkt_spin_waitport(lwkt_port_t port, int flags)
877 {
878     lwkt_msg_t msg;
879     int error;
880
881     spin_lock(&port->mpu_spin);
882     while ((msg = _lwkt_pollmsg(port)) == NULL) {
883         port->mp_flags |= MSGPORTF_WAITING;
884         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
885         /* see note at the top on the MSGPORTF_WAITING flag */
886         if (error) {
887             spin_unlock(&port->mpu_spin);
888             return(NULL);
889         }
890     }
891     _lwkt_pullmsg(port, msg);
892     spin_unlock(&port->mpu_spin);
893     return(msg);
894 }
895
896 static
897 void
898 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
899 {
900     int dowakeup;
901
902     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
903
904     if (msg->ms_flags & MSGF_SYNC) {
905         /*
906          * If a synchronous completion has been requested, just wakeup
907          * the message without bothering to queue it to the target port.
908          */
909         spin_lock(&port->mpu_spin);
910         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
911         dowakeup = 0;
912         if (msg->ms_flags & MSGF_WAITING) {
913                 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
914                 dowakeup = 1;
915         }
916         spin_unlock(&port->mpu_spin);
917         if (dowakeup)
918                 wakeup(msg);
919     } else {
920         /*
921          * If an asynchronous completion has been requested the message
922          * must be queued to the reply port.
923          */
924         spin_lock(&port->mpu_spin);
925         _lwkt_enqueue_reply(port, msg);
926         dowakeup = 0;
927         if (port->mp_flags & MSGPORTF_WAITING) {
928             port->mp_flags &= ~MSGPORTF_WAITING;
929             dowakeup = 1;
930         }
931         spin_unlock(&port->mpu_spin);
932         if (dowakeup)
933             wakeup(port);
934     }
935 }
936
937 /*
938  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
939  *
940  * This function could _only_ be used when caller is in the same thread
941  * as the message's target port owner thread.
942  */
943 static void
944 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
945 {
946     KASSERT(port->mpu_td == curthread,
947             ("message could only be dropped in the same thread "
948              "as the message target port thread\n"));
949     spin_lock(&port->mpu_spin);
950     _lwkt_pullmsg(port, msg);
951     msg->ms_flags |= MSGF_DONE;
952     spin_unlock(&port->mpu_spin);
953 }
954
955 /************************************************************************
956  *                        SERIALIZER PORT BACKEND                       *
957  ************************************************************************
958  *
959  * This backend uses serializer to protect port accessing.  Callers are
960  * assumed to have serializer held.  This kind of port is usually created
961  * by network device driver along with _one_ lwkt thread to pipeline
962  * operations which may temporarily release serializer.
963  *
964  * Implementation is based on SPIN PORT BACKEND.
965  */
966
967 static
968 void *
969 lwkt_serialize_getport(lwkt_port_t port)
970 {
971     lwkt_msg_t msg;
972
973     ASSERT_SERIALIZED(port->mpu_serialize);
974
975     if ((msg = _lwkt_pollmsg(port)) != NULL)
976         _lwkt_pullmsg(port, msg);
977     return(msg);
978 }
979
980 static
981 int
982 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
983 {
984     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
985     ASSERT_SERIALIZED(port->mpu_serialize);
986
987     msg->ms_target_port = port;
988     _lwkt_pushmsg(port, msg);
989     if (port->mp_flags & MSGPORTF_WAITING) {
990         port->mp_flags &= ~MSGPORTF_WAITING;
991         wakeup(port);
992     }
993     return (EASYNC);
994 }
995
996 static
997 int
998 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
999 {
1000     lwkt_port_t port;
1001     int sentabort;
1002     int error;
1003
1004     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1005             ("can't wait dropable message"));
1006
1007     if ((msg->ms_flags & MSGF_DONE) == 0) {
1008         port = msg->ms_reply_port;
1009
1010         ASSERT_SERIALIZED(port->mpu_serialize);
1011
1012         sentabort = 0;
1013         while ((msg->ms_flags & MSGF_DONE) == 0) {
1014             void *won;
1015
1016             /*
1017              * If message was sent synchronously from the beginning
1018              * the wakeup will be on the message structure, else it
1019              * will be on the port structure.
1020              */
1021             if (msg->ms_flags & MSGF_SYNC) {
1022                 won = msg;
1023             } else {
1024                 won = port;
1025                 port->mp_flags |= MSGPORTF_WAITING;
1026             }
1027
1028             /*
1029              * Only messages which support abort can be interrupted.
1030              * We must still wait for message completion regardless.
1031              */
1032             if ((flags & PCATCH) && sentabort == 0) {
1033                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1034                 if (error) {
1035                     sentabort = error;
1036                     lwkt_serialize_exit(port->mpu_serialize);
1037                     lwkt_abortmsg(msg);
1038                     lwkt_serialize_enter(port->mpu_serialize);
1039                 }
1040             } else {
1041                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1042             }
1043             /* see note at the top on the MSGPORTF_WAITING flag */
1044         }
1045         /*
1046          * Turn EINTR into ERESTART if the signal indicates.
1047          */
1048         if (sentabort && msg->ms_error == EINTR)
1049             msg->ms_error = sentabort;
1050         if (msg->ms_flags & MSGF_QUEUED)
1051             _lwkt_pullmsg(port, msg);
1052     } else {
1053         if (msg->ms_flags & MSGF_QUEUED) {
1054             port = msg->ms_reply_port;
1055
1056             ASSERT_SERIALIZED(port->mpu_serialize);
1057             _lwkt_pullmsg(port, msg);
1058         }
1059     }
1060     return(msg->ms_error);
1061 }
1062
1063 static
1064 void *
1065 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1066 {
1067     lwkt_msg_t msg;
1068     int error;
1069
1070     ASSERT_SERIALIZED(port->mpu_serialize);
1071
1072     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1073         port->mp_flags |= MSGPORTF_WAITING;
1074         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1075         /* see note at the top on the MSGPORTF_WAITING flag */
1076         if (error)
1077             return(NULL);
1078     }
1079     _lwkt_pullmsg(port, msg);
1080     return(msg);
1081 }
1082
1083 static
1084 void
1085 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1086 {
1087     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1088     ASSERT_SERIALIZED(port->mpu_serialize);
1089
1090     if (msg->ms_flags & MSGF_SYNC) {
1091         /*
1092          * If a synchronous completion has been requested, just wakeup
1093          * the message without bothering to queue it to the target port.
1094          */
1095         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1096         wakeup(msg);
1097     } else {
1098         /*
1099          * If an asynchronous completion has been requested the message
1100          * must be queued to the reply port.
1101          */
1102         _lwkt_enqueue_reply(port, msg);
1103         if (port->mp_flags & MSGPORTF_WAITING) {
1104             port->mp_flags &= ~MSGPORTF_WAITING;
1105             wakeup(port);
1106         }
1107     }
1108 }
1109
1110 /************************************************************************
1111  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1112  ************************************************************************/
1113
1114 /*
1115  * You can point a port's reply vector at this function if you just want
1116  * the message marked done, without any queueing or signaling.  This is
1117  * often used for structure-embedded messages.
1118  */
1119 static
1120 void
1121 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1122 {
1123     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1124 }
1125
1126 static
1127 void *
1128 lwkt_panic_getport(lwkt_port_t port)
1129 {
1130     panic("lwkt_getport() illegal on port %p", port);
1131 }
1132
1133 static
1134 int
1135 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1136 {
1137     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1138 }
1139
1140 static
1141 int
1142 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1143 {
1144     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1145 }
1146
1147 static
1148 void *
1149 lwkt_panic_waitport(lwkt_port_t port, int flags)
1150 {
1151     panic("port %p cannot be waited on", port);
1152 }
1153
1154 static
1155 void
1156 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1157 {
1158     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1159 }
1160
1161 static
1162 void
1163 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1164 {
1165     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1166 }