kernel - Beef up lwkt_dropmsg() API and fix deadlock in so_async_rcvd*()
authorMatthew Dillon <dillon@apollo.backplane.com>
Tue, 26 Feb 2013 00:49:01 +0000 (16:49 -0800)
committerMatthew Dillon <dillon@apollo.backplane.com>
Tue, 26 Feb 2013 00:49:01 +0000 (16:49 -0800)
* Beef up the lwkt_dropmsg() API.  The API now conditionally returns
  success (0) or an error (ENOENT).

* so_pru_rcvd_async() improperly calls lwkt_sendmsg() with a spinlock
  held.  This is not legal.  Hack up lwkt_sendmsg() a bit to resolve.

sys/kern/lwkt_msgport.c
sys/kern/uipc_msg.c
sys/net/bridge/bridgestp.c
sys/net/bridge/if_bridge.c
sys/net/ipfw/ip_fw2.c
sys/netinet/ip_carp.c
sys/netinet/tcp_output.c
sys/netinet/tcp_timer.c
sys/sys/msgport.h
sys/sys/msgport2.h

index b954834..37c5fef 100644 (file)
@@ -106,6 +106,28 @@ lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
     }
 }
 
+void
+lwkt_sendmsg_stage1(lwkt_port_t port, lwkt_msg_t msg)
+{
+    KKASSERT(msg->ms_reply_port != NULL &&
+            (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
+    msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
+}
+
+void
+lwkt_sendmsg_stage2(lwkt_port_t port, lwkt_msg_t msg)
+{
+    int error;
+
+    if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
+       /*
+        * Target port opted to execute the message synchronously so
+        * queue the response.
+        */
+       lwkt_replymsg(msg, error);
+    }
+}
+
 /*
  * lwkt_domsg()
  *
@@ -197,14 +219,14 @@ static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
-static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
+static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
 
 static void *lwkt_spin_getport(lwkt_port_t port);
 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
-static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
+static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
 
 static void *lwkt_serialize_getport(lwkt_port_t port);
 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
@@ -218,7 +240,7 @@ static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
-static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
+static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
 
 /*
  * Core port initialization (internal)
@@ -231,7 +253,7 @@ _lwkt_initport(lwkt_port_t port,
               int (*wmsgfn)(lwkt_msg_t, int),
               void *(*wportfn)(lwkt_port_t, int),
               void (*rportfn)(lwkt_port_t, lwkt_msg_t),
-              void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
+              int (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
 {
     bzero(port, sizeof(*port));
     TAILQ_INIT(&port->mp_msgq);
@@ -286,7 +308,7 @@ lwkt_initport_thread(lwkt_port_t port, thread_t td)
 void
 lwkt_initport_spin(lwkt_port_t port, thread_t td)
 {
-    void (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
+    int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
 
     if (td == NULL)
        dmsgfn = lwkt_panic_dropmsg;
@@ -556,16 +578,25 @@ lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
  * This function could _only_ be used when caller is in the same thread
  * as the message's target port owner thread.
  */
-static void
+static int
 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
 {
+    int error;
+
     KASSERT(port->mpu_td == curthread,
            ("message could only be dropped in the same thread "
             "as the message target port thread"));
     crit_enter_quick(port->mpu_td);
-    _lwkt_pullmsg(port, msg);
-    atomic_set_int(&msg->ms_flags, MSGF_DONE);
+    if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
+           _lwkt_pullmsg(port, msg);
+           atomic_set_int(&msg->ms_flags, MSGF_DONE);
+           error = 0;
+    } else {
+           error = ENOENT;
+    }
     crit_exit_quick(port->mpu_td);
+
+    return (error);
 }
 
 /*
@@ -936,16 +967,25 @@ lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
  * This function could _only_ be used when caller is in the same thread
  * as the message's target port owner thread.
  */
-static void
+static int
 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
 {
+    int error;
+
     KASSERT(port->mpu_td == curthread,
            ("message could only be dropped in the same thread "
             "as the message target port thread\n"));
     spin_lock(&port->mpu_spin);
-    _lwkt_pullmsg(port, msg);
-    msg->ms_flags |= MSGF_DONE;
+    if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
+           _lwkt_pullmsg(port, msg);
+           msg->ms_flags |= MSGF_DONE;
+           error = 0;
+    } else {
+           error = ENOENT;
+    }
     spin_unlock(&port->mpu_spin);
+
+    return (error);
 }
 
 /************************************************************************
@@ -1157,8 +1197,10 @@ lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
 }
 
 static
-void
+int
 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
 {
     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
+    /* NOT REACHED */
+    return (ENOENT);
 }
index 3a7c11d..7687761 100644 (file)
@@ -45,7 +45,9 @@
 #include <sys/spinlock2.h>
 #include <sys/mbuf.h>
 #include <vm/pmap.h>
+
 #include <net/netmsg2.h>
+#include <sys/socketvar2.h>
 
 #include <net/netisr.h>
 #include <net/netmsg.h>
@@ -310,10 +312,20 @@ so_pru_rcvd_async(struct socket *so)
        KASSERT(so->so_proto->pr_flags & PR_ASYNC_RCVD,
            ("async pru_rcvd is not supported"));
 
+       /*
+        * WARNING!  Spinlock is a bit dodgy, use hacked up sendmsg
+        *           to avoid deadlocking.
+        */
        spin_lock(&so->so_rcvd_spin);
        if ((so->so_rcvd_msg.nm_pru_flags & PRUR_DEAD) == 0) {
-               if (lmsg->ms_flags & MSGF_DONE)
-                       lwkt_sendmsg(so->so_port, lmsg);
+               if (lmsg->ms_flags & MSGF_DONE) {
+                       soreference(so);
+                       lwkt_sendmsg_stage1(so->so_port, lmsg);
+                       spin_unlock(&so->so_rcvd_spin);
+                       lwkt_sendmsg_stage2(so->so_port, lmsg);
+               } else {
+                       spin_unlock(&so->so_rcvd_spin);
+               }
        } else {
                static int deadlog = 0;
 
@@ -321,8 +333,8 @@ so_pru_rcvd_async(struct socket *so)
                        kprintf("async rcvd is dead\n");
                        deadlog = 1;
                }
+               spin_unlock(&so->so_rcvd_spin);
        }
-       spin_unlock(&so->so_rcvd_spin);
 }
 
 int
@@ -595,9 +607,13 @@ netmsg_so_notify_abort(netmsg_t msg)
 void
 so_async_rcvd_reply(struct socket *so)
 {
+       /*
+        * Spinlock safe, reply runs to degenerate lwkt_null_replyport()
+        */
        spin_lock(&so->so_rcvd_spin);
        lwkt_replymsg(&so->so_rcvd_msg.base.lmsg, 0);
        spin_unlock(&so->so_rcvd_spin);
+       sofree(so);
 }
 
 void
@@ -605,9 +621,18 @@ so_async_rcvd_drop(struct socket *so)
 {
        lwkt_msg_t lmsg = &so->so_rcvd_msg.base.lmsg;
 
+       /*
+        * Spinlock safe, reply runs to degenerate lwkt_spin_dropmsg()
+        */
        spin_lock(&so->so_rcvd_spin);
-       if ((lmsg->ms_flags & MSGF_DONE) == 0)
-               lwkt_dropmsg(lmsg);
+       if (lwkt_dropmsg(lmsg) == 0)
+               sofree(so);
        so->so_rcvd_msg.nm_pru_flags |= PRUR_DEAD;
        spin_unlock(&so->so_rcvd_spin);
+       if ((lmsg->ms_flags & MSGF_DONE) == 0) {
+               kprintf("Warning: tcp: so_async_rcvd_drop() raced message\n");
+               while ((lmsg->ms_flags & MSGF_DONE) == 0) {
+                       tsleep(so, 0, "soadrop", 1);
+               }
+       }
 }
index 85b35c3..1684a6b 100644 (file)
@@ -1224,7 +1224,6 @@ void
 bstp_stop(struct bridge_softc *sc)
 {
        struct bridge_iflist *bif;
-       struct lwkt_msg *lmsg;
 
        KKASSERT(&curthread->td_msgport == BRIDGE_CFGPORT);
 
@@ -1243,11 +1242,7 @@ bstp_stop(struct bridge_softc *sc)
        bstp_timer_stop(&sc->sc_hello_timer);
 
        crit_enter();
-       lmsg = &sc->sc_bstptimemsg.lmsg;
-       if ((lmsg->ms_flags & MSGF_DONE) == 0) {
-               /* Pending to be processed; drop it */
-               lwkt_dropmsg(lmsg);
-       }
+       lwkt_dropmsg(&sc->sc_bstptimemsg.lmsg);
        crit_exit();
 }
 
index c9d8131..429c72c 100644 (file)
@@ -1113,7 +1113,6 @@ static int
 bridge_ioctl_stop(struct bridge_softc *sc, void *arg __unused)
 {
        struct ifnet *ifp = sc->sc_ifp;
-       struct lwkt_msg *lmsg;
 
        if ((ifp->if_flags & IFF_RUNNING) == 0)
                return 0;
@@ -1121,11 +1120,7 @@ bridge_ioctl_stop(struct bridge_softc *sc, void *arg __unused)
        callout_stop(&sc->sc_brcallout);
 
        crit_enter();
-       lmsg = &sc->sc_brtimemsg.lmsg;
-       if ((lmsg->ms_flags & MSGF_DONE) == 0) {
-               /* Pending to be processed; drop it */
-               lwkt_dropmsg(lmsg);
-       }
+       lwkt_dropmsg(&sc->sc_brtimemsg.lmsg);
        crit_exit();
 
        bstp_stop(sc);
index 39d90a2..eff36e4 100644 (file)
@@ -4483,12 +4483,7 @@ ipfw_fini_dispatch(netmsg_t nmsg)
        netmsg_service_sync();
 
        crit_enter();
-       if ((ipfw_timeout_netmsg.lmsg.ms_flags & MSGF_DONE) == 0) {
-               /*
-                * Callout message is pending; drop it
-                */
-               lwkt_dropmsg(&ipfw_timeout_netmsg.lmsg);
-       }
+       lwkt_dropmsg(&ipfw_timeout_netmsg.lmsg);
        crit_exit();
 
        ip_fw_chk_ptr = NULL;
index 6ea6253..97489c6 100644 (file)
@@ -673,10 +673,8 @@ carp_clone_destroy_dispatch(netmsg_t msg)
        callout_stop_sync(&sc->sc_md6_tmo);
 
        crit_enter();
-       if ((sc->sc_ad_msg.base.lmsg.ms_flags & MSGF_DONE) == 0)
-               lwkt_dropmsg(&sc->sc_ad_msg.base.lmsg);
-       if ((sc->sc_md_msg.base.lmsg.ms_flags & MSGF_DONE) == 0)
-               lwkt_dropmsg(&sc->sc_md_msg.base.lmsg);
+       lwkt_dropmsg(&sc->sc_ad_msg.base.lmsg);
+       lwkt_dropmsg(&sc->sc_md_msg.base.lmsg);
        crit_exit();
 
        lwkt_replymsg(&cmsg->base.lmsg, 0);
index 2d01519..d052b73 100644 (file)
@@ -1442,12 +1442,12 @@ tcp_output_init(struct tcpcb *tp)
 void
 tcp_output_cancel(struct tcpcb *tp)
 {
+       /*
+        * This message is still pending to be processed;
+        * drop it.  Optimized.
+        */
        crit_enter();
        if ((tp->tt_sndmore->lmsg.ms_flags & MSGF_DONE) == 0) {
-               /*
-                * This message is still pending to be processed;
-                * drop it.
-                */
                lwkt_dropmsg(&tp->tt_sndmore->lmsg);
        }
        crit_exit();
index 22d39a2..476a8f7 100644 (file)
@@ -757,12 +757,13 @@ tcp_destroy_timermsg(struct tcpcb *tp)
                return;
 
        KKASSERT(tmsg->tt_cpuid == mycpuid);
+
+       /*
+        * This message is still pending to be processed;
+        * drop it.  Optimized.
+        */
        crit_enter();
        if ((tmsg->tt_msg.lmsg.ms_flags & MSGF_DONE) == 0) {
-               /*
-                * This message is still pending to be processed;
-                * drop it.
-                */
                lwkt_dropmsg(&tmsg->tt_msg.lmsg);
        }
        crit_exit();
index 1cf38a7..1579eea 100644 (file)
@@ -144,6 +144,7 @@ MALLOC_DECLARE(M_LWKTMSG);
  *     - drop a specific message from the specified port.  Currently only
  *       threads' embedded ports (thread ports or spin ports) support this
  *        function and must be used in the port's owner thread.
+ *       (returns 0 on success, ENOENT on error).
  *
  * The use of mpu_td and mp_u.spin is specific to the port callback function
  * set.  Default ports are tied to specific threads and use cpu locality
@@ -168,7 +169,7 @@ typedef struct lwkt_port {
     int                        (*mp_waitmsg)(lwkt_msg_t, int flags);
     void *             (*mp_waitport)(lwkt_port_t, int flags);
     void               (*mp_replyport)(lwkt_port_t, lwkt_msg_t);
-    void               (*mp_dropmsg)(lwkt_port_t, lwkt_msg_t);
+    int                        (*mp_dropmsg)(lwkt_port_t, lwkt_msg_t);
 } lwkt_port;
 
 #ifdef _KERNEL
@@ -205,6 +206,8 @@ void lwkt_initport_putonly(lwkt_port_t,
                                int (*pportfn)(lwkt_port_t, lwkt_msg_t));
 
 void lwkt_sendmsg(lwkt_port_t, lwkt_msg_t);
+void lwkt_sendmsg_stage1(lwkt_port_t, lwkt_msg_t);
+void lwkt_sendmsg_stage2(lwkt_port_t, lwkt_msg_t);
 int lwkt_domsg(lwkt_port_t, lwkt_msg_t, int);
 int lwkt_forwardmsg(lwkt_port_t, lwkt_msg_t);
 void lwkt_abortmsg(lwkt_msg_t);
index c4a6610..e8195fa 100644 (file)
@@ -101,15 +101,17 @@ lwkt_checkmsg(lwkt_msg_t msg)
 }
 
 static __inline
-void
+int
 lwkt_dropmsg(lwkt_msg_t msg)
 {
     lwkt_port_t port;
+    int error = ENOENT;
 
-    KKASSERT((msg->ms_flags & (MSGF_DROPABLE | MSGF_DONE | MSGF_QUEUED)) ==
-            (MSGF_DROPABLE | MSGF_QUEUED));
+    KKASSERT(msg->ms_flags & MSGF_DROPABLE);
     port = msg->ms_target_port;
-    port->mp_dropmsg(port, msg);
+    if (port)
+           error = port->mp_dropmsg(port, msg);
+    return (error);
 }
 
 #endif /* _KERNEL */