Revamp the initial lwkt_abortmsg() support to normalize the abstraction. Now
authorMatthew Dillon <dillon@dragonflybsd.org>
Tue, 20 Apr 2004 01:52:28 +0000 (01:52 +0000)
committerMatthew Dillon <dillon@dragonflybsd.org>
Tue, 20 Apr 2004 01:52:28 +0000 (01:52 +0000)
a message's primary command is always processed by the target even if an
abort is requested before the target has retrieved the message from the
message port.  The message will then be requeued and the abort command copied
into lwkt_msg_t->ms_cmd.  Thus the target is always guarenteed to see the
original message and then a second, abort message (the same message with
ms_cmd = ms_abort) regardless of whether the abort was requested before
or after the target retrieved the original message.

ms_cmd is now an opaque union.  LWKT makes no assumptions as to its contents.
The NET code now stores nm_handler in ms_cmd as a function vector, and
nm_handler has been removed from all netmsg structures.

The ms_cmd function vector support nominally returns an integer error code
which is intended to support synchronous/asynchronous optimizations in the
future (to bypass messaging queueing and dequeueing in those situations
where they can be bypassed, without messing up the messaging abstraction).

The connect() predicate for which signal/abort support was added in the last
commit now uses the new abort mechanism.  Instead of having the handler
function check whether a message represents an abort or not, a different
handler vector is stored in ms_abort and run when an abort is processed
(making for an easy separation of function).

The large netmsg switch has been replaced by individual function vectors
using the new ms_cmd function vector support.  This will soon be removed
entirely in favor of direct assignment of LWKT-aware PRU vectors to the
messages command vector.

NOTE ADDITIONAL: eventually the SYSCALL, VFS, and DEV interfaces will use
the new message opaque ms_cmd 'function vector' support instead of a
command index.

Work by: Matthew Dillon and Jeffrey Hsu

18 files changed:
sys/i386/i386/trap.c
sys/kern/kern_device.c
sys/kern/lwkt_msgport.c
sys/kern/subr_disk.c
sys/kern/tty_cons.c
sys/kern/uipc_msg.c
sys/kern/uipc_socket2.c
sys/kern/uipc_syscalls.c
sys/net/netisr.c
sys/net/netisr.h
sys/net/netmsg.h
sys/netinet/ip_demux.c
sys/netinet/ip_input.c
sys/netinet/tcp_subr.c
sys/netinet/tcp_usrreq.c
sys/platform/pc32/i386/trap.c
sys/sys/msgport.h
sys/sys/msgport2.h

index 170d1db..14eaab3 100644 (file)
@@ -36,7 +36,7 @@
  *
  *     from: @(#)trap.c        7.4 (Berkeley) 5/13/91
  * $FreeBSD: src/sys/i386/i386/trap.c,v 1.147.2.11 2003/02/27 19:09:59 luoqi Exp $
- * $DragonFly: src/sys/i386/i386/Attic/trap.c,v 1.49 2004/04/10 20:55:20 dillon Exp $
+ * $DragonFly: src/sys/i386/i386/Attic/trap.c,v 1.50 2004/04/20 01:52:17 dillon Exp $
  */
 
 /*
@@ -1282,7 +1282,8 @@ syscall2(struct trapframe frame)
         * results are returned.  Since edx is loaded from fds[1] when the 
         * system call returns we pre-set it here.
         */
-       lwkt_initmsg_rp(&args.lmsg, &td->td_msgport, code);
+       lwkt_initmsg(&args.lmsg, &td->td_msgport, 0,
+                       lwkt_cmd_op(code), lwkt_cmd_op_none);
        args.sysmsg_copyout = NULL;
        args.sysmsg_fds[0] = 0;
        args.sysmsg_fds[1] = frame.tf_edx;
@@ -1532,8 +1533,9 @@ sendsys2(struct trapframe frame)
         * Initialize the kernel message from the copied-in data and
         * pull in appropriate flags from the userland message.
         */
-       lwkt_initmsg_rp(&sysun->lmsg, &td->td_msgport, 
-           sysun->nosys.usrmsg.umsg.ms_cmd);
+       lwkt_initmsg(&sysun->lmsg, &td->td_msgport, 0,
+                       sysun->nosys.usrmsg.umsg.ms_cmd,
+                       lwkt_cmd_op_none);
        sysun->sysmsg_copyout = NULL;
        sysun->lmsg.opaque.ms_umsg = umsg;
        sysun->lmsg.ms_flags |= sysun->nosys.usrmsg.umsg.ms_flags & MSGF_ASYNC;
@@ -1542,7 +1544,7 @@ sendsys2(struct trapframe frame)
         * Extract the system call number, lookup the system call, and
         * set the default return value.
         */
-       code = (u_int)sysun->lmsg.ms_cmd;
+       code = (u_int)sysun->lmsg.ms_cmd.cm_op;
        if (code >= p->p_sysent->sv_size) {
                error = ENOSYS;
                goto bad1;
index bb05db9..9c96f43 100644 (file)
@@ -25,7 +25,7 @@
  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/kern_device.c,v 1.8 2004/03/06 19:40:28 dillon Exp $
+ * $DragonFly: src/sys/kern/kern_device.c,v 1.9 2004/04/20 01:52:22 dillon Exp $
  */
 #include <sys/param.h>
 #include <sys/kernel.h>
@@ -61,6 +61,10 @@ static int cdevsw_putport(lwkt_port_t port, lwkt_msg_t msg);
  *
  * Don't worry too much about optimizing this code, the critical devices
  * will implement their own port messaging functions directly.
+ *
+ * YYY NOTE: ms_cmd can now hold a function pointer, should this code be
+ * converted from an integer op to a function pointer with a flag to
+ * indicate legacy operation?
  */
 static void
 init_default_cdevsw_port(lwkt_port_t port)
@@ -81,7 +85,7 @@ cdevsw_putport(lwkt_port_t port, lwkt_msg_t lmsg)
      * If queueable then officially queue the message
      */
     if (port->mp_td) {
-       int mask = (1 << (msg->am_lmsg.ms_cmd & MSG_SUBCMD_MASK));
+       int mask = (1 << (msg->am_lmsg.ms_cmd.cm_op & MSG_SUBCMD_MASK));
        if (csw->d_autoq & mask) 
            return(lwkt_beginmsg(port, &msg->am_lmsg));
     }
@@ -90,7 +94,7 @@ cdevsw_putport(lwkt_port_t port, lwkt_msg_t lmsg)
      * Run the device switch function synchronously in the context of the
      * caller and return a synchronous error code (anything not EASYNC).
      */
-    switch(msg->am_lmsg.ms_cmd) {
+    switch(msg->am_lmsg.ms_cmd.cm_op) {
     case CDEV_CMD_OPEN:
        error = csw->old_open(
                    msg->am_open.msg.dev,
@@ -188,7 +192,7 @@ _init_cdevmsg(dev_t dev, cdevmsg_t msg, int cmd)
 {
     struct cdevsw *csw;
 
-    lwkt_initmsg(&msg->msg, cmd);
+    lwkt_initmsg_simple(&msg->msg, cmd);
     msg->dev = dev;
     msg->csw = csw = _devsw(dev);
     if (csw != NULL) {                 /* YYY too hackish */
index 2f77217..ffb99c2 100644 (file)
@@ -26,7 +26,7 @@
  * NOTE! This file may be compiled for userland libraries as well as for
  * the kernel.
  *
- * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.19 2004/04/15 00:50:03 dillon Exp $
+ * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.20 2004/04/20 01:52:22 dillon Exp $
  */
 
 #ifdef _KERNEL
@@ -99,7 +99,8 @@ static void lwkt_putport_remote(lwkt_msg_t msg);
  *
  *     The message's ms_cmd must be initialized and its ms_flags must
  *     be zero'd out.  lwkt_sendmsg() will initialize the ms_abort_port
- *     (abort chasing port).
+ *     (abort chasing port).  If abort is supported, ms_abort must also be
+ *     initialized.
  *
  *     NOTE: you cannot safely request an abort until lwkt_sendmsg() returns
  *     to the caller.
@@ -110,7 +111,8 @@ lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
     int error;
 
     msg->ms_flags |= MSGF_ASYNC;
-    msg->ms_flags &= ~(MSGF_REPLY1 | MSGF_REPLY2 | MSGF_QUEUED);
+    msg->ms_flags &= ~(MSGF_REPLY1 | MSGF_REPLY2 | MSGF_QUEUED | \
+                       MSGF_ABORTED | MSGF_RETRIEVED);
     KKASSERT(msg->ms_reply_port != NULL);
     msg->ms_abort_port = msg->ms_reply_port;
     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
@@ -132,7 +134,8 @@ lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
  *
  *     The message's ms_cmd must be initialized, and its ms_flags must be
  *     at least zero'd out.  lwkt_domsg() will initialize the message's
- *     ms_abort_port (abort chasing port).
+ *     ms_abort_port (abort chasing port).  If abort is supported, ms_abort
+ *     must also be initialized.
  *
  *     NOTE: you cannot safely request an abort until lwkt_domsg() blocks.
  *     XXX this probably needs some work.
@@ -142,7 +145,8 @@ lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg)
 {
     int error;
 
-    msg->ms_flags &= ~(MSGF_ASYNC | MSGF_REPLY1 | MSGF_REPLY2 | MSGF_QUEUED);
+    msg->ms_flags &= ~(MSGF_ASYNC | MSGF_REPLY1 | MSGF_REPLY2 | \
+                       MSGF_QUEUED | MSGF_ABORTED | MSGF_RETRIEVED);
     KKASSERT(msg->ms_reply_port != NULL);
     msg->ms_abort_port = msg->ms_reply_port;
     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
@@ -185,8 +189,49 @@ lwkt_initport(lwkt_port_t port, thread_t td)
  *     Note that once a message has been dequeued it is subject to being
  *     requeued via an IPI based abort request if it is not marked MSGF_DONE.
  *
+ *     If the message has been aborted we have to guarentee that abort 
+ *     semantics are properly followed.   The target port will always see
+ *     the original message at least once, and if it does not reply the 
+ *     message before looping on its message port again it will then see
+ *     the message again with ms_cmd set to ms_abort.
+ *
  *     The calling thread MUST own the port.
  */
+
+static __inline
+void
+_lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    if ((msg->ms_flags & MSGF_ABORTED) == 0) {
+       /*
+        * normal case, remove and return the message.
+        */
+       TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
+       msg->ms_flags = (msg->ms_flags & ~MSGF_QUEUED) | MSGF_RETRIEVED;
+    } else {
+       if (msg->ms_flags & MSGF_RETRIEVED) {
+           /*
+            * abort case, message already returned once, remvoe and
+            * return the aborted message a second time after setting
+            * ms_cmd to ms_abort.
+            */
+           TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
+           msg->ms_flags &= ~MSGF_QUEUED;
+           msg->ms_cmd = msg->ms_abort;
+       } else {
+           /*
+            * abort case, abort races initial message retrieval.  The
+            * message is returned normally but not removed from the 
+            * queue.  On the next loop the 'aborted' message will be
+            * dequeued and returned.  Note that if the caller replies
+            * to the message it will be dequeued (the abort becomes a
+            * NOP).
+            */
+           msg->ms_flags |= MSGF_RETRIEVED;
+       }
+    }
+}
+
 void *
 lwkt_getport(lwkt_port_t port)
 {
@@ -195,10 +240,8 @@ lwkt_getport(lwkt_port_t port)
     KKASSERT(port->mp_td == curthread);
 
     crit_enter_quick(port->mp_td);
-    if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) {
-       TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
-       msg->ms_flags &= ~MSGF_QUEUED;
-    }
+    if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
+       _lwkt_pullmsg(port, msg);
     crit_exit_quick(port->mp_td);
     return(msg);
 }
@@ -209,7 +252,7 @@ lwkt_getport(lwkt_port_t port)
  *
  * The message is being returned to the specified port.  The port is
  * owned by the mp_td thread.  If we are on the same cpu as the mp_td
- * thread we can trivially queue the message to the messageq and schedule
+ * thread we can trivially queue the message to the reply port and schedule
  * the target thread, otherwise we have to send an ipi message to the
  * correct cpu.
  *
@@ -225,8 +268,8 @@ _lwkt_replyport(lwkt_port_t port, lwkt_msg_t msg, int force)
 
     if (force || td->td_gd == mycpu) {
        /*
-        * If an abort is racing us we cannot queue the reply now, the
-        * abort code will have to do it when it catches up.
+        * We can only reply the message if the abort has caught up with us,
+        * or if no abort was issued (same case).
         */
        if (msg->ms_abort_port == port) {
            KKASSERT((msg->ms_flags & MSGF_QUEUED) == 0);
@@ -256,15 +299,21 @@ lwkt_replyport_remote(lwkt_msg_t msg)
  * Note that the lwkt_replymsg() inline has already set MSGF_REPLY1 and
  * entered a critical section for us.
  */
+
 void
 lwkt_default_replyport(lwkt_port_t port, lwkt_msg_t msg)
 {
+    crit_enter();
+    msg->ms_flags |= MSGF_REPLY1;
     if (msg->ms_flags & MSGF_ASYNC) {
        /*
         * An abort may have caught up to us while we were processing the
-        * message.  If this occured we have to dequeue the message before
-        * we can reply it.  If an abort occurs after we reply the MSGF_REPLY1
-        * flag will prevent it from being queued to the target port.
+        * message.  If this occured we have to dequeue the message from the
+        * target port in the context of our current cpu before we can
+        * finish replying it.
+        *
+        * If an abort occurs after we reply the MSGF_REPLY1 flag will
+        * prevent it from being requeued to the target port.
         */
        if (msg->ms_flags & MSGF_QUEUED) {
            KKASSERT(msg->ms_flags & MSGF_ABORTED);
@@ -273,10 +322,16 @@ lwkt_default_replyport(lwkt_port_t port, lwkt_msg_t msg)
        }
        _lwkt_replyport(port, msg, 0);
     } else {
+       /*
+        * Synchronously executed messages cannot be aborted and are just
+        * marked done.  YYY MSGF_DONE should already be set, change flag set
+        * to KKASSERT.
+        */
        msg->ms_flags |= MSGF_DONE;
        if (port->mp_flags & MSGPORTF_WAITING)
            lwkt_schedule(port->mp_td);
     }
+    crit_exit();
 }
 
 /*
@@ -350,6 +405,7 @@ lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
        TAILQ_REMOVE(&msg->ms_target_port->mp_msgq, msg, ms_node);
        msg->ms_flags &= ~MSGF_QUEUED;
     }
+    msg->ms_flags &= ~MSGF_RETRIEVED;
     if ((error = port->mp_putport(port, msg)) != EASYNC)
        lwkt_replymsg(msg, error);
     crit_exit();
@@ -376,12 +432,13 @@ lwkt_abortmsg(lwkt_msg_t msg)
     thread_t td;
 
     /*
-     * A critical section protects us from reply IPIs on this cpu.  If
-     * the message is marked done it has already been completely processed
-     * and replied before we caled lwkt_abortmsg().
+     * A critical section protects us from reply IPIs on this cpu.   We 
+     * can only abort messages that have not yet completed (DONE), are not
+     * in the midst of being replied (REPLY1), and which support the
+     * abort function (ABORTABLE).
      */
     crit_enter();
-    if ((msg->ms_flags & MSGF_DONE) == 0) {
+    if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY1|MSGF_ABORTABLE)) == MSGF_ABORTABLE) {
        /*
         * Chase the message.  If REPLY1 is set the message has been replied
         * all the way back to the originator, otherwise it is sitting on
@@ -450,7 +507,9 @@ lwkt_default_abortport(lwkt_port_t port, lwkt_msg_t msg)
     if (msg->ms_flags & MSGF_REPLY2) {
        /*
         * If REPLY2 is set we must have chased it all the way back to
-        * the reply port.  We become responsible for 
+        * the reply port, but the replyport code has not queued the message
+        * (because it was waiting for the abort to catch up).  We become
+        * responsible for queueing the message to the reply port.
         */
        KKASSERT((msg->ms_flags & MSGF_QUEUED) == 0);
        KKASSERT(port == msg->ms_reply_port);
@@ -459,10 +518,21 @@ lwkt_default_abortport(lwkt_port_t port, lwkt_msg_t msg)
        if (port->mp_flags & MSGPORTF_WAITING)
            lwkt_schedule(port->mp_td);
     } else if ((msg->ms_flags & (MSGF_QUEUED|MSGF_REPLY1)) == 0) {
+       /*
+        * Abort on the target port.  The message has not yet been replied
+        * and must be requeued to the target port.
+        */
        msg->ms_flags |= MSGF_ABORTED | MSGF_QUEUED;
        TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
        if (port->mp_flags & MSGPORTF_WAITING)
            lwkt_schedule(port->mp_td);
+    } else if ((msg->ms_flags & MSGF_REPLY1) == 0) {
+       /*
+        * The message has not yet been retrieved by the target port, set
+        * MSGF_ABORTED so the target port can requeue the message abort after
+        * retrieving it.
+        */
+       msg->ms_flags |= MSGF_ABORTED;
     }
 }
 
@@ -474,7 +544,8 @@ lwkt_default_abortport(lwkt_port_t port, lwkt_msg_t msg)
  *     returns NULL.
  *
  *     If msg is non-NULL, block until the requested message has been returned
- *     to the port then dequeue and return it.
+ *     to the port then dequeue and return it.  DO NOT USE THIS TO WAIT FOR
+ *     INCOMING REQUESTS, ONLY USE THIS TO WAIT FOR REPLIES.
  *
  *     Note that the API does not currently support multiple threads waiting
  *     on a port.  By virtue of owning the port it is controlled by our
@@ -497,22 +568,19 @@ lwkt_default_waitport(lwkt_port_t port, lwkt_msg_t msg)
            } while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL);
            port->mp_flags &= ~MSGPORTF_WAITING;
        }
-       TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
-       msg->ms_flags &= ~MSGF_QUEUED;
+       _lwkt_pullmsg(port, msg);
     } else {
        /*
-        * If the message is marked done but not queued it has already been
-        * pulled off the port and returned and we do not have to do anything.
-        * Otherwise we do not own the message have to wait for message
-        * completion.  Beware of cpu races if MSGF_DONE is not found to be
-        * set!
+        * If a message is not marked done, or if it is queued, we have work
+        * to do.  Note that MSGF_DONE is always set in the context of the
+        * reply port's cpu.
         */
-       if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY1)) != MSGF_DONE) {
+       if ((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) != MSGF_DONE) {
            /*
             * We must own the reply port to safely mess with it's contents.
             */
            port = msg->ms_reply_port;
-           KKASSERT(port->mp_td == curthread);
+           KKASSERT(port->mp_td == td);
 
            if ((msg->ms_flags & MSGF_DONE) == 0) {
                port->mp_flags |= MSGPORTF_WAITING; /* saved by the BGL */
index b77c8a1..cfff488 100644 (file)
@@ -45,7 +45,7 @@
  *     @(#)ufs_disksubr.c      8.5 (Berkeley) 1/21/94
  * $FreeBSD: src/sys/kern/subr_disk.c,v 1.20.2.6 2001/10/05 07:14:57 peter Exp $
  * $FreeBSD: src/sys/ufs/ufs/ufs_disksubr.c,v 1.44.2.3 2001/03/05 05:42:19 obrien Exp $
- * $DragonFly: src/sys/kern/subr_disk.c,v 1.8 2004/02/18 06:59:15 dillon Exp $
+ * $DragonFly: src/sys/kern/subr_disk.c,v 1.9 2004/04/20 01:52:22 dillon Exp $
  */
 
 #include <sys/param.h>
@@ -220,7 +220,7 @@ disk_putport(lwkt_port_t port, lwkt_msg_t lmsg)
        cdevallmsg_t msg = (cdevallmsg_t)lmsg;
        int error;
 
-       switch(msg->am_lmsg.ms_cmd) {
+       switch(msg->am_lmsg.ms_cmd.cm_op) {
        case CDEV_CMD_OPEN:
                error = diskopen(
                            msg->am_open.msg.dev,
index 04f6e65..4bfc56d 100644 (file)
@@ -37,7 +37,7 @@
  *
  *     from: @(#)cons.c        7.2 (Berkeley) 5/9/91
  * $FreeBSD: src/sys/kern/tty_cons.c,v 1.81.2.4 2001/12/17 18:44:41 guido Exp $
- * $DragonFly: src/sys/kern/tty_cons.c,v 1.10 2003/11/24 20:46:01 dillon Exp $
+ * $DragonFly: src/sys/kern/tty_cons.c,v 1.11 2004/04/20 01:52:22 dillon Exp $
  */
 
 #include "opt_ddb.h"
@@ -265,7 +265,7 @@ console_putport(lwkt_port_t port, lwkt_msg_t lmsg)
        cdevallmsg_t msg = (cdevallmsg_t)lmsg;
        int error;
 
-       switch(msg->am_lmsg.ms_cmd) {
+       switch(msg->am_lmsg.ms_cmd.cm_op) {
        case CDEV_CMD_OPEN:
                error = cnopen(
                            msg->am_open.msg.dev,
index 0c404b5..93e1f6f 100644 (file)
@@ -27,7 +27,7 @@
  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/uipc_msg.c,v 1.8 2004/04/10 00:48:06 hsu Exp $
+ * $DragonFly: src/sys/kern/uipc_msg.c,v 1.9 2004/04/20 01:52:22 dillon Exp $
  */
 
 #include <sys/param.h>
@@ -44,8 +44,6 @@
 #include <net/netisr.h>
 #include <net/netmsg.h>
 
-static void netmsg_pru_dispatcher(struct netmsg *msg);
-
 int
 so_pru_abort(struct socket *so)
 {
@@ -57,8 +55,8 @@ so_pru_abort(struct socket *so)
                return ((*so->so_proto->pr_usrreqs->pru_abort)(so));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_ABORT);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_abort), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_abort;
        msg.nm_so = so;
        error = lwkt_domsg(port, &msg.nm_lmsg);
@@ -76,8 +74,8 @@ so_pru_accept(struct socket *so, struct sockaddr **nam)
                return ((*so->so_proto->pr_usrreqs->pru_accept)(so, nam));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_ACCEPT);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_accept), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_accept;
        msg.nm_so = so;
        msg.nm_nam = nam;
@@ -96,9 +94,8 @@ so_pru_attach(struct socket *so, int proto, struct pru_attach_info *ai)
                return ((*so->so_proto->pr_usrreqs->pru_attach)(so, proto, ai));
 
        port = so->so_proto->pr_mport(NULL, NULL);
-
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_ATTACH);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_attach), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_attach;
        msg.nm_so = so;
        msg.nm_proto = proto;
@@ -119,8 +116,8 @@ so_pru_bind(struct socket *so, struct sockaddr *nam, struct thread *td)
 
        /* Send mesg to thread for new address. */
        port = so->so_proto->pr_mport(NULL, nam);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_BIND);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_bind), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_bind;
        msg.nm_so = so;
        msg.nm_nam = nam;
@@ -140,8 +137,8 @@ so_pru_connect(struct socket *so, struct sockaddr *nam, struct thread *td)
                return ((*so->so_proto->pr_usrreqs->pru_connect)(so, nam, td));
 
        port = so->so_proto->pr_mport(so, nam);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_CONNECT);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_connect), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_connect;
        msg.nm_so = so;
        msg.nm_nam = nam;
@@ -167,8 +164,8 @@ so_pru_connect2(struct socket *so1, struct socket *so2)
         */
        panic("connect2 on socket type %d", so1->so_type);
        port = so1->so_proto->pr_mport(so1, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_CONNECT2);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_connect2), lwkt_cmd_op_none);
        msg.nm_prufn = so1->so_proto->pr_usrreqs->pru_connect2;
        msg.nm_so1 = so1;
        msg.nm_so2 = so2;
@@ -192,8 +189,8 @@ so_pru_control(struct socket *so, u_long cmd, caddr_t data, struct ifnet *ifp,
                    ifp, td));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_CONTROL);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_control), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_control;
        msg.nm_so = so;
        msg.nm_cmd = cmd;
@@ -216,8 +213,8 @@ so_pru_detach(struct socket *so)
                return ((*so->so_proto->pr_usrreqs->pru_detach)(so));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_DETACH);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_detach), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_detach;
        msg.nm_so = so;
        error = lwkt_domsg(port, &msg.nm_lmsg);
@@ -235,8 +232,8 @@ so_pru_disconnect(struct socket *so)
                return ((*so->so_proto->pr_usrreqs->pru_disconnect)(so));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_DISCONNECT);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_disconnect), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_disconnect;
        msg.nm_so = so;
        error = lwkt_domsg(port, &msg.nm_lmsg);
@@ -254,8 +251,8 @@ so_pru_listen(struct socket *so, struct thread *td)
                return ((*so->so_proto->pr_usrreqs->pru_listen)(so, td));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_LISTEN);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_listen), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_listen;
        msg.nm_so = so;
        msg.nm_td = td;         /* used only for prison_ip() XXX JH */
@@ -274,8 +271,8 @@ so_pru_peeraddr(struct socket *so, struct sockaddr **nam)
                return ((*so->so_proto->pr_usrreqs->pru_peeraddr)(so, nam));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_PEERADDR);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_peeraddr), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_peeraddr;
        msg.nm_so = so;
        msg.nm_nam = nam;
@@ -294,8 +291,8 @@ so_pru_rcvd(struct socket *so, int flags)
                return ((*so->so_proto->pr_usrreqs->pru_rcvd)(so, flags));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_RCVD);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_rcvd), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_rcvd;
        msg.nm_so = so;
        msg.nm_flags = flags;
@@ -314,8 +311,8 @@ so_pru_rcvoob(struct socket *so, struct mbuf *m, int flags)
                return ((*so->so_proto->pr_usrreqs->pru_rcvoob)(so, m, flags));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_RCVOOB);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_rcvoob), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_rcvoob;
        msg.nm_so = so;
        msg.nm_m = m;
@@ -337,8 +334,8 @@ so_pru_send(struct socket *so, int flags, struct mbuf *m, struct sockaddr *addr,
                    addr, control, td));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_SEND);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_send), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_send;
        msg.nm_so = so;
        msg.nm_flags = flags;
@@ -361,8 +358,8 @@ so_pru_sense(struct socket *so, struct stat *sb)
                return ((*so->so_proto->pr_usrreqs->pru_sense)(so, sb));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_SENSE);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_sense), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_sense;
        msg.nm_so = so;
        msg.nm_stat = sb;
@@ -381,8 +378,8 @@ so_pru_shutdown(struct socket *so)
                return ((*so->so_proto->pr_usrreqs->pru_shutdown)(so));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_SHUTDOWN);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_shutdown), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_shutdown;
        msg.nm_so = so;
        error = lwkt_domsg(port, &msg.nm_lmsg);
@@ -400,8 +397,8 @@ so_pru_sockaddr(struct socket *so, struct sockaddr **nam)
                return ((*so->so_proto->pr_usrreqs->pru_sockaddr)(so, nam));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_SOCKADDR);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_sockaddr), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_sockaddr;
        msg.nm_so = so;
        msg.nm_nam = nam;
@@ -422,8 +419,8 @@ so_pru_sopoll(struct socket *so, int events, struct ucred *cred,
                    cred, td));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PRU_SOPOLL);
-       msg.nm_handler = netmsg_pru_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_sopoll), lwkt_cmd_op_none);
        msg.nm_prufn = so->so_proto->pr_usrreqs->pru_sopoll;
        msg.nm_so = so;
        msg.nm_events = events;
@@ -446,8 +443,8 @@ so_pr_ctloutput(struct socket *so, struct sockopt *sopt)
                return ((*so->so_proto->pr_ctloutput)(so, sopt));
 
        port = so->so_proto->pr_mport(so, NULL);
-       lwkt_initmsg(&msg.nm_lmsg, CMD_NETMSG_PR_CTLOUTPUT);
-       msg.nm_handler = netmsg_pr_dispatcher;
+       lwkt_initmsg(&msg.nm_lmsg, &curthread->td_msgport, 0,
+               lwkt_cmd_func(netmsg_pru_ctloutput), lwkt_cmd_op_none);
        msg.nm_prfn = so->so_proto->pr_ctloutput;
        msg.nm_so = so;
        msg.nm_sopt = sopt;
@@ -457,208 +454,243 @@ so_pr_ctloutput(struct socket *so, struct sockopt *sopt)
 }
 
 /*
- * If we convert all the pru_usrreq functions for all the protocols
- * to take a message directly, this layer can go away.
+ * If we convert all the protosw pr_ functions for all the protocols
+ * to take a message directly, this layer can go away.  For the moment
+ * our dispatcher ignores the return value, but since we are handling
+ * the replymsg ourselves we return EASYNC by convention.
  */
-static void
-netmsg_pru_dispatcher(struct netmsg *msg)
+int
+netmsg_pru_abort(lwkt_msg_t msg)
+{
+       struct netmsg_pru_abort *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so));
+       return(EASYNC);
+}
+
+int
+netmsg_pru_accept(lwkt_msg_t msg)
+{
+       struct netmsg_pru_accept *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_nam));
+       return(EASYNC);
+}
+
+int
+netmsg_pru_attach(lwkt_msg_t msg)
 {
+       struct netmsg_pru_attach *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_proto, nm->nm_ai));
+       return(EASYNC);
+}
+
+int
+netmsg_pru_bind(lwkt_msg_t msg)
+{
+       struct netmsg_pru_bind *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_nam, nm->nm_td));
+       return(EASYNC);
+}
+
+int
+netmsg_pru_connect(lwkt_msg_t msg)
+{
+       struct netmsg_pru_connect *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_nam, nm->nm_td));
+       return(EASYNC);
+}
+
+int
+netmsg_pru_connect2(lwkt_msg_t msg)
+{
+       struct netmsg_pru_connect2 *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so1, nm->nm_so2));
+       return(EASYNC);
+}
+
+int
+netmsg_pru_control(lwkt_msg_t msg)
+{
+       struct netmsg_pru_control *nm = (void *)msg;
        int error;
 
-       switch (msg->nm_lmsg.ms_cmd) {
-       case CMD_NETMSG_PRU_ABORT:
-       {
-               struct netmsg_pru_abort *nm = (struct netmsg_pru_abort *)msg;
+       error = nm->nm_prufn(nm->nm_so, nm->nm_cmd, nm->nm_data,
+                               nm->nm_ifp, nm->nm_td);
+       lwkt_replymsg(msg, error);
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so);
-               break;
-       }
-       case CMD_NETMSG_PRU_ACCEPT:
-       {
-               struct netmsg_pru_accept *nm = (struct netmsg_pru_accept *)msg;
+int
+netmsg_pru_detach(lwkt_msg_t msg)
+{
+       struct netmsg_pru_detach *nm = (void *)msg;
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_nam);
-               break;
-       }
-       case CMD_NETMSG_PRU_ATTACH:
-       {
-               struct netmsg_pru_attach *nm = (struct netmsg_pru_attach *)msg;
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so));
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_proto, nm->nm_ai);
-               break;
-       }
-       case CMD_NETMSG_PRU_BIND:
-       {
-               struct netmsg_pru_bind *nm = (struct netmsg_pru_bind *)msg;
+int
+netmsg_pru_disconnect(lwkt_msg_t msg)
+{
+       struct netmsg_pru_disconnect *nm = (void *)msg;
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_nam, nm->nm_td);
-               break;
-       }
-       case CMD_NETMSG_PRU_CONNECT:
-       {
-               struct netmsg_pru_connect *nm =
-                   (struct netmsg_pru_connect *)msg;
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so));
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_nam, nm->nm_td);
-               break;
-       }
-       case CMD_NETMSG_PRU_CONNECT2:
-       {
-               struct netmsg_pru_connect2 *nm =
-                   (struct netmsg_pru_connect2 *)msg;
+int
+netmsg_pru_listen(lwkt_msg_t msg)
+{
+       struct netmsg_pru_listen *nm = (void *)msg;
 
-               error = nm->nm_prufn(nm->nm_so1, nm->nm_so2);
-               break;
-       }
-       case CMD_NETMSG_PRU_CONTROL:
-       {
-               struct netmsg_pru_control *nm =
-                   (struct netmsg_pru_control *)msg;
-
-               error = nm->nm_prufn(nm->nm_so, nm->nm_cmd, nm->nm_data,
-                   nm->nm_ifp, nm->nm_td);
-               break;
-       }
-       case CMD_NETMSG_PRU_DETACH:
-       {
-               struct netmsg_pru_detach *nm = (struct netmsg_pru_detach *)msg;
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_td));
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so);
-               break;
-       }
-       case CMD_NETMSG_PRU_DISCONNECT:
-       {
-               struct netmsg_pru_disconnect *nm =
-                   (struct netmsg_pru_disconnect *)msg;
+int
+netmsg_pru_peeraddr(lwkt_msg_t msg)
+{
+       struct netmsg_pru_peeraddr *nm = (void *)msg;
 
-               error = nm->nm_prufn(nm->nm_so);
-               break;
-       }
-       case CMD_NETMSG_PRU_LISTEN:
-       {
-               struct netmsg_pru_listen *nm = (struct netmsg_pru_listen *)msg;
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_nam));
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_td);
-               break;
-       }
-       case CMD_NETMSG_PRU_PEERADDR:
-       {
-               struct netmsg_pru_peeraddr *nm =
-                   (struct netmsg_pru_peeraddr *)msg;
+int
+netmsg_pru_rcvd(lwkt_msg_t msg)
+{
+       struct netmsg_pru_rcvd *nm = (void *)msg;
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_nam);
-               break;
-       }
-       case CMD_NETMSG_PRU_RCVD:
-       {
-               struct netmsg_pru_rcvd *nm = (struct netmsg_pru_rcvd *)msg;
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_flags));
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_flags);
-               break;
-       }
-       case CMD_NETMSG_PRU_RCVOOB:
-       {
-               struct netmsg_pru_rcvoob *nm = (struct netmsg_pru_rcvoob *)msg;
+int
+netmsg_pru_rcvoob(lwkt_msg_t msg)
+{
+       struct netmsg_pru_rcvoob *nm = (void *)msg;
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_m, nm->nm_flags);
-               break;
-       }
-       case CMD_NETMSG_PRU_SEND:
-       {
-               struct netmsg_pru_send *nm = (struct netmsg_pru_send *)msg;
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_m, nm->nm_flags));
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_flags, nm->nm_m,
-                   nm->nm_addr, nm->nm_control, nm->nm_td);
-               break;
-       }
-       case CMD_NETMSG_PRU_SENSE:
-       {
-               struct netmsg_pru_sense *nm = (struct netmsg_pru_sense *)msg;
+int
+netmsg_pru_send(lwkt_msg_t msg)
+{
+       struct netmsg_pru_send *nm = (void *)msg;
+       int error;
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_stat);
-               break;
-       }
-       case CMD_NETMSG_PRU_SHUTDOWN:
-       {
-               struct netmsg_pru_shutdown *nm =
-                   (struct netmsg_pru_shutdown *)msg;
+       error = nm->nm_prufn(nm->nm_so, nm->nm_flags, nm->nm_m,
+                               nm->nm_addr, nm->nm_control, nm->nm_td);
+       lwkt_replymsg(msg, error);
+       return(EASYNC);
+}
 
-               error = nm->nm_prufn(nm->nm_so);
-               break;
-       }
-       case CMD_NETMSG_PRU_SOCKADDR:
-       {
-               struct netmsg_pru_sockaddr *nm =
-                   (struct netmsg_pru_sockaddr *)msg;
+int
+netmsg_pru_sense(lwkt_msg_t msg)
+{
+       struct netmsg_pru_sense *nm = (void *)msg;
 
-               error = nm->nm_prufn(nm->nm_so, nm->nm_nam);
-               break;
-       }
-       case CMD_NETMSG_PRU_SOPOLL:
-       {
-               struct netmsg_pru_sopoll *nm =
-                   (struct netmsg_pru_sopoll *)msg;
-
-               error = nm->nm_prufn(nm->nm_so, nm->nm_events, nm->nm_cred,
-                   nm->nm_td);
-               break;
-       }
-       default:
-               panic("unknown netmsg %d", msg->nm_lmsg.ms_cmd);
-               break;
-       }
-       lwkt_replymsg(&msg->nm_lmsg, error);
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_stat));
+       return(EASYNC);
 }
 
-/*
- * If we convert all the protosw pr_ functions for all the protocols
- * to take a message directly, this layer can go away.
- */
-void
-netmsg_pr_dispatcher(struct netmsg *msg)
+int
+netmsg_pru_shutdown(lwkt_msg_t msg)
 {
-       int error = 0;
+       struct netmsg_pru_shutdown *nm = (void *)msg;
 
-       switch (msg->nm_lmsg.ms_cmd) {
-       case CMD_NETMSG_PR_CTLOUTPUT:
-       {
-               struct netmsg_pr_ctloutput *nm =
-                   (struct netmsg_pr_ctloutput *)msg;
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so));
+       return(EASYNC);
+}
 
-               error = nm->nm_prfn(nm->nm_so, nm->nm_sopt);
-               break;
-       }
-       case CMD_NETMSG_PR_TIMEOUT:
-       {
-               struct netmsg_pr_timeout *nm = (struct netmsg_pr_timeout *)msg;
+int
+netmsg_pru_sockaddr(lwkt_msg_t msg)
+{
+       struct netmsg_pru_sockaddr *nm = (void *)msg;
 
-               nm->nm_prfn();
-               break;
-       }
-       default:
-               panic("unknown netmsg %d", msg->nm_lmsg.ms_cmd);
-               break;
-       }
-       lwkt_replymsg(&msg->nm_lmsg, error);
+       lwkt_replymsg(msg, nm->nm_prufn(nm->nm_so, nm->nm_nam));
+       return(EASYNC);
 }
 
-void
-msg_notify_handler(struct netmsg *msg0)
+int
+netmsg_pru_sopoll(lwkt_msg_t msg)
 {
-       struct netmsg_so_notify *msg = (struct netmsg_so_notify *)msg0;
+       struct netmsg_pru_sopoll *nm = (void *)msg;
+       int error;
+
+       error = nm->nm_prufn(nm->nm_so, nm->nm_events, nm->nm_cred, nm->nm_td);
+       lwkt_replymsg(msg, error);
+       return(EASYNC);
+}
+
+int
+netmsg_pr_ctloutput(lwkt_msg_t msg)
+{
+       struct netmsg_pr_ctloutput *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prfn(nm->nm_so, nm->nm_sopt));
+       return(EASYNC);
+}
+
+int
+netmsg_pr_timeout(lwkt_msg_t msg)
+{
+       struct netmsg_pr_timeout *nm = (void *)msg;
+
+       lwkt_replymsg(msg, nm->nm_prfn());
+       return(EASYNC);
+}
+
+/*
+ * Handle a predicate event request.  This function is only called once
+ * when the predicate message queueing request is received.
+ */
+int
+netmsg_so_notify(lwkt_msg_t lmsg)
+{
+       struct netmsg_so_notify *msg = (void *)lmsg;
        struct sockbuf *sb;
 
-       /* Check if event occurred. */
-       if (msg->nm_predicate(msg0)) {
-               lwkt_replymsg(&msg->nm_lmsg, msg->nm_lmsg.ms_error);
-               return;
+       sb = (msg->nm_etype & NM_REVENT) ?
+                       &msg->nm_so->so_rcv :
+                       &msg->nm_so->so_snd;
+
+       /*
+        * Reply immediately if the event has occured, otherwise queue the
+        * request.
+        */
+       if (msg->nm_predicate((struct netmsg *)msg)) {
+               lwkt_replymsg(lmsg, lmsg->ms_error);
+       } else {
+               TAILQ_INSERT_TAIL(&sb->sb_sel.si_mlist, msg, nm_list);
+               sb->sb_flags |= SB_MEVENT;
        }
+       return(EASYNC);
+}
+
+/*
+ * Predicate requests can be aborted.  This function is only called once
+ * and will interlock against processing/reply races (since such races
+ * occur on the same thread that controls the port where the abort is 
+ * requeued).
+ */
+int
+netmsg_so_notify_abort(lwkt_msg_t lmsg)
+{
+       struct netmsg_so_notify *msg = (void *)lmsg;
+       struct sockbuf *sb;
 
-       /* If not, queue the predicate check. */
        sb = (msg->nm_etype & NM_REVENT) ?
                        &msg->nm_so->so_rcv :
                        &msg->nm_so->so_snd;
-
-       TAILQ_INSERT_TAIL(&sb->sb_sel.si_mlist, msg, nm_list);
-       sb->sb_flags |= SB_MEVENT;
+       TAILQ_REMOVE(&sb->sb_sel.si_mlist, msg, nm_list);
+       lwkt_replymsg(lmsg, EINTR);
+       return(EASYNC);
 }
+
index ce5a733..f6525a9 100644 (file)
@@ -32,7 +32,7 @@
  *
  *     @(#)uipc_socket2.c      8.1 (Berkeley) 6/10/93
  * $FreeBSD: src/sys/kern/uipc_socket2.c,v 1.55.2.17 2002/08/31 19:04:55 dwmalone Exp $
- * $DragonFly: src/sys/kern/uipc_socket2.c,v 1.9 2004/04/10 00:48:06 hsu Exp $
+ * $DragonFly: src/sys/kern/uipc_socket2.c,v 1.10 2004/04/20 01:52:22 dillon Exp $
  */
 
 #include "opt_param.h"
@@ -292,9 +292,8 @@ sb_lock(sb)
 }
 
 /*
- * Wakeup processes waiting on a socket buffer.
- * Do asynchronous notification via SIGIO
- * if the socket has the SS_ASYNC flag set.
+ * Wakeup processes waiting on a socket buffer.  Do asynchronous notification
+ * via SIGIO if the socket has the SS_ASYNC flag set.
  */
 void
 sowakeup(so, sb)
@@ -321,13 +320,11 @@ sowakeup(so, sb)
 
                TAILQ_FOREACH_MUTABLE(msg, &selinfo->si_mlist, nm_list, nmsg) {
                        if (msg->nm_predicate((struct netmsg *)msg)) {
-                               struct lwkt_msg *lmsg = &msg->nm_lmsg;
-
-                               lwkt_replymsg(lmsg, lmsg->ms_error);
                                TAILQ_REMOVE(&selinfo->si_mlist, msg, nm_list);
+                               lwkt_replymsg(&msg->nm_lmsg, 
+                                               msg->nm_lmsg.ms_error);
                        }
                }
-
                if (TAILQ_EMPTY(&sb->sb_sel.si_mlist))
                        sb->sb_flags &= ~SB_MEVENT;
        }
index c0a73be..2887b7a 100644 (file)
@@ -35,7 +35,7 @@
  *
  *     @(#)uipc_syscalls.c     8.4 (Berkeley) 2/21/94
  * $FreeBSD: src/sys/kern/uipc_syscalls.c,v 1.65.2.17 2003/04/04 17:11:16 tegge Exp $
- * $DragonFly: src/sys/kern/uipc_syscalls.c,v 1.31 2004/04/10 10:01:54 hsu Exp $
+ * $DragonFly: src/sys/kern/uipc_syscalls.c,v 1.32 2004/04/20 01:52:22 dillon Exp $
  */
 
 #include "opt_ktrace.h"
@@ -74,6 +74,9 @@
 #include <sys/file2.h>
 #include <sys/signalvar.h>
 
+#include <sys/thread2.h>
+#include <sys/msgport2.h>
+
 /*
  * System call interface to the socket abstraction.
  */
@@ -370,6 +373,24 @@ accept(struct accept_args *uap)
        return (error);
 }
 
+/*
+ * Returns TRUE if predicate satisfied.
+ */
+static boolean_t
+soconnected_predicate(struct netmsg *msg0)
+{
+       struct netmsg_so_notify *msg = (struct netmsg_so_notify *)msg0;
+       struct socket *so = msg->nm_so;
+
+       /* check predicate */
+       if (!(so->so_state & SS_ISCONNECTING) || so->so_error != 0) {
+               msg->nm_lmsg.ms_error = so->so_error;
+               return (TRUE);
+       }
+
+       return (FALSE);
+}
+
 int
 kern_connect(int s, struct sockaddr *sa)
 {
@@ -394,17 +415,25 @@ kern_connect(int s, struct sockaddr *sa)
                error = EINPROGRESS;
                goto done;
        }
-       s = splnet();
-       while ((so->so_state & SS_ISCONNECTING) && so->so_error == 0) {
-               error = tsleep((caddr_t)&so->so_timeo, PCATCH, "connec", 0);
-               if (error)
-                       break;
+       if ((so->so_state & SS_ISCONNECTING) && so->so_error == 0) {
+               struct netmsg_so_notify msg;
+               lwkt_port_t port;
+
+               port = so->so_proto->pr_mport(so, sa);
+               lwkt_initmsg(&msg.nm_lmsg, 
+                           &curthread->td_msgport,
+                           MSGF_PCATCH | MSGF_ABORTABLE,
+                           lwkt_cmd_func(netmsg_so_notify),
+                           lwkt_cmd_func(netmsg_so_notify_abort));
+               msg.nm_predicate = soconnected_predicate;
+               msg.nm_so = so;
+               msg.nm_etype = NM_REVENT;
+               error = lwkt_domsg(port, &msg.nm_lmsg);
        }
        if (error == 0) {
                error = so->so_error;
                so->so_error = 0;
        }
-       splx(s);
 bad:
        so->so_state &= ~SS_ISCONNECTING;
        if (error == ERESTART)
index 66b8fe5..0a8f68d 100644 (file)
@@ -3,7 +3,7 @@
  * Copyright (c) 2003 Jonathan Lemon
  * Copyright (c) 2003 Matthew Dillon
  *
- * $DragonFly: src/sys/net/netisr.c,v 1.12 2004/04/17 00:46:28 dillon Exp $
+ * $DragonFly: src/sys/net/netisr.c,v 1.13 2004/04/20 01:52:26 dillon Exp $
  */
 
 #include <sys/param.h>
@@ -62,7 +62,7 @@ netmsg_service_loop(void *arg)
     struct netmsg *msg;
 
     while ((msg = lwkt_waitport(&curthread->td_msgport, NULL)))
-       msg->nm_handler(msg);
+       msg->nm_lmsg.ms_cmd.cm_func(&msg->nm_lmsg);
 }
 
 /*
@@ -103,9 +103,9 @@ netisr_queue(int num, struct mbuf *m)
     /* use better message allocation system with limits later XXX JH */
     pmsg = malloc(sizeof(struct netmsg_packet), M_LWKTMSG, M_WAITOK);
 
-    lwkt_initmsg_rp(&pmsg->nm_lmsg, &netisr_afree_rport, CMD_NETMSG_NEWPKT);
+    lwkt_initmsg(&pmsg->nm_lmsg, &netisr_afree_rport, 0,
+               lwkt_cmd_func((void *)ni->ni_handler), lwkt_cmd_op_none);
     pmsg->nm_packet = m;
-    pmsg->nm_handler = ni->ni_handler;
     lwkt_sendmsg(port, &pmsg->nm_lmsg);
     return (0);
 }
@@ -163,7 +163,7 @@ schednetisr(int num)
     if (!(pmsg = malloc(sizeof(struct netmsg), M_LWKTMSG, M_NOWAIT)))
        return;
 
-    lwkt_initmsg_rp(&pmsg->nm_lmsg, &netisr_afree_rport, CMD_NETMSG_POLL);
-    pmsg->nm_handler = ni->ni_handler;
+    lwkt_initmsg(&pmsg->nm_lmsg, &netisr_afree_rport, 0,
+               lwkt_cmd_func((void *)ni->ni_handler), lwkt_cmd_op_none);
     lwkt_sendmsg(port, &pmsg->nm_lmsg);
 }
index 22da7d0..f761a2b 100644 (file)
@@ -32,7 +32,7 @@
  *
  *     @(#)netisr.h    8.1 (Berkeley) 6/10/93
  * $FreeBSD: src/sys/net/netisr.h,v 1.21.2.5 2002/02/09 23:02:39 luigi Exp $
- * $DragonFly: src/sys/net/netisr.h,v 1.14 2004/04/10 09:35:34 hsu Exp $
+ * $DragonFly: src/sys/net/netisr.h,v 1.15 2004/04/20 01:52:26 dillon Exp $
  */
 
 #ifndef _NET_NETISR_H_
@@ -89,18 +89,15 @@ typedef boolean_t (*msg_predicate_fn_t)(struct netmsg *);
  */
 struct netmsg {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
 };
 
 struct netmsg_packet {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     struct mbuf                *nm_packet;
 };
 
 struct netmsg_pr_ctloutput {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     int                        (*nm_prfn) (struct socket *, struct sockopt *);
     struct socket      *nm_so;
     struct sockopt     *nm_sopt;
@@ -108,13 +105,11 @@ struct netmsg_pr_ctloutput {
 
 struct netmsg_pr_timeout {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
-    void               (*nm_prfn) (void);
+    int                        (*nm_prfn) (void);
 };
 
 struct netmsg_so_notify {
     struct lwkt_msg                    nm_lmsg;
-    netisr_fn_t                                nm_handler;
     msg_predicate_fn_t                 nm_predicate;
     struct socket                      *nm_so;
     int                                        nm_etype;  /* receive or send event */
@@ -128,39 +123,31 @@ struct netmsg_so_notify {
  * for dispatching pr_ functions,
  * until they can be converted to message-passing
  */
-void netmsg_pr_dispatcher(struct netmsg *);
-
-#define CMD_NETMSG_NEWPKT              (MSG_CMD_NETMSG | 0x0001)
-#define CMD_NETMSG_POLL                        (MSG_CMD_NETMSG | 0x0002)
-
-#define CMD_NETMSG_PRU_ABORT           (MSG_CMD_NETMSG | 0x0003)
-#define CMD_NETMSG_PRU_ACCEPT          (MSG_CMD_NETMSG | 0x0004)
-#define CMD_NETMSG_PRU_ATTACH          (MSG_CMD_NETMSG | 0x0005)
-#define CMD_NETMSG_PRU_BIND            (MSG_CMD_NETMSG | 0x0006)
-#define CMD_NETMSG_PRU_CONNECT         (MSG_CMD_NETMSG | 0x0007)
-#define CMD_NETMSG_PRU_CONNECT2                (MSG_CMD_NETMSG | 0x0008)
-#define CMD_NETMSG_PRU_CONTROL         (MSG_CMD_NETMSG | 0x0009)
-#define CMD_NETMSG_PRU_DETACH          (MSG_CMD_NETMSG | 0x000a)
-#define CMD_NETMSG_PRU_DISCONNECT      (MSG_CMD_NETMSG | 0x000b)
-#define CMD_NETMSG_PRU_LISTEN          (MSG_CMD_NETMSG | 0x000c)
-#define CMD_NETMSG_PRU_PEERADDR                (MSG_CMD_NETMSG | 0x000d)
-#define CMD_NETMSG_PRU_RCVD            (MSG_CMD_NETMSG | 0x000e)
-#define CMD_NETMSG_PRU_RCVOOB          (MSG_CMD_NETMSG | 0x000f)
-#define CMD_NETMSG_PRU_SEND            (MSG_CMD_NETMSG | 0x0010)
-#define CMD_NETMSG_PRU_SENSE           (MSG_CMD_NETMSG | 0x0011)
-#define CMD_NETMSG_PRU_SHUTDOWN                (MSG_CMD_NETMSG | 0x0012)
-#define CMD_NETMSG_PRU_SOCKADDR                (MSG_CMD_NETMSG | 0x0013)
-#define CMD_NETMSG_PRU_SOSEND          (MSG_CMD_NETMSG | 0x0014)
-#define CMD_NETMSG_PRU_SORECEIVE       (MSG_CMD_NETMSG | 0x0015)
-#define CMD_NETMSG_PRU_SOPOLL          (MSG_CMD_NETMSG | 0x0016)
-
-#define CMD_NETMSG_PR_CTLOUTPUT                (MSG_CMD_NETMSG | 0x0017)
-#define CMD_NETMSG_PR_TIMEOUT          (MSG_CMD_NETMSG | 0x0018)
-
-#define        CMD_NETMSG_ONCPU                (MSG_CMD_NETMSG | 0x0019)
-#define        CMD_NETMSG_NOTIFY               (MSG_CMD_NETMSG | 0x0020)
-
-void msg_notify_handler(struct netmsg *);
+int netmsg_pru_abort(lwkt_msg_t);
+int netmsg_pru_accept(lwkt_msg_t);
+int netmsg_pru_attach(lwkt_msg_t);
+int netmsg_pru_bind(lwkt_msg_t);
+int netmsg_pru_connect(lwkt_msg_t);
+int netmsg_pru_connect2(lwkt_msg_t);
+int netmsg_pru_control(lwkt_msg_t);
+int netmsg_pru_detach(lwkt_msg_t);
+int netmsg_pru_disconnect(lwkt_msg_t);
+int netmsg_pru_listen(lwkt_msg_t);
+int netmsg_pru_peeraddr(lwkt_msg_t);
+int netmsg_pru_rcvd(lwkt_msg_t);
+int netmsg_pru_rcvoob(lwkt_msg_t);
+int netmsg_pru_send(lwkt_msg_t);
+int netmsg_pru_sense(lwkt_msg_t);
+int netmsg_pru_shutdown(lwkt_msg_t);
+int netmsg_pru_sockaddr(lwkt_msg_t);
+
+int netmsg_pru_sopoll(lwkt_msg_t);
+
+int netmsg_pr_ctloutput(lwkt_msg_t);
+int netmsg_pr_timeout(lwkt_msg_t);
+
+int netmsg_so_notify(lwkt_msg_t);
+int netmsg_so_notify_abort(lwkt_msg_t);
 
 typedef lwkt_port_t (*lwkt_portfn_t)(struct mbuf *);
 
index 1ecd805..e9c2094 100644 (file)
@@ -27,7 +27,7 @@
  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  *
- * $DragonFly: src/sys/net/netmsg.h,v 1.1 2004/03/06 02:36:25 hsu Exp $
+ * $DragonFly: src/sys/net/netmsg.h,v 1.2 2004/04/20 01:52:26 dillon Exp $
  */
 
 #ifndef _NETMSG_H_
  */
 struct netmsg_pru_abort {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_abort_fn_t     nm_prufn;
     struct socket      *nm_so;
 };
 
 struct netmsg_pru_accept {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_accept_fn_t    nm_prufn;
     struct socket      *nm_so;
     struct sockaddr    **nm_nam;
@@ -53,7 +51,6 @@ struct netmsg_pru_accept {
 
 struct netmsg_pru_attach {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_attach_fn_t    nm_prufn;
     struct socket      *nm_so;
     int                        nm_proto;
@@ -62,7 +59,6 @@ struct netmsg_pru_attach {
 
 struct netmsg_pru_bind {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_bind_fn_t      nm_prufn;
     struct socket      *nm_so;
     struct sockaddr    *nm_nam;
@@ -71,7 +67,6 @@ struct netmsg_pru_bind {
 
 struct netmsg_pru_connect {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_connect_fn_t   nm_prufn;
     struct socket      *nm_so;
     struct sockaddr    *nm_nam;
@@ -80,7 +75,6 @@ struct netmsg_pru_connect {
 
 struct netmsg_pru_connect2 {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_connect2_fn_t  nm_prufn;
     struct socket      *nm_so1;
     struct socket      *nm_so2;
@@ -88,7 +82,6 @@ struct netmsg_pru_connect2 {
 
 struct netmsg_pru_control {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_control_fn_t   nm_prufn;
     struct socket      *nm_so;
     u_long             nm_cmd;
@@ -99,21 +92,18 @@ struct netmsg_pru_control {
 
 struct netmsg_pru_detach {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_detach_fn_t    nm_prufn;
     struct socket      *nm_so;
 };
 
 struct netmsg_pru_disconnect {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_disconnect_fn_t        nm_prufn;
     struct socket      *nm_so;
 };
 
 struct netmsg_pru_listen {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_listen_fn_t    nm_prufn;
     struct socket      *nm_so;
     struct thread      *nm_td;
@@ -121,7 +111,6 @@ struct netmsg_pru_listen {
 
 struct netmsg_pru_peeraddr {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_peeraddr_fn_t  nm_prufn;
     struct socket      *nm_so;
     struct sockaddr    **nm_nam;
@@ -129,7 +118,6 @@ struct netmsg_pru_peeraddr {
 
 struct netmsg_pru_rcvd {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_rcvd_fn_t      nm_prufn;
     struct socket      *nm_so;
     int                        nm_flags;
@@ -137,7 +125,6 @@ struct netmsg_pru_rcvd {
 
 struct netmsg_pru_rcvoob {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_rcvoob_fn_t    nm_prufn;
     struct socket      *nm_so;
     struct mbuf                *nm_m;
@@ -146,7 +133,6 @@ struct netmsg_pru_rcvoob {
 
 struct netmsg_pru_send {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_send_fn_t      nm_prufn;
     struct socket      *nm_so;
     int                        nm_flags;
@@ -158,7 +144,6 @@ struct netmsg_pru_send {
 
 struct netmsg_pru_sense {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_sense_fn_t     nm_prufn;
     struct socket      *nm_so;
     struct stat                *nm_stat;
@@ -166,14 +151,12 @@ struct netmsg_pru_sense {
 
 struct netmsg_pru_shutdown {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_shutdown_fn_t  nm_prufn;
     struct socket      *nm_so;
 };
 
 struct netmsg_pru_sockaddr {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_sockaddr_fn_t  nm_prufn;
     struct socket      *nm_so;
     struct sockaddr    **nm_nam;
@@ -181,7 +164,6 @@ struct netmsg_pru_sockaddr {
 
 struct netmsg_pru_sosend {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_sosend_fn_t    nm_prufn;
     struct socket      *nm_so;
     struct sockaddr    *nm_addr;
@@ -194,7 +176,6 @@ struct netmsg_pru_sosend {
 
 struct netmsg_pru_soreceive {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_soreceive_fn_t nm_prufn;
     struct sockaddr    *nm_addr;
     struct socket      *nm_so;
@@ -207,7 +188,6 @@ struct netmsg_pru_soreceive {
 
 struct netmsg_pru_sopoll {
     struct lwkt_msg    nm_lmsg;
-    netisr_fn_t                nm_handler;
     pru_sopoll_fn_t    nm_prufn;
     struct socket      *nm_so;
     int                        nm_events;
index db61674..e65333a 100644 (file)
@@ -2,7 +2,7 @@
  * Copyright (c) 2003 Jeffrey Hsu
  * All rights reserved.
  *
- * $DragonFly: src/sys/netinet/ip_demux.c,v 1.18 2004/04/10 00:10:42 hsu Exp $
+ * $DragonFly: src/sys/netinet/ip_demux.c,v 1.19 2004/04/20 01:52:28 dillon Exp $
  */
 
 #include "opt_inet.h"
@@ -258,26 +258,26 @@ tcp_cport(int cpu)
 /*
  * We must construct a custom putport function (which runs in the context
  * of the message originator)
+ *
  * Our custom putport must check for self-referential messages, which can
  * occur when the so_upcall routine is called (e.g. nfs).  Self referential
- * messages are simply executed synchronously.
+ * messages are executed synchronously.  However, we must panic if the message
+ * is not marked DONE on completion because the self-referential case cannot
+ * block without deadlocking.
  */
 static int
 netmsg_put_port(lwkt_port_t port, lwkt_msg_t lmsg)
 {
-    /*
-     * If it's a synchronous message for the same thread,
-     * execute it directly.
-     */
-    if (!(lmsg->ms_flags & MSGF_ASYNC) && port->mp_td == curthread) {
-       struct netmsg *msg = (struct netmsg *)lmsg;
-
-       msg->nm_handler(msg);
+    int error;
+
+    if ((lmsg->ms_flags & MSGF_ASYNC) == 0 && port->mp_td == curthread) {
+       error = lmsg->ms_cmd.cm_func(lmsg);
+       if (error == EASYNC && (lmsg->ms_flags & MSGF_DONE) == 0)
+           panic("netmsg_put_port: self-referential deadlock on netport");
+       return(error);
     } else {
-        lwkt_default_putport(port, lmsg);
+       return(lwkt_default_putport(port, lmsg));
     }
-
-    return (EASYNC);
 }
 
 void
index 5ce02db..38dc58d 100644 (file)
@@ -32,7 +32,7 @@
  *
  *     @(#)ip_input.c  8.2 (Berkeley) 1/4/94
  * $FreeBSD: src/sys/netinet/ip_input.c,v 1.130.2.52 2003/03/07 07:01:28 silby Exp $
- * $DragonFly: src/sys/netinet/ip_input.c,v 1.18 2004/04/13 00:14:01 hsu Exp $
+ * $DragonFly: src/sys/netinet/ip_input.c,v 1.19 2004/04/20 01:52:28 dillon Exp $
  */
 
 #define        _IP_VHL
@@ -331,25 +331,24 @@ transport_processing_oncpu(struct mbuf *m, int hlen, struct ip *ip,
 
 struct netmsg_transport_packet {
        struct lwkt_msg         nm_lmsg;
-       netisr_fn_t             nm_handler;
        struct mbuf             *nm_mbuf;
        int                     nm_hlen;
        boolean_t               nm_hasnexthop;
        struct sockaddr_in      nm_nexthop;
 };
 
-static void
-transport_processing_handler(struct netmsg *msg0)
+static int
+transport_processing_handler(lwkt_msg_t lmsg)
 {
-       struct netmsg_transport_packet *msg =
-           (struct netmsg_transport_packet *)msg0;
+       struct netmsg_transport_packet *msg = (void *)lmsg;
        struct sockaddr_in *nexthop;
        struct ip *ip;
 
        ip = mtod(msg->nm_mbuf, struct ip *);
        nexthop = msg->nm_hasnexthop ? &msg->nm_nexthop : NULL;
        transport_processing_oncpu(msg->nm_mbuf, msg->nm_hlen, ip, nexthop);
-       lwkt_replymsg(&msg0->nm_lmsg, 0);
+       lwkt_replymsg(lmsg, 0);
+       return(EASYNC);
 }
 
 static void
@@ -995,9 +994,10 @@ DPRINTF(("ip_input: no SP, packet discarded\n"));/*XXX*/
                    M_LWKTMSG, M_NOWAIT);
                if (!msg)
                        goto bad;
-               lwkt_initmsg_rp(&msg->nm_lmsg, &netisr_afree_rport,
-                   CMD_NETMSG_ONCPU);
-               msg->nm_handler = transport_processing_handler;
+
+               lwkt_initmsg(&msg->nm_lmsg, &netisr_afree_rport, 0,
+                       lwkt_cmd_func(transport_processing_handler),
+                       lwkt_cmd_op_none);
                msg->nm_mbuf = m;
                msg->nm_hlen = hlen;
                msg->nm_hasnexthop = (args.next_hop != NULL);
index 7196f72..030366b 100644 (file)
@@ -32,7 +32,7 @@
  *
  *     @(#)tcp_subr.c  8.2 (Berkeley) 5/24/95
  * $FreeBSD: src/sys/netinet/tcp_subr.c,v 1.73.2.31 2003/01/24 05:11:34 sam Exp $
- * $DragonFly: src/sys/netinet/tcp_subr.c,v 1.26 2004/04/18 20:05:09 hsu Exp $
+ * $DragonFly: src/sys/netinet/tcp_subr.c,v 1.27 2004/04/20 01:52:28 dillon Exp $
  */
 
 #include "opt_compat.h"
@@ -794,17 +794,17 @@ tcp_drain_oncpu(struct inpcbhead *head)
 #ifdef SMP
 struct netmsg_tcp_drain {
        struct lwkt_msg         nm_lmsg;
-       netisr_fn_t             nm_handler;
        struct inpcbhead        *nm_head;
 };
 
-static void
-tcp_drain_handler(struct netmsg *msg0)
+static int
+tcp_drain_handler(lwkt_msg_t lmsg)
 {
-       struct netmsg_tcp_drain *nm = (struct netmsg_tcp_drain *)msg0;
+       struct netmsg_tcp_drain *nm = (void *)lmsg;
 
        tcp_drain_oncpu(nm->nm_head);
-       lwkt_replymsg(&msg0->nm_lmsg, 0);
+       lwkt_replymsg(lmsg, 0);
+       return(EASYNC);
 }
 #endif
 
@@ -837,9 +837,9 @@ tcp_drain()
                            M_LWKTMSG, M_NOWAIT);
                        if (!msg)
                                continue;
-                       lwkt_initmsg_rp(&msg->nm_lmsg, &netisr_afree_rport,
-                           CMD_NETMSG_ONCPU);
-                       msg->nm_handler = tcp_drain_handler;
+                       lwkt_initmsg(&msg->nm_lmsg, &netisr_afree_rport, 0,
+                               lwkt_cmd_func(tcp_drain_handler),
+                               lwkt_cmd_op_none);
                        msg->nm_head = &tcbinfo[cpu].listhead;
                        lwkt_sendmsg(tcp_cport(cpu), &msg->nm_lmsg);
                }
index 71e465d..f96a0f8 100644 (file)
@@ -32,7 +32,7 @@
  *
  *     From: @(#)tcp_usrreq.c  8.2 (Berkeley) 1/3/94
  * $FreeBSD: src/sys/netinet/tcp_usrreq.c,v 1.51.2.17 2002/10/11 11:46:44 ume Exp $
- * $DragonFly: src/sys/netinet/tcp_usrreq.c,v 1.15 2004/04/13 07:10:34 hsu Exp $
+ * $DragonFly: src/sys/netinet/tcp_usrreq.c,v 1.16 2004/04/20 01:52:28 dillon Exp $
  */
 
 #include "opt_ipsec.h"
@@ -700,7 +700,6 @@ struct pr_usrreqs tcp6_usrreqs = {
 
 struct netmsg_tcp_connect {
        struct lwkt_msg         nm_lmsg;
-       netisr_fn_t             nm_handler;
        struct tcpcb            *nm_tp;
        struct sockaddr_in      *nm_sin;
        struct sockaddr_in      *nm_ifsin;
@@ -771,16 +770,21 @@ tcp_connect_oncpu(struct tcpcb *tp, struct sockaddr_in *sin,
        return (0);
 }
 
-static void
-tcp_connect_handler(struct netmsg *msg0)
+#if defined(SMP)
+
+static int
+tcp_connect_handler(lwkt_msg_t lmsg)
 {
-       struct netmsg_tcp_connect *msg = (struct netmsg_tcp_connect *)msg0;
+       struct netmsg_tcp_connect *msg = (void *)lmsg;
        int error;
 
        error = tcp_connect_oncpu(msg->nm_tp, msg->nm_sin, msg->nm_ifsin);
-       lwkt_replymsg(&msg0->nm_lmsg, error);
+       lwkt_replymsg(lmsg, error);
+       return(EASYNC);
 }
 
+#endif
+
 /*
  * Common subroutine to open a TCP connection to remote host specified
  * by struct sockaddr_in in mbuf *nam.  Call in_pcbbind to assign a local
@@ -799,7 +803,7 @@ tcp_connect(struct tcpcb *tp, struct sockaddr *nam, struct thread *td)
        struct sockaddr_in *if_sin;
        int error;
        boolean_t didbind = FALSE;
-#ifdef SMP
+#if defined(SMP)
        lwkt_port_t port;
 #endif
 
@@ -819,7 +823,7 @@ tcp_connect(struct tcpcb *tp, struct sockaddr *nam, struct thread *td)
        if (error)
                return (error);
 
-#ifdef SMP
+#if defined(SMP)
        port = tcp_addrport(sin->sin_addr.s_addr, sin->sin_port,
            inp->inp_laddr.s_addr ?
                inp->inp_laddr.s_addr : if_sin->sin_addr.s_addr,
@@ -838,8 +842,9 @@ tcp_connect(struct tcpcb *tp, struct sockaddr *nam, struct thread *td)
                        }
                        return (ENOMEM);
                }
-               lwkt_initmsg(&msg->nm_lmsg, CMD_NETMSG_ONCPU);
-               msg->nm_handler = tcp_connect_handler;
+               lwkt_initmsg(&msg->nm_lmsg, &curthread->td_msgport, 0,
+                       lwkt_cmd_func(tcp_connect_handler),
+                       lwkt_cmd_op_none);
                msg->nm_tp = tp;
                msg->nm_sin = sin;
                msg->nm_ifsin = if_sin;
index 140bac9..49b408d 100644 (file)
@@ -36,7 +36,7 @@
  *
  *     from: @(#)trap.c        7.4 (Berkeley) 5/13/91
  * $FreeBSD: src/sys/i386/i386/trap.c,v 1.147.2.11 2003/02/27 19:09:59 luoqi Exp $
- * $DragonFly: src/sys/platform/pc32/i386/trap.c,v 1.49 2004/04/10 20:55:20 dillon Exp $
+ * $DragonFly: src/sys/platform/pc32/i386/trap.c,v 1.50 2004/04/20 01:52:17 dillon Exp $
  */
 
 /*
@@ -1282,7 +1282,8 @@ syscall2(struct trapframe frame)
         * results are returned.  Since edx is loaded from fds[1] when the 
         * system call returns we pre-set it here.
         */
-       lwkt_initmsg_rp(&args.lmsg, &td->td_msgport, code);
+       lwkt_initmsg(&args.lmsg, &td->td_msgport, 0,
+                       lwkt_cmd_op(code), lwkt_cmd_op_none);
        args.sysmsg_copyout = NULL;
        args.sysmsg_fds[0] = 0;
        args.sysmsg_fds[1] = frame.tf_edx;
@@ -1532,8 +1533,9 @@ sendsys2(struct trapframe frame)
         * Initialize the kernel message from the copied-in data and
         * pull in appropriate flags from the userland message.
         */
-       lwkt_initmsg_rp(&sysun->lmsg, &td->td_msgport, 
-           sysun->nosys.usrmsg.umsg.ms_cmd);
+       lwkt_initmsg(&sysun->lmsg, &td->td_msgport, 0,
+                       sysun->nosys.usrmsg.umsg.ms_cmd,
+                       lwkt_cmd_op_none);
        sysun->sysmsg_copyout = NULL;
        sysun->lmsg.opaque.ms_umsg = umsg;
        sysun->lmsg.ms_flags |= sysun->nosys.usrmsg.umsg.ms_flags & MSGF_ASYNC;
@@ -1542,7 +1544,7 @@ sendsys2(struct trapframe frame)
         * Extract the system call number, lookup the system call, and
         * set the default return value.
         */
-       code = (u_int)sysun->lmsg.ms_cmd;
+       code = (u_int)sysun->lmsg.ms_cmd.cm_op;
        if (code >= p->p_sysent->sv_size) {
                error = ENOSYS;
                goto bad1;
index 3ac5305..e5b386b 100644 (file)
@@ -3,7 +3,7 @@
  *
  *     Implements LWKT messages and ports.
  * 
- * $DragonFly: src/sys/sys/msgport.h,v 1.16 2004/04/15 00:50:05 dillon Exp $
+ * $DragonFly: src/sys/sys/msgport.h,v 1.17 2004/04/20 01:52:24 dillon Exp $
  */
 
 #ifndef _SYS_MSGPORT_H_
@@ -25,6 +25,18 @@ typedef struct lwkt_port     *lwkt_port_t;
 
 typedef TAILQ_HEAD(lwkt_msg_queue, lwkt_msg) lwkt_msg_queue;
 
+/*
+ * LWKT command message operator type.  This type holds a message's
+ * 'command'.  The command format is opaque to the LWKT messaging system,
+ * meaning that it is specific to whatever convention the API chooses.
+ * By convention lwkt_cmd_t is passed by value and is expected to
+ * efficiently fit into a machine register.
+ */
+typedef union lwkt_cmd {
+    int                cm_op;
+    int                (*cm_func)(lwkt_msg_t msg);
+} lwkt_cmd_t;
+
 /*
  * The standard message and port structure for communications between
  * threads.  See kern/lwkt_msgport.c for documentation on how messages and
@@ -44,10 +56,14 @@ typedef TAILQ_HEAD(lwkt_msg_queue, lwkt_msg) lwkt_msg_queue;
  * a forwarding or reply op).  An abort may cause a reply to be delayed
  * until the abort catches up to it.
  *
- * Finally, note that an abort can requeue a message to its current target
- * port after the message has been pulled off of it, so you CANNOT use
- * ms_node for your own purposes after you have pulled a message request
- * off its port.
+ * Messages which support an abort will have MSGF_ABORTABLE set, indicating
+ * that the ms_abort field has been initialized.  An abort will cause a
+ * message to be requeued to the target port so the target sees the same
+ * message twice:  once during initial processing of the message, and a
+ * second time to process the abort request.  lwkt_getport() will detect
+ * the requeued abort and will copy ms_abort into ms_cmd before returning
+ * the requeued message the second time.  This makes target processing a 
+ * whole lot less complex.
  *
  * NOTE! 64-bit-align this structure.
  */
@@ -61,7 +77,8 @@ typedef struct lwkt_msg {
     lwkt_port_t ms_target_port;                /* current target or relay port */
     lwkt_port_t        ms_reply_port;          /* async replies returned here */
     lwkt_port_t ms_abort_port;         /* abort chasing port */
-    int                ms_cmd;                 /* message command */
+    lwkt_cmd_t ms_cmd;                 /* message command operator */
+    lwkt_cmd_t ms_abort;               /* message abort operator */
     int                ms_flags;               /* message flags */
 #define ms_copyout_start       ms_msgsize
     int                ms_msgsize;             /* size of message */
@@ -85,14 +102,15 @@ typedef struct lwkt_msg {
 #define MSGF_REPLY1    0x0002          /* asynch message has been returned */
 #define MSGF_QUEUED    0x0004          /* message has been queued sanitychk */
 #define MSGF_ASYNC     0x0008          /* sync/async hint */
-#define MSGF_ABORTED   0x0010          /* message was aborted flag */
+#define MSGF_ABORTED   0x0010          /* indicate pending abort */
 #define MSGF_PCATCH    0x0020          /* catch proc signal while waiting */
 #define MSGF_REPLY2    0x0040          /* reply processed by rport cpu */
+#define MSGF_ABORTABLE 0x0080          /* message supports abort */
+#define MSGF_RETRIEVED 0x0100          /* message retrieved on target */
 
 #define MSG_CMD_CDEV   0x00010000
 #define MSG_CMD_VFS    0x00020000
 #define MSG_CMD_SYSCALL        0x00030000
-#define MSG_CMD_NETMSG 0x00040000
 #define MSG_SUBCMD_MASK        0x0000FFFF
 
 #ifdef _KERNEL
index 7302ba9..7ea26bb 100644 (file)
@@ -3,7 +3,7 @@
  *
  *     Implements Inlines for LWKT messages and ports.
  * 
- * $DragonFly: src/sys/sys/msgport2.h,v 1.8 2004/04/15 00:50:05 dillon Exp $
+ * $DragonFly: src/sys/sys/msgport2.h,v 1.9 2004/04/20 01:52:24 dillon Exp $
  */
 
 #ifndef _SYS_MSGPORT2_H_
 #include <sys/thread2.h>
 #endif
 
+#define lwkt_cmd_op_none       lwkt_cmd_op(0)
+
+typedef int (*lwkt_cmd_func_t)(lwkt_msg_t);
+
+/*
+ * Initialize a LWKT message structure.  Note that if the message supports
+ * an abort MSGF_ABORTABLE must be passed in flags and an abort command
+ * supplied.  If abort is not supported then lwkt_cmd_op_none is passed as
+ * the abort command argument by convention.
+ */
 static __inline
 void
-lwkt_initmsg(lwkt_msg_t msg, int cmd)
+lwkt_initmsg(lwkt_msg_t msg, lwkt_port_t rport, int flags, 
+               lwkt_cmd_t cmd, lwkt_cmd_t abort)
 {
-    msg->ms_cmd = cmd;
-    msg->ms_flags = MSGF_DONE;
-    msg->ms_reply_port = &curthread->td_msgport;
+    msg->ms_cmd = cmd;         /* opaque */
+    if (flags & MSGF_ABORTABLE)        /* constant optimized conditional */
+       msg->ms_abort = abort;  /* opaque */
+    msg->ms_flags = MSGF_DONE | flags;
+    msg->ms_reply_port = rport;
     msg->ms_msgsize = 0;
 }
 
+/*
+ * These inlines convert specific types to the lwkt_cmd_t type.  The compiler
+ * should be able to optimize this whole mess out.
+ */
+static __inline
+lwkt_cmd_t
+lwkt_cmd_op(int op)
+{
+    lwkt_cmd_t cmd;
+
+    cmd.cm_op = op;
+    return(cmd);
+}
+
+static __inline
+lwkt_cmd_t
+lwkt_cmd_func(int (*func)(lwkt_msg_t))
+{
+    lwkt_cmd_t cmd;
+
+    cmd.cm_func = func;
+    return(cmd);
+}
+
 static __inline
 void
-lwkt_initmsg_rp(lwkt_msg_t msg, lwkt_port_t rport, int cmd)
+lwkt_initmsg_simple(lwkt_msg_t msg, int op)
 {
-    msg->ms_cmd = cmd;
-    msg->ms_flags = MSGF_DONE;
-    msg->ms_reply_port = rport;
-    msg->ms_msgsize = 0;
+    lwkt_initmsg(msg, &curthread->td_msgport, 0,
+       lwkt_cmd_op(op), lwkt_cmd_op(0));
 }
 
 static __inline
 void
 lwkt_reinitmsg(lwkt_msg_t msg, lwkt_port_t rport)
 {
-    msg->ms_flags = (msg->ms_flags & MSGF_ASYNC) | MSGF_DONE;
+    msg->ms_flags = (msg->ms_flags & (MSGF_ASYNC | MSGF_ABORTABLE)) | MSGF_DONE;
     msg->ms_reply_port = rport;
 }
 
@@ -60,13 +95,11 @@ static __inline
 void
 lwkt_replymsg(lwkt_msg_t msg, int error)
 {   
-    lwkt_port_t port = msg->ms_reply_port;
+    lwkt_port_t port;
 
-    crit_enter();
     msg->ms_error = error;
-    msg->ms_flags |= MSGF_REPLY1;
+    port = msg->ms_reply_port;
     port->mp_replyport(port, msg);
-    crit_exit();
 }
 
 static __inline