Merge from vendor branch OPENSSL:
[dragonfly.git] / sys / kern / lwkt_msgport.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.47 2008/09/09 07:21:54 dillon Exp $
38  */
39
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
52
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
62
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
67
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
73
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
76
77 /************************************************************************
78  *                              MESSAGE FUNCTIONS                       *
79  ************************************************************************/
80
81 /*
82  * lwkt_sendmsg()
83  *
84  *      Request asynchronous completion and call lwkt_beginmsg().  The
85  *      target port can opt to execute the message synchronously or
86  *      asynchronously and this function will automatically queue the
87  *      response if the target executes the message synchronously.
88  *
89  *      NOTE: The message is in an indeterminant state until this call
90  *      returns.  The caller should not mess with it (e.g. try to abort it)
91  *      until then.
92  */
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
95 {
96     int error;
97
98     KKASSERT(msg->ms_reply_port != NULL &&
99              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102         lwkt_replymsg(msg, error);
103     }
104 }
105
106 /*
107  * lwkt_domsg()
108  *
109  *      Request asynchronous completion and call lwkt_beginmsg().  The
110  *      target port can opt to execute the message synchronously or
111  *      asynchronously and this function will automatically queue the
112  *      response if the target executes the message synchronously.
113  */
114 int
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
116 {
117     int error;
118
119     KKASSERT(msg->ms_reply_port != NULL &&
120              (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122     msg->ms_flags |= MSGF_SYNC;
123     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124         error = lwkt_waitmsg(msg, flags);
125     } else {
126         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
127     }
128     return(error);
129 }
130
131 /*
132  * lwkt_forwardmsg()
133  *
134  * Forward a message received on one port to another port.
135  */
136 int
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
138 {   
139     int error;
140
141     crit_enter();
142     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143     if ((error = port->mp_putport(port, msg)) != EASYNC)
144         lwkt_replymsg(msg, error);
145     crit_exit();
146     return(error);
147 }
148
149 /*
150  * lwkt_abortmsg()
151  *
152  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
153  * The caller must ensure that the message will not be both replied AND
154  * destroyed while the abort is in progress.
155  *
156  * This function issues a callback which might block!
157  */
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
160 {
161     /*
162      * A critical section protects us from reply IPIs on this cpu.
163      */
164     crit_enter();
165
166     /*
167      * Shortcut the operation if the message has already been returned.
168      * The callback typically constructs a lwkt_msg with the abort request,
169      * issues it synchronously, and waits for completion.  The callback
170      * is not required to actually abort the message and the target port,
171      * upon receiving an abort request message generated by the callback
172      * should check whether the original message has already completed or
173      * not.
174      */
175     if (msg->ms_flags & MSGF_ABORTABLE) {
176         if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177             msg->ms_abortfn(msg);
178     }
179     crit_exit();
180 }
181
182 /************************************************************************
183  *                      PORT INITIALIZATION API                         *
184  ************************************************************************/
185
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191
192 static void *lwkt_spin_getport(lwkt_port_t port);
193 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
194 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
195 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
196 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
197
198 static void *lwkt_serialize_getport(lwkt_port_t port);
199 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
200 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
201 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
202 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
203
204 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void *lwkt_panic_getport(lwkt_port_t port);
206 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
207 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
208 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
209 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
210
211 /*
212  * Core port initialization (internal)
213  */
214 static __inline
215 void
216 _lwkt_initport(lwkt_port_t port,
217                void *(*gportfn)(lwkt_port_t),
218                int (*pportfn)(lwkt_port_t, lwkt_msg_t),
219                int (*wmsgfn)(lwkt_msg_t, int),
220                void *(*wportfn)(lwkt_port_t, int),
221                void (*rportfn)(lwkt_port_t, lwkt_msg_t))
222 {
223     bzero(port, sizeof(*port));
224     TAILQ_INIT(&port->mp_msgq);
225     port->mp_getport = gportfn;
226     port->mp_putport = pportfn;
227     port->mp_waitmsg =  wmsgfn;
228     port->mp_waitport =  wportfn;
229     port->mp_replyport = rportfn;
230 }
231
232 /*
233  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
234  * we tell the scheduler not to reschedule if td is at a higher priority.
235  *
236  * This routine is called even if the thread is already scheduled so messages
237  * without NORESCHED will cause the target thread to be rescheduled even if
238  * prior messages did not.
239  */
240 static __inline
241 void
242 _lwkt_schedule_msg(thread_t td, int flags)
243 {
244         if (flags & MSGF_NORESCHED)
245             lwkt_schedule_noresched(td);
246         else
247             lwkt_schedule(td);
248 }
249
250 /*
251  * lwkt_initport_thread()
252  *
253  *      Initialize a port for use by a particular thread.  The port may
254  *      only be used by <td>.
255  */
256 void
257 lwkt_initport_thread(lwkt_port_t port, thread_t td)
258 {
259     _lwkt_initport(port,
260                    lwkt_thread_getport,
261                    lwkt_thread_putport,
262                    lwkt_thread_waitmsg,
263                    lwkt_thread_waitport,
264                    lwkt_thread_replyport);
265     port->mpu_td = td;
266 }
267
268 /*
269  * lwkt_initport_spin()
270  *
271  *      Initialize a port for use with descriptors that might be accessed
272  *      via multiple LWPs, processes, or threads.  Has somewhat more
273  *      overhead then thread ports.
274  */
275 void
276 lwkt_initport_spin(lwkt_port_t port)
277 {
278     _lwkt_initport(port,
279                    lwkt_spin_getport,
280                    lwkt_spin_putport,
281                    lwkt_spin_waitmsg,
282                    lwkt_spin_waitport,
283                    lwkt_spin_replyport);
284     spin_init(&port->mpu_spin);
285 }
286
287 void
288 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
289 {
290     _lwkt_initport(port,
291                    lwkt_serialize_getport,
292                    lwkt_serialize_putport,
293                    lwkt_serialize_waitmsg,
294                    lwkt_serialize_waitport,
295                    lwkt_serialize_replyport);
296     port->mpu_serialize = slz;
297 }
298
299 /*
300  * Similar to the standard initport, this function simply marks the message
301  * as being done and does not attempt to return it to an originating port.
302  */
303 void
304 lwkt_initport_replyonly_null(lwkt_port_t port)
305 {
306     _lwkt_initport(port,
307                    lwkt_panic_getport,
308                    lwkt_panic_putport,
309                    lwkt_panic_waitmsg,
310                    lwkt_panic_waitport,
311                    lwkt_null_replyport);
312 }
313
314 /*
315  * Initialize a reply-only port, typically used as a message sink.  Such
316  * ports can only be used as a reply port.
317  */
318 void
319 lwkt_initport_replyonly(lwkt_port_t port,
320                         void (*rportfn)(lwkt_port_t, lwkt_msg_t))
321 {
322     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
323                          lwkt_panic_waitmsg, lwkt_panic_waitport,
324                          rportfn);
325 }
326
327 void
328 lwkt_initport_putonly(lwkt_port_t port,
329                       int (*pportfn)(lwkt_port_t, lwkt_msg_t))
330 {
331     _lwkt_initport(port, lwkt_panic_getport, pportfn,
332                          lwkt_panic_waitmsg, lwkt_panic_waitport,
333                          lwkt_panic_replyport);
334 }
335
336 void
337 lwkt_initport_panic(lwkt_port_t port)
338 {
339     _lwkt_initport(port,
340                    lwkt_panic_getport, lwkt_panic_putport,
341                    lwkt_panic_waitmsg, lwkt_panic_waitport,
342                    lwkt_panic_replyport);
343 }
344
345 /*
346  * lwkt_getport()
347  *
348  *      Retrieve the next message from the port's message queue, return NULL
349  *      if no messages are pending.  The retrieved message will either be a
350  *      request or a reply based on the MSGF_REPLY bit.
351  *
352  *      The calling thread MUST own the port.
353  */
354
355 static __inline
356 void
357 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
358 {
359     /*
360      * normal case, remove and return the message.
361      */
362     TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
363     msg->ms_flags &= ~MSGF_QUEUED;
364 }
365
366 /************************************************************************
367  *                      THREAD PORT BACKEND                             *
368  ************************************************************************
369  *
370  * This backend is used when the port a message is retrieved from is owned
371  * by a single thread (the calling thread).  Messages are IPId to the
372  * correct cpu before being enqueued to a port.  Note that this is fairly
373  * optimal since scheduling would have had to do an IPI anyway if the
374  * message were headed to a different cpu.
375  */
376
377 #ifdef SMP
378
379 /*
380  * This function completes reply processing for the default case in the
381  * context of the originating cpu.
382  */
383 static
384 void
385 lwkt_thread_replyport_remote(lwkt_msg_t msg)
386 {
387     lwkt_port_t port = msg->ms_reply_port;
388     int flags;
389
390     /*
391      * Chase any thread migration that occurs
392      */
393     if (port->mpu_td->td_gd != mycpu) {
394         lwkt_send_ipiq(port->mpu_td->td_gd,
395                        (ipifunc1_t)lwkt_thread_replyport_remote, msg);
396         return;
397     }
398
399     /*
400      * Cleanup
401      */
402 #ifdef INVARIANTS
403     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
404     msg->ms_flags &= ~MSGF_INTRANSIT;
405 #endif
406     flags = msg->ms_flags;
407     if (msg->ms_flags & MSGF_SYNC) {
408         cpu_sfence();
409         msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
410     } else {
411         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
412         msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
413     }
414     if (port->mp_flags & MSGPORTF_WAITING)
415         _lwkt_schedule_msg(port->mpu_td, flags);
416 }
417
418 #endif
419
420 /*
421  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
422  *
423  * Called with the reply port as an argument but in the context of the
424  * original target port.  Completion must occur on the target port's
425  * cpu.
426  *
427  * The critical section protects us from IPIs on the this CPU.
428  */
429 void
430 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
431 {
432     int flags;
433
434     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
435
436     if (msg->ms_flags & MSGF_SYNC) {
437         /*
438          * If a synchronous completion has been requested, just wakeup
439          * the message without bothering to queue it to the target port.
440          *
441          * Assume the target thread is non-preemptive, so no critical
442          * section is required.
443          */
444 #ifdef SMP
445         if (port->mpu_td->td_gd == mycpu) {
446 #endif
447             flags = msg->ms_flags;
448             cpu_sfence();
449             msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
450             if (port->mp_flags & MSGPORTF_WAITING)
451                 _lwkt_schedule_msg(port->mpu_td, flags);
452 #ifdef SMP
453         } else {
454 #ifdef INVARIANTS
455             msg->ms_flags |= MSGF_INTRANSIT;
456 #endif
457             msg->ms_flags |= MSGF_REPLY;
458             lwkt_send_ipiq(port->mpu_td->td_gd,
459                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
460         }
461 #endif
462     } else {
463         /*
464          * If an asynchronous completion has been requested the message
465          * must be queued to the reply port.
466          *
467          * A critical section is required to interlock the port queue.
468          */
469 #ifdef SMP
470         if (port->mpu_td->td_gd == mycpu) {
471 #endif
472             crit_enter();
473             TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
474             msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
475             if (port->mp_flags & MSGPORTF_WAITING)
476                 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
477             crit_exit();
478 #ifdef SMP
479         } else {
480 #ifdef INVARIANTS
481             msg->ms_flags |= MSGF_INTRANSIT;
482 #endif
483             msg->ms_flags |= MSGF_REPLY;
484             lwkt_send_ipiq(port->mpu_td->td_gd,
485                            (ipifunc1_t)lwkt_thread_replyport_remote, msg);
486         }
487 #endif
488     }
489 }
490
491 /*
492  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
493  *
494  * Called with the target port as an argument but in the context of the
495  * reply port.  This function always implements an asynchronous put to
496  * the target message port, and thus returns EASYNC.
497  *
498  * The message must already have cleared MSGF_DONE and MSGF_REPLY
499  */
500
501 #ifdef SMP
502
503 static
504 void
505 lwkt_thread_putport_remote(lwkt_msg_t msg)
506 {
507     lwkt_port_t port = msg->ms_target_port;
508
509     /*
510      * Chase any thread migration that occurs
511      */
512     if (port->mpu_td->td_gd != mycpu) {
513         lwkt_send_ipiq(port->mpu_td->td_gd,
514                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
515         return;
516     }
517
518     /*
519      * Cleanup
520      */
521 #ifdef INVARIANTS
522     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
523     msg->ms_flags &= ~MSGF_INTRANSIT;
524 #endif
525     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
526     msg->ms_flags |= MSGF_QUEUED;
527     if (port->mp_flags & MSGPORTF_WAITING)
528         _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
529 }
530
531 #endif
532
533 static
534 int
535 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
536 {
537     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
538
539     msg->ms_target_port = port;
540 #ifdef SMP
541     if (port->mpu_td->td_gd == mycpu) {
542 #endif
543         crit_enter();
544         msg->ms_flags |= MSGF_QUEUED;
545         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
546         if (port->mp_flags & MSGPORTF_WAITING)
547             _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
548         crit_exit();
549 #ifdef SMP
550     } else {
551 #ifdef INVARIANTS
552         msg->ms_flags |= MSGF_INTRANSIT;
553 #endif
554         lwkt_send_ipiq(port->mpu_td->td_gd,
555                         (ipifunc1_t)lwkt_thread_putport_remote, msg);
556     }
557 #endif
558     return (EASYNC);
559 }
560
561 /*
562  * lwkt_thread_getport()
563  *
564  *      Retrieve the next message from the port or NULL if no messages
565  *      are ready.
566  */
567 void *
568 lwkt_thread_getport(lwkt_port_t port)
569 {
570     lwkt_msg_t msg;
571
572     KKASSERT(port->mpu_td == curthread);
573
574     crit_enter_quick(port->mpu_td);
575     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
576         _lwkt_pullmsg(port, msg);
577     crit_exit_quick(port->mpu_td);
578     return(msg);
579 }
580
581 /*
582  * lwkt_thread_waitmsg()
583  *
584  *      Wait for a particular message to be replied.  We must be the only
585  *      thread waiting on the message.  The port must be owned by the
586  *      caller.
587  */
588 int
589 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
590 {
591     if ((msg->ms_flags & MSGF_DONE) == 0) {
592         /*
593          * If the done bit was not set we have to block until it is.
594          */
595         lwkt_port_t port = msg->ms_reply_port;
596         thread_t td = curthread;
597         int sentabort;
598
599         KKASSERT(port->mpu_td == td);
600         crit_enter_quick(td);
601         sentabort = 0;
602
603         while ((msg->ms_flags & MSGF_DONE) == 0) {
604             port->mp_flags |= MSGPORTF_WAITING;
605             if (sentabort == 0) {
606                 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
607                     lwkt_abortmsg(msg);
608                 }
609             } else {
610                 lwkt_sleep("waitabt", 0);
611             }
612             port->mp_flags &= ~MSGPORTF_WAITING;
613         }
614         if (msg->ms_flags & MSGF_QUEUED)
615             _lwkt_pullmsg(port, msg);
616         crit_exit_quick(td);
617     } else {
618         /*
619          * If the done bit was set we only have to mess around with the
620          * message if it is queued on the reply port.
621          */
622         if (msg->ms_flags & MSGF_QUEUED) {
623             lwkt_port_t port = msg->ms_reply_port;
624             thread_t td = curthread;
625
626             KKASSERT(port->mpu_td == td);
627             crit_enter_quick(td);
628             _lwkt_pullmsg(port, msg);
629             crit_exit_quick(td);
630         }
631     }
632     return(msg->ms_error);
633 }
634
635 void *
636 lwkt_thread_waitport(lwkt_port_t port, int flags)
637 {
638     thread_t td = curthread;
639     lwkt_msg_t msg;
640     int error;
641
642     KKASSERT(port->mpu_td == td);
643     crit_enter_quick(td);
644     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
645         port->mp_flags |= MSGPORTF_WAITING;
646         error = lwkt_sleep("waitport", flags);
647         port->mp_flags &= ~MSGPORTF_WAITING;
648         if (error)
649                 goto done;
650     }
651     _lwkt_pullmsg(port, msg);
652 done:
653     crit_exit_quick(td);
654     return(msg);
655 }
656
657 /************************************************************************
658  *                         SPIN PORT BACKEND                            *
659  ************************************************************************
660  *
661  * This backend uses spinlocks instead of making assumptions about which
662  * thread is accessing the port.  It must be used when a port is not owned
663  * by a particular thread.  This is less optimal then thread ports but
664  * you don't have a choice if there are multiple threads accessing the port.
665  *
666  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
667  * on the message port, it is the responsibility of the code doing the
668  * wakeup to clear this flag rather then the blocked threads.  Some
669  * superfluous wakeups may occur, which is ok.
670  *
671  * XXX synchronous message wakeups are not current optimized.
672  */
673
674 static
675 void *
676 lwkt_spin_getport(lwkt_port_t port)
677 {
678     lwkt_msg_t msg;
679
680     spin_lock_wr(&port->mpu_spin);
681     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
682         _lwkt_pullmsg(port, msg);
683     spin_unlock_wr(&port->mpu_spin);
684     return(msg);
685 }
686
687 static
688 int
689 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
690 {
691     int dowakeup;
692
693     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
694
695     msg->ms_target_port = port;
696     spin_lock_wr(&port->mpu_spin);
697     msg->ms_flags |= MSGF_QUEUED;
698     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
699     dowakeup = 0;
700     if (port->mp_flags & MSGPORTF_WAITING) {
701         port->mp_flags &= ~MSGPORTF_WAITING;
702         dowakeup = 1;
703     }
704     spin_unlock_wr(&port->mpu_spin);
705     if (dowakeup)
706         wakeup(port);
707     return (EASYNC);
708 }
709
710 static
711 int
712 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
713 {
714     lwkt_port_t port;
715     int sentabort;
716     int error;
717
718     if ((msg->ms_flags & MSGF_DONE) == 0) {
719         port = msg->ms_reply_port;
720         sentabort = 0;
721         spin_lock_wr(&port->mpu_spin);
722         while ((msg->ms_flags & MSGF_DONE) == 0) {
723             void *won;
724
725             /*
726              * If message was sent synchronously from the beginning
727              * the wakeup will be on the message structure, else it
728              * will be on the port structure.
729              */
730             if (msg->ms_flags & MSGF_SYNC) {
731                 won = msg;
732             } else {
733                 won = port;
734                 port->mp_flags |= MSGPORTF_WAITING;
735             }
736
737             /*
738              * Only messages which support abort can be interrupted.
739              * We must still wait for message completion regardless.
740              */
741             if ((flags & PCATCH) && sentabort == 0) {
742                 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
743                 if (error) {
744                     sentabort = error;
745                     spin_unlock_wr(&port->mpu_spin);
746                     lwkt_abortmsg(msg);
747                     spin_lock_wr(&port->mpu_spin);
748                 }
749             } else {
750                 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
751             }
752             /* see note at the top on the MSGPORTF_WAITING flag */
753         }
754         /*
755          * Turn EINTR into ERESTART if the signal indicates.
756          */
757         if (sentabort && msg->ms_error == EINTR)
758             msg->ms_error = sentabort;
759         if (msg->ms_flags & MSGF_QUEUED)
760                 _lwkt_pullmsg(port, msg);
761         spin_unlock_wr(&port->mpu_spin);
762     } else {
763         if (msg->ms_flags & MSGF_QUEUED) {
764             port = msg->ms_reply_port;
765             spin_lock_wr(&port->mpu_spin);
766             _lwkt_pullmsg(port, msg);
767             spin_unlock_wr(&port->mpu_spin);
768         }
769     }
770     return(msg->ms_error);
771 }
772
773 static
774 void *
775 lwkt_spin_waitport(lwkt_port_t port, int flags)
776 {
777     lwkt_msg_t msg;
778     int error;
779
780     spin_lock_wr(&port->mpu_spin);
781     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
782         port->mp_flags |= MSGPORTF_WAITING;
783         error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
784         /* see note at the top on the MSGPORTF_WAITING flag */
785         if (error) {
786             spin_unlock_wr(&port->mpu_spin);
787             return(NULL);
788         }
789     }
790     _lwkt_pullmsg(port, msg);
791     spin_unlock_wr(&port->mpu_spin);
792     return(msg);
793 }
794
795 static
796 void
797 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
798 {
799     int dowakeup;
800
801     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
802
803     if (msg->ms_flags & MSGF_SYNC) {
804         /*
805          * If a synchronous completion has been requested, just wakeup
806          * the message without bothering to queue it to the target port.
807          */
808         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
809         wakeup(msg);
810     } else {
811         /*
812          * If an asynchronous completion has been requested the message
813          * must be queued to the reply port.
814          */
815         spin_lock_wr(&port->mpu_spin);
816         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
817         msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
818         dowakeup = 0;
819         if (port->mp_flags & MSGPORTF_WAITING) {
820             port->mp_flags &= ~MSGPORTF_WAITING;
821             dowakeup = 1;
822         }
823         spin_unlock_wr(&port->mpu_spin);
824         if (dowakeup)
825             wakeup(port);
826     }
827 }
828
829 /************************************************************************
830  *                        SERIALIZER PORT BACKEND                       *
831  ************************************************************************
832  *
833  * This backend uses serializer to protect port accessing.  Callers are
834  * assumed to have serializer held.  This kind of port is usually created
835  * by network device driver along with _one_ lwkt thread to pipeline
836  * operations which may temporarily release serializer.
837  *
838  * Implementation is based on SPIN PORT BACKEND.
839  */
840
841 static
842 void *
843 lwkt_serialize_getport(lwkt_port_t port)
844 {
845     lwkt_msg_t msg;
846
847     ASSERT_SERIALIZED(port->mpu_serialize);
848
849     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
850         _lwkt_pullmsg(port, msg);
851     return(msg);
852 }
853
854 static
855 int
856 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
857 {
858     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
859     ASSERT_SERIALIZED(port->mpu_serialize);
860
861     msg->ms_target_port = port;
862     msg->ms_flags |= MSGF_QUEUED;
863     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
864     if (port->mp_flags & MSGPORTF_WAITING) {
865         port->mp_flags &= ~MSGPORTF_WAITING;
866         wakeup(port);
867     }
868     return (EASYNC);
869 }
870
871 static
872 int
873 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
874 {
875     lwkt_port_t port;
876     int sentabort;
877     int error;
878
879     if ((msg->ms_flags & MSGF_DONE) == 0) {
880         port = msg->ms_reply_port;
881
882         ASSERT_SERIALIZED(port->mpu_serialize);
883
884         sentabort = 0;
885         while ((msg->ms_flags & MSGF_DONE) == 0) {
886             void *won;
887
888             /*
889              * If message was sent synchronously from the beginning
890              * the wakeup will be on the message structure, else it
891              * will be on the port structure.
892              */
893             if (msg->ms_flags & MSGF_SYNC) {
894                 won = msg;
895             } else {
896                 won = port;
897                 port->mp_flags |= MSGPORTF_WAITING;
898             }
899
900             /*
901              * Only messages which support abort can be interrupted.
902              * We must still wait for message completion regardless.
903              */
904             if ((flags & PCATCH) && sentabort == 0) {
905                 error = serialize_sleep(won, port->mpu_serialize, PCATCH,
906                                         "waitmsg", 0);
907                 if (error) {
908                     sentabort = error;
909                     lwkt_serialize_exit(port->mpu_serialize);
910                     lwkt_abortmsg(msg);
911                     lwkt_serialize_enter(port->mpu_serialize);
912                 }
913             } else {
914                 error = serialize_sleep(won, port->mpu_serialize, 0,
915                                         "waitmsg", 0);
916             }
917             /* see note at the top on the MSGPORTF_WAITING flag */
918         }
919         /*
920          * Turn EINTR into ERESTART if the signal indicates.
921          */
922         if (sentabort && msg->ms_error == EINTR)
923             msg->ms_error = sentabort;
924         if (msg->ms_flags & MSGF_QUEUED)
925                 _lwkt_pullmsg(port, msg);
926     } else {
927         if (msg->ms_flags & MSGF_QUEUED) {
928             port = msg->ms_reply_port;
929
930             ASSERT_SERIALIZED(port->mpu_serialize);
931             _lwkt_pullmsg(port, msg);
932         }
933     }
934     return(msg->ms_error);
935 }
936
937 static
938 void *
939 lwkt_serialize_waitport(lwkt_port_t port, int flags)
940 {
941     lwkt_msg_t msg;
942     int error;
943
944     ASSERT_SERIALIZED(port->mpu_serialize);
945
946     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
947         port->mp_flags |= MSGPORTF_WAITING;
948         error = serialize_sleep(port, port->mpu_serialize, flags,
949                                 "waitport", 0);
950         /* see note at the top on the MSGPORTF_WAITING flag */
951         if (error)
952             return(NULL);
953     }
954     _lwkt_pullmsg(port, msg);
955     return(msg);
956 }
957
958 static
959 void
960 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
961 {
962     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
963     ASSERT_SERIALIZED(port->mpu_serialize);
964
965     if (msg->ms_flags & MSGF_SYNC) {
966         /*
967          * If a synchronous completion has been requested, just wakeup
968          * the message without bothering to queue it to the target port.
969          */
970         msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
971         wakeup(msg);
972     } else {
973         /*
974          * If an asynchronous completion has been requested the message
975          * must be queued to the reply port.
976          */
977         TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
978         msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
979         if (port->mp_flags & MSGPORTF_WAITING) {
980             port->mp_flags &= ~MSGPORTF_WAITING;
981             wakeup(port);
982         }
983     }
984 }
985
986 /************************************************************************
987  *                   PANIC AND SPECIAL PORT FUNCTIONS                   *
988  ************************************************************************/
989
990 /*
991  * You can point a port's reply vector at this function if you just want
992  * the message marked done, without any queueing or signaling.  This is
993  * often used for structure-embedded messages.
994  */
995 static
996 void
997 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
998 {
999     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1000 }
1001
1002 static
1003 void *
1004 lwkt_panic_getport(lwkt_port_t port)
1005 {
1006     panic("lwkt_getport() illegal on port %p", port);
1007 }
1008
1009 static
1010 int
1011 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1012 {
1013     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1014 }
1015
1016 static
1017 int
1018 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1019 {
1020     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1021 }
1022
1023 static
1024 void *
1025 lwkt_panic_waitport(lwkt_port_t port, int flags)
1026 {
1027     panic("port %p cannot be waited on", port);
1028 }
1029
1030 static
1031 void
1032 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1033 {
1034     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1035 }
1036