Merge branch 'vendor/DIFFUTILS'
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72
73 /************************************************************************
74  *                              MESSAGE FUNCTIONS                       *
75  ************************************************************************/
76
77 /*
78  * lwkt_sendmsg()
79  *
80  *      Request asynchronous completion and call lwkt_beginmsg().  The
81  *      target port can opt to execute the message synchronously or
82  *      asynchronously and this function will automatically queue the
83  *      response if the target executes the message synchronously.
84  *
85  *      NOTE: The message is in an indeterminant state until this call
86  *      returns.  The caller should not mess with it (e.g. try to abort it)
87  *      until then.
88  *
89  *      NOTE: Do not use this function to forward a message as we might
90  *      clobber ms_flags in a SMP race.
91  */
92 void
93 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
94 {
95     int error;
96
97     KKASSERT(msg->ms_reply_port != NULL &&
98              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
99     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
100     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
101         /*
102          * Target port opted to execute the message synchronously so
103          * queue the response.
104          */
105         lwkt_replymsg(msg, error);
106     }
107 }
108
109 void
110 lwkt_sendmsg_stage1(lwkt_port_t port, lwkt_msg_t msg)
111 {
112     KKASSERT(msg->ms_reply_port != NULL &&
113              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
114     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
115 }
116
117 void
118 lwkt_sendmsg_stage2(lwkt_port_t port, lwkt_msg_t msg)
119 {
120     int error;
121
122     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
123         /*
124          * Target port opted to execute the message synchronously so
125          * queue the response.
126          */
127         lwkt_replymsg(msg, error);
128     }
129 }
130
131 /*
132  * lwkt_domsg()
133  *
134  *      Request synchronous completion and call lwkt_beginmsg().  The
135  *      target port can opt to execute the message synchronously or
136  *      asynchronously and this function will automatically block and
137  *      wait for a response if the target executes the message
138  *      asynchronously.
139  *
140  *      NOTE: Do not use this function to forward a message as we might
141  *      clobber ms_flags in a SMP race.
142  */
143 int
144 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
145 {
146     int error;
147
148     KKASSERT(msg->ms_reply_port != NULL &&
149              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
150     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
151     msg->ms_flags |= MSGF_SYNC;
152     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
153         /*
154          * Target port opted to execute the message asynchronously so
155          * block and wait for a reply.
156          */
157         error = lwkt_waitmsg(msg, flags);
158     } else {
159         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
160     }
161     return(error);
162 }
163
164 /*
165  * lwkt_forwardmsg()
166  *
167  * Forward a message received on one port to another port.
168  */
169 int
170 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
171 {   
172     int error;
173
174     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
175     if ((error = port->mp_putport(port, msg)) != EASYNC)
176         lwkt_replymsg(msg, error);
177     return(error);
178 }
179
180 /*
181  * lwkt_abortmsg()
182  *
183  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
184  * The caller must ensure that the message will not be both replied AND
185  * destroyed while the abort is in progress.
186  *
187  * This function issues a callback which might block!
188  */
189 void
190 lwkt_abortmsg(lwkt_msg_t msg)
191 {
192     /*
193      * A critical section protects us from reply IPIs on this cpu.
194      */
195     crit_enter();
196
197     /*
198      * Shortcut the operation if the message has already been returned.
199      * The callback typically constructs a lwkt_msg with the abort request,
200      * issues it synchronously, and waits for completion.  The callback
201      * is not required to actually abort the message and the target port,
202      * upon receiving an abort request message generated by the callback
203      * should check whether the original message has already completed or
204      * not.
205      */
206     if (msg->ms_flags & MSGF_ABORTABLE) {
207         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
208             msg->ms_abortfn(msg);
209     }
210     crit_exit();
211 }
212
213 /************************************************************************
214  *                      PORT INITIALIZATION API                         *
215  ************************************************************************/
216
217 static void *lwkt_thread_getport(lwkt_port_t port);
218 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
219 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
220 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
221 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
222 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
223
224 static void *lwkt_spin_getport(lwkt_port_t port);
225 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
226 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
227 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
228 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
229 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
230
231 static void *lwkt_serialize_getport(lwkt_port_t port);
232 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
233 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
234 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
235 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
236
237 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
238 static void *lwkt_panic_getport(lwkt_port_t port);
239 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
240 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
241 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
242 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
243 static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
244
245 /*
246  * Core port initialization (internal)
247  */
248 static __inline
249 void
250 _lwkt_initport(lwkt_port_t port,
251                void *(*gportfn)(lwkt_port_t),
252                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
253                int (*wmsgfn)(lwkt_msg_t, int),
254                void *(*wportfn)(lwkt_port_t, int),
255                void (*rportfn)(lwkt_port_t, lwkt_msg_t),
256                int (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
257 {
258     bzero(port, sizeof(*port));
259     TAILQ_INIT(&port->mp_msgq);
260     TAILQ_INIT(&port->mp_msgq_prio);
261     port->mp_getport = gportfn;
262     port->mp_putport = pportfn;
263     port->mp_waitmsg =  wmsgfn;
264     port->mp_waitport =  wportfn;
265     port->mp_replyport = rportfn;
266     port->mp_dropmsg = dmsgfn;
267 }
268
269 /*
270  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
271  * we tell the scheduler not to reschedule if td is at a higher priority.
272  *
273  * This routine is called even if the thread is already scheduled.
274  */
275 static __inline
276 void
277 _lwkt_schedule_msg(thread_t td, int flags)
278 {
279     lwkt_schedule(td);
280 }
281
282 /*
283  * lwkt_initport_thread()
284  *
285  *      Initialize a port for use by a particular thread.  The port may
286  *      only be used by <td>.
287  */
288 void
289 lwkt_initport_thread(lwkt_port_t port, thread_t td)
290 {
291     _lwkt_initport(port,
292                    lwkt_thread_getport,
293                    lwkt_thread_putport,
294                    lwkt_thread_waitmsg,
295                    lwkt_thread_waitport,
296                    lwkt_thread_replyport,
297                    lwkt_thread_dropmsg);
298     port->mpu_td = td;
299 }
300
301 /*
302  * lwkt_initport_spin()
303  *
304  *      Initialize a port for use with descriptors that might be accessed
305  *      via multiple LWPs, processes, or threads.  Has somewhat more
306  *      overhead then thread ports.
307  */
308 void
309 lwkt_initport_spin(lwkt_port_t port, thread_t td)
310 {
311     int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
312
313     if (td == NULL)
314         dmsgfn = lwkt_panic_dropmsg;
315     else
316         dmsgfn = lwkt_spin_dropmsg;
317
318     _lwkt_initport(port,
319                    lwkt_spin_getport,
320                    lwkt_spin_putport,
321                    lwkt_spin_waitmsg,
322                    lwkt_spin_waitport,
323                    lwkt_spin_replyport,
324                    dmsgfn);
325     spin_init(&port->mpu_spin);
326     port->mpu_td = td;
327 }
328
329 /*
330  * lwkt_initport_serialize()
331  *
332  *      Initialize a port for use with descriptors that might be accessed
333  *      via multiple LWPs, processes, or threads.  Callers are assumed to
334  *      have held the serializer (slz).
335  */
336 void
337 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
338 {
339     _lwkt_initport(port,
340                    lwkt_serialize_getport,
341                    lwkt_serialize_putport,
342                    lwkt_serialize_waitmsg,
343                    lwkt_serialize_waitport,
344                    lwkt_serialize_replyport,
345                    lwkt_panic_dropmsg);
346     port->mpu_serialize = slz;
347 }
348
349 /*
350  * Similar to the standard initport, this function simply marks the message
351  * as being done and does not attempt to return it to an originating port.
352  */
353 void
354 lwkt_initport_replyonly_null(lwkt_port_t port)
355 {
356     _lwkt_initport(port,
357                    lwkt_panic_getport,
358                    lwkt_panic_putport,
359                    lwkt_panic_waitmsg,
360                    lwkt_panic_waitport,
361                    lwkt_null_replyport,
362                    lwkt_panic_dropmsg);
363 }
364
365 /*
366  * Initialize a reply-only port, typically used as a message sink.  Such
367  * ports can only be used as a reply port.
368  */
369 void
370 lwkt_initport_replyonly(lwkt_port_t port,
371                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
372 {
373     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
374                          lwkt_panic_waitmsg, lwkt_panic_waitport,
375                          rportfn, lwkt_panic_dropmsg);
376 }
377
378 void
379 lwkt_initport_putonly(lwkt_port_t port,
380                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
381 {
382     _lwkt_initport(port, lwkt_panic_getport, pportfn,
383                          lwkt_panic_waitmsg, lwkt_panic_waitport,
384                          lwkt_panic_replyport, lwkt_panic_dropmsg);
385 }
386
387 void
388 lwkt_initport_panic(lwkt_port_t port)
389 {
390     _lwkt_initport(port,
391                    lwkt_panic_getport, lwkt_panic_putport,
392                    lwkt_panic_waitmsg, lwkt_panic_waitport,
393                    lwkt_panic_replyport, lwkt_panic_dropmsg);
394 }
395
396 static __inline
397 void
398 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
399 {
400     lwkt_msg_queue *queue;
401
402     /*
403      * normal case, remove and return the message.
404      */
405     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
406         queue = &port->mp_msgq_prio;
407     else
408         queue = &port->mp_msgq;
409     TAILQ_REMOVE(queue, msg, ms_node);
410
411     /*
412      * atomic op needed for spin ports
413      */
414     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
415 }
416
417 static __inline
418 void
419 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
420 {
421     lwkt_msg_queue *queue;
422
423     /*
424      * atomic op needed for spin ports
425      */
426     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
427     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
428         queue = &port->mp_msgq_prio;
429     else
430         queue = &port->mp_msgq;
431     TAILQ_INSERT_TAIL(queue, msg, ms_node);
432 }
433
434 static __inline
435 lwkt_msg_t
436 _lwkt_pollmsg(lwkt_port_t port)
437 {
438     lwkt_msg_t msg;
439
440     msg = TAILQ_FIRST(&port->mp_msgq_prio);
441     if (__predict_false(msg != NULL))
442         return msg;
443
444     /*
445      * Priority queue has no message, fallback to non-priority queue.
446      */
447     return TAILQ_FIRST(&port->mp_msgq);
448 }
449
450 static __inline
451 void
452 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
453 {
454     /*
455      * atomic op needed for spin ports
456      */
457     _lwkt_pushmsg(port, msg);
458     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
459 }
460
461 /************************************************************************
462  *                      THREAD PORT BACKEND                             *
463  ************************************************************************
464  *
465  * This backend is used when the port a message is retrieved from is owned
466  * by a single thread (the calling thread).  Messages are IPId to the
467  * correct cpu before being enqueued to a port.  Note that this is fairly
468  * optimal since scheduling would have had to do an IPI anyway if the
469  * message were headed to a different cpu.
470  */
471
472 /*
473  * This function completes reply processing for the default case in the
474  * context of the originating cpu.
475  */
476 static
477 void
478 lwkt_thread_replyport_remote(lwkt_msg_t msg)
479 {
480     lwkt_port_t port = msg->ms_reply_port;
481     int flags;
482
483     /*
484      * Chase any thread migration that occurs
485      */
486     if (port->mpu_td->td_gd != mycpu) {
487         lwkt_send_ipiq(port->mpu_td->td_gd,
488                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
489         return;
490     }
491
492     /*
493      * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
494      */
495 #ifdef INVARIANTS
496     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
497     msg->ms_flags &= ~MSGF_INTRANSIT;
498 #endif
499     flags = msg->ms_flags;
500     if (msg->ms_flags & MSGF_SYNC) {
501         cpu_sfence();
502         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
503     } else {
504         _lwkt_enqueue_reply(port, msg);
505     }
506     if (port->mp_flags & MSGPORTF_WAITING)
507         _lwkt_schedule_msg(port->mpu_td, flags);
508 }
509
510 /*
511  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
512  *
513  * Called with the reply port as an argument but in the context of the
514  * original target port.  Completion must occur on the target port's
515  * cpu.
516  *
517  * The critical section protects us from IPIs on the this CPU.
518  */
519 static
520 void
521 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
522 {
523     int flags;
524
525     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
526
527     if (msg->ms_flags & MSGF_SYNC) {
528         /*
529          * If a synchronous completion has been requested, just wakeup
530          * the message without bothering to queue it to the target port.
531          *
532          * Assume the target thread is non-preemptive, so no critical
533          * section is required.
534          */
535         if (port->mpu_td->td_gd == mycpu) {
536             crit_enter();
537             flags = msg->ms_flags;
538             cpu_sfence();
539             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
540             if (port->mp_flags & MSGPORTF_WAITING)
541                 _lwkt_schedule_msg(port->mpu_td, flags);
542             crit_exit();
543         } else {
544 #ifdef INVARIANTS
545             atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
546 #endif
547             atomic_set_int(&msg->ms_flags, MSGF_REPLY);
548             lwkt_send_ipiq(port->mpu_td->td_gd,
549                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
550         }
551     } else {
552         /*
553          * If an asynchronous completion has been requested the message
554          * must be queued to the reply port.
555          *
556          * A critical section is required to interlock the port queue.
557          */
558         if (port->mpu_td->td_gd == mycpu) {
559             crit_enter();
560             _lwkt_enqueue_reply(port, msg);
561             if (port->mp_flags & MSGPORTF_WAITING)
562                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
563             crit_exit();
564         } else {
565 #ifdef INVARIANTS
566             atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
567 #endif
568             atomic_set_int(&msg->ms_flags, MSGF_REPLY);
569             lwkt_send_ipiq(port->mpu_td->td_gd,
570                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
571         }
572     }
573 }
574
575 /*
576  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
577  *
578  * This function could _only_ be used when caller is in the same thread
579  * as the message's target port owner thread.
580  */
581 static int
582 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
583 {
584     int error;
585
586     KASSERT(port->mpu_td == curthread,
587             ("message could only be dropped in the same thread "
588              "as the message target port thread"));
589     crit_enter_quick(port->mpu_td);
590     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
591             _lwkt_pullmsg(port, msg);
592             atomic_set_int(&msg->ms_flags, MSGF_DONE);
593             error = 0;
594     } else {
595             error = ENOENT;
596     }
597     crit_exit_quick(port->mpu_td);
598
599     return (error);
600 }
601
602 /*
603  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
604  *
605  * Called with the target port as an argument but in the context of the
606  * reply port.  This function always implements an asynchronous put to
607  * the target message port, and thus returns EASYNC.
608  *
609  * The message must already have cleared MSGF_DONE and MSGF_REPLY
610  */
611 static
612 void
613 lwkt_thread_putport_remote(lwkt_msg_t msg)
614 {
615     lwkt_port_t port = msg->ms_target_port;
616
617     /*
618      * Chase any thread migration that occurs
619      */
620     if (port->mpu_td->td_gd != mycpu) {
621         lwkt_send_ipiq(port->mpu_td->td_gd,
622                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
623         return;
624     }
625
626     /*
627      * An atomic op is needed on ms_flags vs originator.  Also
628      * note that the originator might be using a different type
629      * of msgport.
630      */
631 #ifdef INVARIANTS
632     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
633     atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
634 #endif
635     _lwkt_pushmsg(port, msg);
636     if (port->mp_flags & MSGPORTF_WAITING)
637         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
638 }
639
640 static
641 int
642 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
643 {
644     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
645
646     msg->ms_target_port = port;
647     if (port->mpu_td->td_gd == mycpu) {
648         crit_enter();
649         _lwkt_pushmsg(port, msg);
650         if (port->mp_flags & MSGPORTF_WAITING)
651             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
652         crit_exit();
653     } else {
654 #ifdef INVARIANTS
655         /*
656          * Cleanup.
657          *
658          * An atomic op is needed on ms_flags vs originator.  Also
659          * note that the originator might be using a different type
660          * of msgport.
661          */
662         atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
663 #endif
664         lwkt_send_ipiq(port->mpu_td->td_gd,
665                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
666     }
667     return (EASYNC);
668 }
669
670 /*
671  * lwkt_thread_getport()
672  *
673  *      Retrieve the next message from the port or NULL if no messages
674  *      are ready.
675  */
676 static
677 void *
678 lwkt_thread_getport(lwkt_port_t port)
679 {
680     lwkt_msg_t msg;
681
682     KKASSERT(port->mpu_td == curthread);
683
684     crit_enter_quick(port->mpu_td);
685     if ((msg = _lwkt_pollmsg(port)) != NULL)
686         _lwkt_pullmsg(port, msg);
687     crit_exit_quick(port->mpu_td);
688     return(msg);
689 }
690
691 /*
692  * lwkt_thread_waitmsg()
693  *
694  *      Wait for a particular message to be replied.  We must be the only
695  *      thread waiting on the message.  The port must be owned by the
696  *      caller.
697  */
698 static
699 int
700 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
701 {
702     thread_t td = curthread;
703
704     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
705             ("can't wait dropable message"));
706
707     if ((msg->ms_flags & MSGF_DONE) == 0) {
708         /*
709          * If the done bit was not set we have to block until it is.
710          */
711         lwkt_port_t port = msg->ms_reply_port;
712         int sentabort;
713
714         KKASSERT(port->mpu_td == td);
715         crit_enter_quick(td);
716         sentabort = 0;
717
718         while ((msg->ms_flags & MSGF_DONE) == 0) {
719             port->mp_flags |= MSGPORTF_WAITING; /* same cpu */
720             if (sentabort == 0) {
721                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
722                     lwkt_abortmsg(msg);
723                 }
724             } else {
725                 lwkt_sleep("waitabt", 0);
726             }
727             port->mp_flags &= ~MSGPORTF_WAITING;
728         }
729         if (msg->ms_flags & MSGF_QUEUED)
730             _lwkt_pullmsg(port, msg);
731         crit_exit_quick(td);
732     } else {
733         /*
734          * If the done bit was set we only have to mess around with the
735          * message if it is queued on the reply port.
736          */
737         crit_enter_quick(td);
738         if (msg->ms_flags & MSGF_QUEUED) {
739             lwkt_port_t port = msg->ms_reply_port;
740             thread_t td __debugvar = curthread;
741
742             KKASSERT(port->mpu_td == td);
743             _lwkt_pullmsg(port, msg);
744         }
745         crit_exit_quick(td);
746     }
747     return(msg->ms_error);
748 }
749
750 /*
751  * lwkt_thread_waitport()
752  *
753  *      Wait for a new message to be available on the port.  We must be the
754  *      the only thread waiting on the port.  The port must be owned by caller.
755  */
756 static
757 void *
758 lwkt_thread_waitport(lwkt_port_t port, int flags)
759 {
760     thread_t td = curthread;
761     lwkt_msg_t msg;
762     int error;
763
764     KKASSERT(port->mpu_td == td);
765     crit_enter_quick(td);
766     while ((msg = _lwkt_pollmsg(port)) == NULL) {
767         port->mp_flags |= MSGPORTF_WAITING;
768         error = lwkt_sleep("waitport", flags);
769         port->mp_flags &= ~MSGPORTF_WAITING;
770         if (error)
771             goto done;
772     }
773     _lwkt_pullmsg(port, msg);
774 done:
775     crit_exit_quick(td);
776     return(msg);
777 }
778
779 /************************************************************************
780  *                         SPIN PORT BACKEND                            *
781  ************************************************************************
782  *
783  * This backend uses spinlocks instead of making assumptions about which
784  * thread is accessing the port.  It must be used when a port is not owned
785  * by a particular thread.  This is less optimal then thread ports but
786  * you don't have a choice if there are multiple threads accessing the port.
787  *
788  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
789  * on the message port, it is the responsibility of the code doing the
790  * wakeup to clear this flag rather then the blocked threads.  Some
791  * superfluous wakeups may occur, which is ok.
792  *
793  * XXX synchronous message wakeups are not current optimized.
794  */
795
796 static
797 void *
798 lwkt_spin_getport(lwkt_port_t port)
799 {
800     lwkt_msg_t msg;
801
802     spin_lock(&port->mpu_spin);
803     if ((msg = _lwkt_pollmsg(port)) != NULL)
804         _lwkt_pullmsg(port, msg);
805     spin_unlock(&port->mpu_spin);
806     return(msg);
807 }
808
809 static
810 int
811 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
812 {
813     int dowakeup;
814
815     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
816
817     msg->ms_target_port = port;
818     spin_lock(&port->mpu_spin);
819     _lwkt_pushmsg(port, msg);
820     dowakeup = 0;
821     if (port->mp_flags & MSGPORTF_WAITING) {
822         port->mp_flags &= ~MSGPORTF_WAITING;
823         dowakeup = 1;
824     }
825     spin_unlock(&port->mpu_spin);
826     if (dowakeup)
827         wakeup(port);
828     return (EASYNC);
829 }
830
831 static
832 int
833 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
834 {
835     lwkt_port_t port;
836     int sentabort;
837     int error;
838
839     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
840             ("can't wait dropable message"));
841     port = msg->ms_reply_port;
842
843     if ((msg->ms_flags & MSGF_DONE) == 0) {
844         sentabort = 0;
845         spin_lock(&port->mpu_spin);
846         while ((msg->ms_flags & MSGF_DONE) == 0) {
847             void *won;
848
849             /*
850              * If message was sent synchronously from the beginning
851              * the wakeup will be on the message structure, else it
852              * will be on the port structure.
853              *
854              * ms_flags needs atomic op originator vs target MSGF_QUEUED
855              */
856             if (msg->ms_flags & MSGF_SYNC) {
857                 won = msg;
858                 atomic_set_int(&msg->ms_flags, MSGF_WAITING);
859             } else {
860                 won = port;
861                 port->mp_flags |= MSGPORTF_WAITING;
862             }
863
864             /*
865              * Only messages which support abort can be interrupted.
866              * We must still wait for message completion regardless.
867              */
868             if ((flags & PCATCH) && sentabort == 0) {
869                 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
870                 if (error) {
871                     sentabort = error;
872                     spin_unlock(&port->mpu_spin);
873                     lwkt_abortmsg(msg);
874                     spin_lock(&port->mpu_spin);
875                 }
876             } else {
877                 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
878             }
879             /* see note at the top on the MSGPORTF_WAITING flag */
880         }
881         /*
882          * Turn EINTR into ERESTART if the signal indicates.
883          */
884         if (sentabort && msg->ms_error == EINTR)
885             msg->ms_error = sentabort;
886         if (msg->ms_flags & MSGF_QUEUED)
887             _lwkt_pullmsg(port, msg);
888         spin_unlock(&port->mpu_spin);
889     } else {
890         spin_lock(&port->mpu_spin);
891         if (msg->ms_flags & MSGF_QUEUED) {
892             _lwkt_pullmsg(port, msg);
893         }
894         spin_unlock(&port->mpu_spin);
895     }
896     return(msg->ms_error);
897 }
898
899 static
900 void *
901 lwkt_spin_waitport(lwkt_port_t port, int flags)
902 {
903     lwkt_msg_t msg;
904     int error;
905
906     spin_lock(&port->mpu_spin);
907     while ((msg = _lwkt_pollmsg(port)) == NULL) {
908         port->mp_flags |= MSGPORTF_WAITING;
909         error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
910         /* see note at the top on the MSGPORTF_WAITING flag */
911         if (error) {
912             spin_unlock(&port->mpu_spin);
913             return(NULL);
914         }
915     }
916     _lwkt_pullmsg(port, msg);
917     spin_unlock(&port->mpu_spin);
918     return(msg);
919 }
920
921 static
922 void
923 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
924 {
925     int dowakeup;
926
927     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
928
929     if (msg->ms_flags & MSGF_SYNC) {
930         /*
931          * If a synchronous completion has been requested, just wakeup
932          * the message without bothering to queue it to the target port.
933          *
934          * ms_flags protected by reply port spinlock
935          */
936         spin_lock(&port->mpu_spin);
937         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
938         dowakeup = 0;
939         if (msg->ms_flags & MSGF_WAITING) {
940                 msg->ms_flags &= ~MSGF_WAITING;
941                 dowakeup = 1;
942         }
943         spin_unlock(&port->mpu_spin);
944         if (dowakeup)
945                 wakeup(msg);
946     } else {
947         /*
948          * If an asynchronous completion has been requested the message
949          * must be queued to the reply port.
950          */
951         spin_lock(&port->mpu_spin);
952         _lwkt_enqueue_reply(port, msg);
953         dowakeup = 0;
954         if (port->mp_flags & MSGPORTF_WAITING) {
955             port->mp_flags &= ~MSGPORTF_WAITING;
956             dowakeup = 1;
957         }
958         spin_unlock(&port->mpu_spin);
959         if (dowakeup)
960             wakeup(port);
961     }
962 }
963
964 /*
965  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
966  *
967  * This function could _only_ be used when caller is in the same thread
968  * as the message's target port owner thread.
969  */
970 static int
971 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
972 {
973     int error;
974
975     KASSERT(port->mpu_td == curthread,
976             ("message could only be dropped in the same thread "
977              "as the message target port thread\n"));
978     spin_lock(&port->mpu_spin);
979     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
980             _lwkt_pullmsg(port, msg);
981             msg->ms_flags |= MSGF_DONE;
982             error = 0;
983     } else {
984             error = ENOENT;
985     }
986     spin_unlock(&port->mpu_spin);
987
988     return (error);
989 }
990
991 /************************************************************************
992  *                        SERIALIZER PORT BACKEND                       *
993  ************************************************************************
994  *
995  * This backend uses serializer to protect port accessing.  Callers are
996  * assumed to have serializer held.  This kind of port is usually created
997  * by network device driver along with _one_ lwkt thread to pipeline
998  * operations which may temporarily release serializer.
999  *
1000  * Implementation is based on SPIN PORT BACKEND.
1001  */
1002
1003 static
1004 void *
1005 lwkt_serialize_getport(lwkt_port_t port)
1006 {
1007     lwkt_msg_t msg;
1008
1009     ASSERT_SERIALIZED(port->mpu_serialize);
1010
1011     if ((msg = _lwkt_pollmsg(port)) != NULL)
1012         _lwkt_pullmsg(port, msg);
1013     return(msg);
1014 }
1015
1016 static
1017 int
1018 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1019 {
1020     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1021     ASSERT_SERIALIZED(port->mpu_serialize);
1022
1023     msg->ms_target_port = port;
1024     _lwkt_pushmsg(port, msg);
1025     if (port->mp_flags & MSGPORTF_WAITING) {
1026         port->mp_flags &= ~MSGPORTF_WAITING;
1027         wakeup(port);
1028     }
1029     return (EASYNC);
1030 }
1031
1032 static
1033 int
1034 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1035 {
1036     lwkt_port_t port;
1037     int sentabort;
1038     int error;
1039
1040     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1041             ("can't wait dropable message"));
1042
1043     if ((msg->ms_flags & MSGF_DONE) == 0) {
1044         port = msg->ms_reply_port;
1045
1046         ASSERT_SERIALIZED(port->mpu_serialize);
1047
1048         sentabort = 0;
1049         while ((msg->ms_flags & MSGF_DONE) == 0) {
1050             void *won;
1051
1052             /*
1053              * If message was sent synchronously from the beginning
1054              * the wakeup will be on the message structure, else it
1055              * will be on the port structure.
1056              */
1057             if (msg->ms_flags & MSGF_SYNC) {
1058                 won = msg;
1059             } else {
1060                 won = port;
1061                 port->mp_flags |= MSGPORTF_WAITING;
1062             }
1063
1064             /*
1065              * Only messages which support abort can be interrupted.
1066              * We must still wait for message completion regardless.
1067              */
1068             if ((flags & PCATCH) && sentabort == 0) {
1069                 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1070                 if (error) {
1071                     sentabort = error;
1072                     lwkt_serialize_exit(port->mpu_serialize);
1073                     lwkt_abortmsg(msg);
1074                     lwkt_serialize_enter(port->mpu_serialize);
1075                 }
1076             } else {
1077                 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1078             }
1079             /* see note at the top on the MSGPORTF_WAITING flag */
1080         }
1081         /*
1082          * Turn EINTR into ERESTART if the signal indicates.
1083          */
1084         if (sentabort && msg->ms_error == EINTR)
1085             msg->ms_error = sentabort;
1086         if (msg->ms_flags & MSGF_QUEUED)
1087             _lwkt_pullmsg(port, msg);
1088     } else {
1089         if (msg->ms_flags & MSGF_QUEUED) {
1090             port = msg->ms_reply_port;
1091
1092             ASSERT_SERIALIZED(port->mpu_serialize);
1093             _lwkt_pullmsg(port, msg);
1094         }
1095     }
1096     return(msg->ms_error);
1097 }
1098
1099 static
1100 void *
1101 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1102 {
1103     lwkt_msg_t msg;
1104     int error;
1105
1106     ASSERT_SERIALIZED(port->mpu_serialize);
1107
1108     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1109         port->mp_flags |= MSGPORTF_WAITING;
1110         error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1111         /* see note at the top on the MSGPORTF_WAITING flag */
1112         if (error)
1113             return(NULL);
1114     }
1115     _lwkt_pullmsg(port, msg);
1116     return(msg);
1117 }
1118
1119 static
1120 void
1121 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1122 {
1123     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1124     ASSERT_SERIALIZED(port->mpu_serialize);
1125
1126     if (msg->ms_flags & MSGF_SYNC) {
1127         /*
1128          * If a synchronous completion has been requested, just wakeup
1129          * the message without bothering to queue it to the target port.
1130          *
1131          * (both sides synchronized via serialized reply port)
1132          */
1133         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1134         wakeup(msg);
1135     } else {
1136         /*
1137          * If an asynchronous completion has been requested the message
1138          * must be queued to the reply port.
1139          */
1140         _lwkt_enqueue_reply(port, msg);
1141         if (port->mp_flags & MSGPORTF_WAITING) {
1142             port->mp_flags &= ~MSGPORTF_WAITING;
1143             wakeup(port);
1144         }
1145     }
1146 }
1147
1148 /************************************************************************
1149  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
1150  ************************************************************************/
1151
1152 /*
1153  * You can point a port's reply vector at this function if you just want
1154  * the message marked done, without any queueing or signaling.  This is
1155  * often used for structure-embedded messages.
1156  */
1157 static
1158 void
1159 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1160 {
1161     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1162 }
1163
1164 static
1165 void *
1166 lwkt_panic_getport(lwkt_port_t port)
1167 {
1168     panic("lwkt_getport() illegal on port %p", port);
1169 }
1170
1171 static
1172 int
1173 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1174 {
1175     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1176 }
1177
1178 static
1179 int
1180 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1181 {
1182     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1183 }
1184
1185 static
1186 void *
1187 lwkt_panic_waitport(lwkt_port_t port, int flags)
1188 {
1189     panic("port %p cannot be waited on", port);
1190 }
1191
1192 static
1193 void
1194 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1195 {
1196     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1197 }
1198
1199 static
1200 int
1201 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1202 {
1203     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1204     /* NOT REACHED */
1205     return (ENOENT);
1206 }