kernel/lwkt_msgport: Improve comments a bit.
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #ifdef SMP
69 #include <machine/smp.h>
70 #endif
71
72 #include <sys/malloc.h>
73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
74
75 /************************************************************************
76  *                              MESSAGE FUNCTIONS                       *
77  ************************************************************************/
78
79 /*
80  * lwkt_sendmsg()
81  *
82  *      Request asynchronous completion and call lwkt_beginmsg().  The
83  *      target port can opt to execute the message synchronously or
84  *      asynchronously and this function will automatically queue the
85  *      response if the target executes the message synchronously.
86  *
87  *      NOTE: The message is in an indeterminant state until this call
88  *      returns.  The caller should not mess with it (e.g. try to abort it)
89  *      until then.
90  */
91 void
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
93 {
94     int error;
95
96     KKASSERT(msg->ms_reply_port != NULL &&
97              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
98     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
100         /*
101          * Target port opted to execute the message synchronously so
102          * queue the response.
103          */
104         lwkt_replymsg(msg, error);
105     }
106 }
107
108 /*
109  * lwkt_domsg()
110  *
111  *      Request synchronous completion and call lwkt_beginmsg().  The
112  *      target port can opt to execute the message synchronously or
113  *      asynchronously and this function will automatically block and
114  *      wait for a response if the target executes the message
115  *      asynchronously.
116  */
117 int
118 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
119 {
120     int error;
121
122     KKASSERT(msg->ms_reply_port != NULL &&
123              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
124     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
125     msg->ms_flags |= MSGF_SYNC;
126     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
127         /*
128          * Target port opted to execute the message asynchronously so
129          * block and wait for a reply.
130          */
131         error = lwkt_waitmsg(msg, flags);
132     } else {
133         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
134     }
135     return(error);
136 }
137
138 /*
139  * lwkt_forwardmsg()
140  *
141  * Forward a message received on one port to another port.
142  */
143 int
144 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
145 {   
146     int error;
147
148     crit_enter();
149     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
150     if ((error = port->mp_putport(port, msg)) != EASYNC)
151         lwkt_replymsg(msg, error);
152     crit_exit();
153     return(error);
154 }
155
156 /*
157  * lwkt_abortmsg()
158  *
159  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
160  * The caller must ensure that the message will not be both replied AND
161  * destroyed while the abort is in progress.
162  *
163  * This function issues a callback which might block!
164  */
165 void
166 lwkt_abortmsg(lwkt_msg_t msg)
167 {
168     /*
169      * A critical section protects us from reply IPIs on this cpu.
170      */
171     crit_enter();
172
173     /*
174      * Shortcut the operation if the message has already been returned.
175      * The callback typically constructs a lwkt_msg with the abort request,
176      * issues it synchronously, and waits for completion.  The callback
177      * is not required to actually abort the message and the target port,
178      * upon receiving an abort request message generated by the callback
179      * should check whether the original message has already completed or
180      * not.
181      */
182     if (msg->ms_flags & MSGF_ABORTABLE) {
183         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
184             msg->ms_abortfn(msg);
185     }
186     crit_exit();
187 }
188
189 /************************************************************************
190  *                      PORT INITIALIZATION API                         *
191  ************************************************************************/
192
193 static void *lwkt_thread_getport(lwkt_port_t port);
194 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
197 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
199
200 static void *lwkt_spin_getport(lwkt_port_t port);
201 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
202 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
203 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
204 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
206
207 static void *lwkt_serialize_getport(lwkt_port_t port);
208 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
209 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
210 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
211 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
212
213 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
214 static void *lwkt_panic_getport(lwkt_port_t port);
215 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
216 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
217 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
218 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
219 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
220
221 /*
222  * Core port initialization (internal)
223  */
224 static __inline
225 void
226 _lwkt_initport(lwkt_port_t port,
227                void *(*gportfn)(lwkt_port_t),
228                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
229                int (*wmsgfn)(lwkt_msg_t, int),
230                void *(*wportfn)(lwkt_port_t, int),
231                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
232                void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
233 {
234     bzero(port, sizeof(*port));
235     TAILQ_INIT(&port->mp_msgq);
236     TAILQ_INIT(&port->mp_msgq_prio);
237     port->mp_getport = gportfn;
238     port->mp_putport = pportfn;
239     port->mp_waitmsg =  wmsgfn;
240     port->mp_waitport =  wportfn;
241     port->mp_replyport = rportfn;
242     port->mp_dropmsg = dmsgfn;
243 }
244
245 /*
246  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
247  * we tell the scheduler not to reschedule if td is at a higher priority.
248  *
249  * This routine is called even if the thread is already scheduled.
250  */
251 static __inline
252 void
253 _lwkt_schedule_msg(thread_t td, int flags)
254 {
255     lwkt_schedule(td);
256 }
257
258 /*
259  * lwkt_initport_thread()
260  *
261  *      Initialize a port for use by a particular thread.  The port may
262  *      only be used by <td>.
263  */
264 void
265 lwkt_initport_thread(lwkt_port_t port, thread_t td)
266 {
267     _lwkt_initport(port,
268                    lwkt_thread_getport,
269                    lwkt_thread_putport,
270                    lwkt_thread_waitmsg,
271                    lwkt_thread_waitport,
272                    lwkt_thread_replyport,
273                    lwkt_thread_dropmsg);
274     port->mpu_td = td;
275 }
276
277 /*
278  * lwkt_initport_spin()
279  *
280  *      Initialize a port for use with descriptors that might be accessed
281  *      via multiple LWPs, processes, or threads.  Has somewhat more
282  *      overhead then thread ports.
283  */
284 void
285 lwkt_initport_spin(lwkt_port_t port)
286 {
287     _lwkt_initport(port,
288                    lwkt_spin_getport,
289                    lwkt_spin_putport,
290                    lwkt_spin_waitmsg,
291                    lwkt_spin_waitport,
292                    lwkt_spin_replyport,
293                    lwkt_spin_dropmsg);
294     spin_init(&port->mpu_spin);
295 }
296
297 /*
298  * lwkt_initport_serialize()
299  *
300  *      Initialize a port for use with descriptors that might be accessed
301  *      via multiple LWPs, processes, or threads.  Callers are assumed to
302  *      have held the serializer (slz).
303  */
304 void
305 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
306 {
307     _lwkt_initport(port,
308                    lwkt_serialize_getport,
309                    lwkt_serialize_putport,
310                    lwkt_serialize_waitmsg,
311                    lwkt_serialize_waitport,
312                    lwkt_serialize_replyport,
313                    lwkt_panic_dropmsg);
314     port->mpu_serialize = slz;
315 }
316
317 /*
318  * Similar to the standard initport, this function simply marks the message
319  * as being done and does not attempt to return it to an originating port.
320  */
321 void
322 lwkt_initport_replyonly_null(lwkt_port_t port)
323 {
324     _lwkt_initport(port,
325                    lwkt_panic_getport,
326                    lwkt_panic_putport,
327                    lwkt_panic_waitmsg,
328                    lwkt_panic_waitport,
329                    lwkt_null_replyport,
330                    lwkt_panic_dropmsg);
331 }
332
333 /*
334  * Initialize a reply-only port, typically used as a message sink.  Such
335  * ports can only be used as a reply port.
336  */
337 void
338 lwkt_initport_replyonly(lwkt_port_t port,
339                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
340 {
341     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
342                          lwkt_panic_waitmsg, lwkt_panic_waitport,
343                          rportfn, lwkt_panic_dropmsg);
344 }
345
346 void
347 lwkt_initport_putonly(lwkt_port_t port,
348                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
349 {
350     _lwkt_initport(port, lwkt_panic_getport, pportfn,
351                          lwkt_panic_waitmsg, lwkt_panic_waitport,
352                          lwkt_panic_replyport, lwkt_panic_dropmsg);
353 }
354
355 void
356 lwkt_initport_panic(lwkt_port_t port)
357 {
358     _lwkt_initport(port,
359                    lwkt_panic_getport, lwkt_panic_putport,
360                    lwkt_panic_waitmsg, lwkt_panic_waitport,
361                    lwkt_panic_replyport, lwkt_panic_dropmsg);
362 }
363
364 static __inline
365 void
366 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
367 {
368     lwkt_msg_queue *queue;
369
370     /*
371      * normal case, remove and return the message.
372      */
373     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
374         queue = &port->mp_msgq_prio;
375     else
376         queue = &port->mp_msgq;
377     TAILQ_REMOVE(queue, msg, ms_node);
378
379     /*
380      * atomic op needed for spin ports
381      */
382     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
383 }
384
385 static __inline
386 void
387 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
388 {
389     lwkt_msg_queue *queue;
390
391     /*
392      * atomic op needed for spin ports
393      */
394     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
395     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
396         queue = &port->mp_msgq_prio;
397     else
398         queue = &port->mp_msgq;
399     TAILQ_INSERT_TAIL(queue, msg, ms_node);
400 }
401
402 static __inline
403 lwkt_msg_t
404 _lwkt_pollmsg(lwkt_port_t port)
405 {
406     lwkt_msg_t msg;
407
408     msg = TAILQ_FIRST(&port->mp_msgq_prio);
409     if (__predict_false(msg != NULL))
410         return msg;
411
412     /*
413      * Priority queue has no message, fallback to non-priority queue.
414      */
415     return TAILQ_FIRST(&port->mp_msgq);
416 }
417
418 static __inline
419 void
420 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
421 {
422     /*
423      * atomic op needed for spin ports
424      */
425     _lwkt_pushmsg(port, msg);
426     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
427 }
428
429 /************************************************************************
430  *                      THREAD PORT BACKEND                             *
431  ************************************************************************
432  *
433  * This backend is used when the port a message is retrieved from is owned
434  * by a single thread (the calling thread).  Messages are IPId to the
435  * correct cpu before being enqueued to a port.  Note that this is fairly
436  * optimal since scheduling would have had to do an IPI anyway if the
437  * message were headed to a different cpu.
438  */
439
440 #ifdef SMP
441
442 /*
443  * This function completes reply processing for the default case in the
444  * context of the originating cpu.
445  */
446 static
447 void
448 lwkt_thread_replyport_remote(lwkt_msg_t msg)
449 {
450     lwkt_port_t port = msg->ms_reply_port;
451     int flags;
452
453     /*
454      * Chase any thread migration that occurs
455      */
456     if (port->mpu_td->td_gd != mycpu) {
457         lwkt_send_ipiq(port->mpu_td->td_gd,
458                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
459         return;
460     }
461
462     /*
463      * Cleanup
464      */
465 #ifdef INVARIANTS
466     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
467     msg->ms_flags &= ~MSGF_INTRANSIT;
468 #endif
469     flags = msg->ms_flags;
470     if (msg->ms_flags & MSGF_SYNC) {
471         cpu_sfence();
472         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
473     } else {
474         _lwkt_enqueue_reply(port, msg);
475     }
476     if (port->mp_flags & MSGPORTF_WAITING)
477         _lwkt_schedule_msg(port->mpu_td, flags);
478 }
479
480 #endif
481
482 /*
483  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
484  *
485  * Called with the reply port as an argument but in the context of the
486  * original target port.  Completion must occur on the target port's
487  * cpu.
488  *
489  * The critical section protects us from IPIs on the this CPU.
490  */
491 static
492 void
493 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
494 {
495     int flags;
496
497     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
498
499     if (msg->ms_flags & MSGF_SYNC) {
500         /*
501          * If a synchronous completion has been requested, just wakeup
502          * the message without bothering to queue it to the target port.
503          *
504          * Assume the target thread is non-preemptive, so no critical
505          * section is required.
506          */
507 #ifdef SMP
508         if (port->mpu_td->td_gd == mycpu) {
509 #endif
510             flags = msg->ms_flags;
511             cpu_sfence();
512             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
513             if (port->mp_flags & MSGPORTF_WAITING)
514                 _lwkt_schedule_msg(port->mpu_td, flags);
515 #ifdef SMP
516         } else {
517 #ifdef INVARIANTS
518             msg->ms_flags |= MSGF_INTRANSIT;
519 #endif
520             msg->ms_flags |= MSGF_REPLY;
521             lwkt_send_ipiq(port->mpu_td->td_gd,
522                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
523         }
524 #endif
525     } else {
526         /*
527          * If an asynchronous completion has been requested the message
528          * must be queued to the reply port.
529          *
530          * A critical section is required to interlock the port queue.
531          */
532 #ifdef SMP
533         if (port->mpu_td->td_gd == mycpu) {
534 #endif
535             crit_enter();
536             _lwkt_enqueue_reply(port, msg);
537             if (port->mp_flags & MSGPORTF_WAITING)
538                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
539             crit_exit();
540 #ifdef SMP
541         } else {
542 #ifdef INVARIANTS
543             msg->ms_flags |= MSGF_INTRANSIT;
544 #endif
545             msg->ms_flags |= MSGF_REPLY;
546             lwkt_send_ipiq(port->mpu_td->td_gd,
547                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
548         }
549 #endif
550     }
551 }
552
553 /*
554  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
555  *
556  * This function could _only_ be used when caller is in the same thread
557  * as the message's target port owner thread.
558  */
559 static void
560 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
561 {
562     KASSERT(port->mpu_td == curthread,
563             ("message could only be dropped in the same thread "
564              "as the message target port thread"));
565     crit_enter_quick(port->mpu_td);
566     _lwkt_pullmsg(port, msg);
567     msg->ms_flags |= MSGF_DONE;
568     crit_exit_quick(port->mpu_td);
569 }
570
571 /*
572  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
573  *
574  * Called with the target port as an argument but in the context of the
575  * reply port.  This function always implements an asynchronous put to
576  * the target message port, and thus returns EASYNC.
577  *
578  * The message must already have cleared MSGF_DONE and MSGF_REPLY
579  */
580
581 #ifdef SMP
582
583 static
584 void
585 lwkt_thread_putport_remote(lwkt_msg_t msg)
586 {
587     lwkt_port_t port = msg->ms_target_port;
588
589     /*
590      * Chase any thread migration that occurs
591      */
592     if (port->mpu_td->td_gd != mycpu) {
593         lwkt_send_ipiq(port->mpu_td->td_gd,
594                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
595         return;
596     }
597
598     /*
599      * Cleanup
600      */
601 #ifdef INVARIANTS
602     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
603     msg->ms_flags &= ~MSGF_INTRANSIT;
604 #endif
605     _lwkt_pushmsg(port, msg);
606     if (port->mp_flags & MSGPORTF_WAITING)
607         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
608 }
609
610 #endif
611
612 static
613 int
614 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
615 {
616     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
617
618     msg->ms_target_port = port;
619 #ifdef SMP
620     if (port->mpu_td->td_gd == mycpu) {
621 #endif
622         crit_enter();
623         _lwkt_pushmsg(port, msg);
624         if (port->mp_flags & MSGPORTF_WAITING)
625             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
626         crit_exit();
627 #ifdef SMP
628     } else {
629 #ifdef INVARIANTS
630         msg->ms_flags |= MSGF_INTRANSIT;
631 #endif
632         lwkt_send_ipiq(port->mpu_td->td_gd,
633                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
634     }
635 #endif
636     return (EASYNC);
637 }
638
639 /*
640  * lwkt_thread_getport()
641  *
642  *      Retrieve the next message from the port or NULL if no messages
643  *      are ready.
644  */
645 static
646 void *
647 lwkt_thread_getport(lwkt_port_t port)
648 {
649     lwkt_msg_t msg;
650
651     KKASSERT(port->mpu_td == curthread);
652
653     crit_enter_quick(port->mpu_td);
654     if ((msg = _lwkt_pollmsg(port)) != NULL)
655         _lwkt_pullmsg(port, msg);
656     crit_exit_quick(port->mpu_td);
657     return(msg);
658 }
659
660 /*
661  * lwkt_thread_waitmsg()
662  *
663  *      Wait for a particular message to be replied.  We must be the only
664  *      thread waiting on the message.  The port must be owned by the
665  *      caller.
666  */
667 static
668 int
669 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
670 {
671     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
672             ("can't wait dropable message"));
673
674     if ((msg->ms_flags & MSGF_DONE) == 0) {
675         /*
676          * If the done bit was not set we have to block until it is.
677          */
678         lwkt_port_t port = msg->ms_reply_port;
679         thread_t td = curthread;
680         int sentabort;
681
682         KKASSERT(port->mpu_td == td);
683         crit_enter_quick(td);
684         sentabort = 0;
685
686         while ((msg->ms_flags & MSGF_DONE) == 0) {
687             port->mp_flags |= MSGPORTF_WAITING;
688             if (sentabort == 0) {
689                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
690                     lwkt_abortmsg(msg);
691                 }
692             } else {
693                 lwkt_sleep("waitabt", 0);
694             }
695             port->mp_flags &= ~MSGPORTF_WAITING;
696         }
697         if (msg->ms_flags & MSGF_QUEUED)
698             _lwkt_pullmsg(port, msg);
699         crit_exit_quick(td);
700     } else {
701         /*
702          * If the done bit was set we only have to mess around with the
703          * message if it is queued on the reply port.
704          */
705         if (msg->ms_flags & MSGF_QUEUED) {
706             lwkt_port_t port = msg->ms_reply_port;
707             thread_t td = curthread;
708
709             KKASSERT(port->mpu_td == td);
710             crit_enter_quick(td);
711             _lwkt_pullmsg(port, msg);
712             crit_exit_quick(td);
713         }
714     }
715     return(msg->ms_error);
716 }
717
718 /*
719  * lwkt_thread_waitport()
720  *
721  *      Wait for a new message to be available on the port.  We must be the
722  *      the only thread waiting on the port.  The port must be owned by caller.
723  */
724 static
725 void *
726 lwkt_thread_waitport(lwkt_port_t port, int flags)
727 {
728     thread_t td = curthread;
729     lwkt_msg_t msg;
730     int error;
731
732     KKASSERT(port->mpu_td == td);
733     crit_enter_quick(td);
734     while ((msg = _lwkt_pollmsg(port)) == NULL) {
735         port->mp_flags |= MSGPORTF_WAITING;
736         error = lwkt_sleep("waitport", flags);
737         port->mp_flags &= ~MSGPORTF_WAITING;
738         if (error)
739             goto done;
740     }
741     _lwkt_pullmsg(port, msg);
742 done:
743     crit_exit_quick(td);
744     return(msg);
745 }
746
747 /************************************************************************
748  *                         SPIN PORT BACKEND                            *
749  ************************************************************************
750  *
751  * This backend uses spinlocks instead of making assumptions about which
752  * thread is accessing the port.  It must be used when a port is not owned
753  * by a particular thread.  This is less optimal then thread ports but
754  * you don't have a choice if there are multiple threads accessing the port.
755  *
756  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
757  * on the message port, it is the responsibility of the code doing the
758  * wakeup to clear this flag rather then the blocked threads.  Some
759  * superfluous wakeups may occur, which is ok.
760  *
761  * XXX synchronous message wakeups are not current optimized.
762  */
763
764 static
765 void *
766 lwkt_spin_getport(lwkt_port_t port)
767 {
768     lwkt_msg_t msg;
769
770     spin_lock(&port->mpu_spin);
771     if ((msg = _lwkt_pollmsg(port)) != NULL)
772         _lwkt_pullmsg(port, msg);
773     spin_unlock(&port->mpu_spin);
774     return(msg);
775 }
776
777 static
778 int
779 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
780 {
781     int dowakeup;
782
783     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
784
785     msg->ms_target_port = port;
786     spin_lock(&port->mpu_spin);
787     _lwkt_pushmsg(port, msg);
788     dowakeup = 0;
789     if (port->mp_flags & MSGPORTF_WAITING) {
790         port->mp_flags &= ~MSGPORTF_WAITING;
791         dowakeup = 1;
792     }
793     spin_unlock(&port->mpu_spin);
794     if (dowakeup)
795         wakeup(port);
796     return (EASYNC);
797 }
798
799 static
800 int
801 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
802 {
803     lwkt_port_t port;
804     int sentabort;
805     int error;
806
807     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
808             ("can't wait dropable message"));
809
810     if ((msg->ms_flags & MSGF_DONE) == 0) {
811         port = msg->ms_reply_port;
812         sentabort = 0;
813         spin_lock(&port->mpu_spin);
814         while ((msg->ms_flags & MSGF_DONE) == 0) {
815             void *won;
816
817             /*
818              * If message was sent synchronously from the beginning
819              * the wakeup will be on the message structure, else it
820              * will be on the port structure.
821              */
822             if (msg->ms_flags & MSGF_SYNC) {
823                 won = msg;
824                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
825             } else {
826                 won = port;
827                 port->mp_flags |= MSGPORTF_WAITING;
828             }
829
830             /*
831              * Only messages which support abort can be interrupted.
832              * We must still wait for message completion regardless.
833              */
834             if ((flags & PCATCH) && sentabort == 0) {
835                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
836                 if (error) {
837                     sentabort = error;
838                     spin_unlock(&port->mpu_spin);
839                     lwkt_abortmsg(msg);
840                     spin_lock(&port->mpu_spin);
841                 }
842             } else {
843                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
844             }
845             /* see note at the top on the MSGPORTF_WAITING flag */
846         }
847         /*
848          * Turn EINTR into ERESTART if the signal indicates.
849          */
850         if (sentabort && msg->ms_error == EINTR)
851             msg->ms_error = sentabort;
852         if (msg->ms_flags & MSGF_QUEUED)
853             _lwkt_pullmsg(port, msg);
854         spin_unlock(&port->mpu_spin);
855     } else {
856         if (msg->ms_flags & MSGF_QUEUED) {
857             port = msg->ms_reply_port;
858             spin_lock(&port->mpu_spin);
859             _lwkt_pullmsg(port, msg);
860             spin_unlock(&port->mpu_spin);
861         }
862     }
863     return(msg->ms_error);
864 }
865
866 static
867 void *
868 lwkt_spin_waitport(lwkt_port_t port, int flags)
869 {
870     lwkt_msg_t msg;
871     int error;
872
873     spin_lock(&port->mpu_spin);
874     while ((msg = _lwkt_pollmsg(port)) == NULL) {
875         port->mp_flags |= MSGPORTF_WAITING;
876         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
877         /* see note at the top on the MSGPORTF_WAITING flag */
878         if (error) {
879             spin_unlock(&port->mpu_spin);
880             return(NULL);
881         }
882     }
883     _lwkt_pullmsg(port, msg);
884     spin_unlock(&port->mpu_spin);
885     return(msg);
886 }
887
888 static
889 void
890 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
891 {
892     int dowakeup;
893
894     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
895
896     if (msg->ms_flags & MSGF_SYNC) {
897         /*
898          * If a synchronous completion has been requested, just wakeup
899          * the message without bothering to queue it to the target port.
900          */
901         spin_lock(&port->mpu_spin);
902         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
903         dowakeup = 0;
904         if (msg->ms_flags & MSGF_WAITING) {
905                 atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
906                 dowakeup = 1;
907         }
908         spin_unlock(&port->mpu_spin);
909         if (dowakeup)
910                 wakeup(msg);
911     } else {
912         /*
913          * If an asynchronous completion has been requested the message
914          * must be queued to the reply port.
915          */
916         spin_lock(&port->mpu_spin);
917         _lwkt_enqueue_reply(port, msg);
918         dowakeup = 0;
919         if (port->mp_flags & MSGPORTF_WAITING) {
920             port->mp_flags &= ~MSGPORTF_WAITING;
921             dowakeup = 1;
922         }
923         spin_unlock(&port->mpu_spin);
924         if (dowakeup)
925             wakeup(port);
926     }
927 }
928
929 /*
930  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
931  *
932  * This function could _only_ be used when caller is in the same thread
933  * as the message's target port owner thread.
934  */
935 static void
936 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
937 {
938     KASSERT(port->mpu_td == curthread,
939             ("message could only be dropped in the same thread "
940              "as the message target port thread\n"));
941     spin_lock(&port->mpu_spin);
942     _lwkt_pullmsg(port, msg);
943     msg->ms_flags |= MSGF_DONE;
944     spin_unlock(&port->mpu_spin);
945 }
946
947 /************************************************************************
948  *                        SERIALIZER PORT BACKEND                       *
949  ************************************************************************
950  *
951  * This backend uses serializer to protect port accessing.  Callers are
952  * assumed to have serializer held.  This kind of port is usually created
953  * by network device driver along with _one_ lwkt thread to pipeline
954  * operations which may temporarily release serializer.
955  *
956  * Implementation is based on SPIN PORT BACKEND.
957  */
958
959 static
960 void *
961 lwkt_serialize_getport(lwkt_port_t port)
962 {
963     lwkt_msg_t msg;
964
965     ASSERT_SERIALIZED(port->mpu_serialize);
966
967     if ((msg = _lwkt_pollmsg(port)) != NULL)
968         _lwkt_pullmsg(port, msg);
969     return(msg);
970 }
971
972 static
973 int
974 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
975 {
976     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
977     ASSERT_SERIALIZED(port->mpu_serialize);
978
979     msg->ms_target_port = port;
980     _lwkt_pushmsg(port, msg);
981     if (port->mp_flags & MSGPORTF_WAITING) {
982         port->mp_flags &= ~MSGPORTF_WAITING;
983         wakeup(port);
984     }
985     return (EASYNC);
986 }
987
988 static
989 int
990 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
991 {
992     lwkt_port_t port;
993     int sentabort;
994     int error;
995
996     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
997             ("can't wait dropable message"));
998
999     if ((msg->ms_flags & MSGF_DONE) == 0) {
1000         port = msg->ms_reply_port;
1001
1002         ASSERT_SERIALIZED(port->mpu_serialize);
1003
1004         sentabort = 0;
1005         while ((msg->ms_flags & MSGF_DONE) == 0) {
1006             void *won;
1007
1008             /*
1009              * If message was sent synchronously from the beginning
1010              * the wakeup will be on the message structure, else it
1011              * will be on the port structure.
1012              */
1013             if (msg->ms_flags & MSGF_SYNC) {
1014                 won = msg;
1015             } else {
1016                 won = port;
1017                 port->mp_flags |= MSGPORTF_WAITING;
1018             }
1019
1020             /*
1021              * Only messages which support abort can be interrupted.
1022              * We must still wait for message completion regardless.
1023              */
1024             if ((flags & PCATCH) && sentabort == 0) {
1025                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1026                 if (error) {
1027                     sentabort = error;
1028                     lwkt_serialize_exit(port->mpu_serialize);
1029                     lwkt_abortmsg(msg);
1030                     lwkt_serialize_enter(port->mpu_serialize);
1031                 }
1032             } else {
1033                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1034             }
1035             /* see note at the top on the MSGPORTF_WAITING flag */
1036         }
1037         /*
1038          * Turn EINTR into ERESTART if the signal indicates.
1039          */
1040         if (sentabort && msg->ms_error == EINTR)
1041             msg->ms_error = sentabort;
1042         if (msg->ms_flags & MSGF_QUEUED)
1043             _lwkt_pullmsg(port, msg);
1044     } else {
1045         if (msg->ms_flags & MSGF_QUEUED) {
1046             port = msg->ms_reply_port;
1047
1048             ASSERT_SERIALIZED(port->mpu_serialize);
1049             _lwkt_pullmsg(port, msg);
1050         }
1051     }
1052     return(msg->ms_error);
1053 }
1054
1055 static
1056 void *
1057 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1058 {
1059     lwkt_msg_t msg;
1060     int error;
1061
1062     ASSERT_SERIALIZED(port->mpu_serialize);
1063
1064     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1065         port->mp_flags |= MSGPORTF_WAITING;
1066         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1067         /* see note at the top on the MSGPORTF_WAITING flag */
1068         if (error)
1069             return(NULL);
1070     }
1071     _lwkt_pullmsg(port, msg);
1072     return(msg);
1073 }
1074
1075 static
1076 void
1077 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1078 {
1079     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1080     ASSERT_SERIALIZED(port->mpu_serialize);
1081
1082     if (msg->ms_flags & MSGF_SYNC) {
1083         /*
1084          * If a synchronous completion has been requested, just wakeup
1085          * the message without bothering to queue it to the target port.
1086          */
1087         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1088         wakeup(msg);
1089     } else {
1090         /*
1091          * If an asynchronous completion has been requested the message
1092          * must be queued to the reply port.
1093          */
1094         _lwkt_enqueue_reply(port, msg);
1095         if (port->mp_flags & MSGPORTF_WAITING) {
1096             port->mp_flags &= ~MSGPORTF_WAITING;
1097             wakeup(port);
1098         }
1099     }
1100 }
1101
1102 /************************************************************************
1103  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1104  ************************************************************************/
1105
1106 /*
1107  * You can point a port's reply vector at this function if you just want
1108  * the message marked done, without any queueing or signaling.  This is
1109  * often used for structure-embedded messages.
1110  */
1111 static
1112 void
1113 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1114 {
1115     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1116 }
1117
1118 static
1119 void *
1120 lwkt_panic_getport(lwkt_port_t port)
1121 {
1122     panic("lwkt_getport() illegal on port %p", port);
1123 }
1124
1125 static
1126 int
1127 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1128 {
1129     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1130 }
1131
1132 static
1133 int
1134 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1135 {
1136     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1137 }
1138
1139 static
1140 void *
1141 lwkt_panic_waitport(lwkt_port_t port, int flags)
1142 {
1143     panic("port %p cannot be waited on", port);
1144 }
1145
1146 static
1147 void
1148 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1149 {
1150     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1151 }
1152
1153 static
1154 void
1155 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1156 {
1157     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1158 }