kernel - Fix network lockup due to msgport bug
authorMatthew Dillon <dillon@apollo.backplane.com>
Fri, 22 Feb 2013 02:14:45 +0000 (18:14 -0800)
committerMatthew Dillon <dillon@apollo.backplane.com>
Fri, 22 Feb 2013 02:14:45 +0000 (18:14 -0800)
* Netisr threads (i.e. arp thread) which issue route table updates
  use a synchronous netmsg from a 'spin' type port to a 'thread' type
  port.

  When going spin->thread, the lwkt_thread_putport*() code was not
  using an atomic op to manipulate ms_flags.  This could interfere
  with the originator on the spin port issuing a lwkt_spin_waitmsg()
  and cause one or more flags to be lost.

  Ensure that lwkt_thread_putport*() uses atomic ops when manipulating
  ms_flags.

* Another serious issue is that the lwkt_*_waitmsg() code was testing
  MSGF_QUEUED outside of its port lock.  This flag can only be tested
  while the port is locked.

* lwkt_thread_replyport() must use an atomic op when setting
  MSGF_INTRANSIT and MSGF_REPLY to avoid SMP races on ms_flags
  updates.

* lwkt_thread_replyport() requires a critical section against
  possible preemption when adjusting ms_flags.

* lwkt_forwardmsg() does not need a critical section.

* Other notes: Not all ms_flags manipulation needs an atomic op.  For
  example, when initializing a new message or when a lock is held to
  rendezvous at a reply port when replying.  However, all 'put' and 'wait'
  interactions on messages absolutely require atomic ops when manipulating
  ms_flags.

  Finally, note that all msgport queue operations use atomic ops to
  adjust MSGF_QUEUED when adding or removing a message to a port queue.

sys/kern/lwkt_msgport.c

index 33a4eda..b954834 100644 (file)
@@ -85,6 +85,9 @@ MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
  *     NOTE: The message is in an indeterminant state until this call
  *     returns.  The caller should not mess with it (e.g. try to abort it)
  *     until then.
+ *
+ *     NOTE: Do not use this function to forward a message as we might
+ *     clobber ms_flags in a SMP race.
  */
 void
 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
@@ -111,6 +114,9 @@ lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
  *     asynchronously and this function will automatically block and
  *     wait for a response if the target executes the message
  *     asynchronously.
+ *
+ *     NOTE: Do not use this function to forward a message as we might
+ *     clobber ms_flags in a SMP race.
  */
 int
 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
@@ -143,11 +149,9 @@ lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
 {   
     int error;
 
-    crit_enter();
     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
     if ((error = port->mp_putport(port, msg)) != EASYNC)
        lwkt_replymsg(msg, error);
-    crit_exit();
     return(error);
 }
 
@@ -464,7 +468,7 @@ lwkt_thread_replyport_remote(lwkt_msg_t msg)
     }
 
     /*
-     * Cleanup
+     * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
      */
 #ifdef INVARIANTS
     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
@@ -507,16 +511,18 @@ lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
         * section is required.
         */
        if (port->mpu_td->td_gd == mycpu) {
+           crit_enter();
            flags = msg->ms_flags;
            cpu_sfence();
            msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, flags);
+           crit_exit();
        } else {
 #ifdef INVARIANTS
-           msg->ms_flags |= MSGF_INTRANSIT;
+           atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
 #endif
-           msg->ms_flags |= MSGF_REPLY;
+           atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
@@ -535,9 +541,9 @@ lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
            crit_exit();
        } else {
 #ifdef INVARIANTS
-           msg->ms_flags |= MSGF_INTRANSIT;
+           atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
 #endif
-           msg->ms_flags |= MSGF_REPLY;
+           atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
@@ -558,7 +564,7 @@ lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
             "as the message target port thread"));
     crit_enter_quick(port->mpu_td);
     _lwkt_pullmsg(port, msg);
-    msg->ms_flags |= MSGF_DONE;
+    atomic_set_int(&msg->ms_flags, MSGF_DONE);
     crit_exit_quick(port->mpu_td);
 }
 
@@ -587,11 +593,13 @@ lwkt_thread_putport_remote(lwkt_msg_t msg)
     }
 
     /*
-     * Cleanup
+     * An atomic op is needed on ms_flags vs originator.  Also
+     * note that the originator might be using a different type
+     * of msgport.
      */
 #ifdef INVARIANTS
     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
-    msg->ms_flags &= ~MSGF_INTRANSIT;
+    atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
 #endif
     _lwkt_pushmsg(port, msg);
     if (port->mp_flags & MSGPORTF_WAITING)
@@ -613,7 +621,14 @@ lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
        crit_exit();
     } else {
 #ifdef INVARIANTS
-       msg->ms_flags |= MSGF_INTRANSIT;
+       /*
+        * Cleanup.
+        *
+        * An atomic op is needed on ms_flags vs originator.  Also
+        * note that the originator might be using a different type
+        * of msgport.
+        */
+       atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
 #endif
        lwkt_send_ipiq(port->mpu_td->td_gd,
                        (ipifunc1_t)lwkt_thread_putport_remote, msg);
@@ -653,6 +668,8 @@ static
 int
 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
 {
+    thread_t td = curthread;
+
     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));
 
@@ -661,7 +678,6 @@ lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
         * If the done bit was not set we have to block until it is.
         */
        lwkt_port_t port = msg->ms_reply_port;
-       thread_t td = curthread;
        int sentabort;
 
        KKASSERT(port->mpu_td == td);
@@ -669,7 +685,7 @@ lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
        sentabort = 0;
 
        while ((msg->ms_flags & MSGF_DONE) == 0) {
-           port->mp_flags |= MSGPORTF_WAITING;
+           port->mp_flags |= MSGPORTF_WAITING; /* same cpu */
            if (sentabort == 0) {
                if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
                    lwkt_abortmsg(msg);
@@ -687,15 +703,15 @@ lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
         * If the done bit was set we only have to mess around with the
         * message if it is queued on the reply port.
         */
+       crit_enter_quick(td);
        if (msg->ms_flags & MSGF_QUEUED) {
            lwkt_port_t port = msg->ms_reply_port;
            thread_t td = curthread;
 
            KKASSERT(port->mpu_td == td);
-           crit_enter_quick(td);
            _lwkt_pullmsg(port, msg);
-           crit_exit_quick(td);
        }
+       crit_exit_quick(td);
     }
     return(msg->ms_error);
 }
@@ -791,9 +807,9 @@ lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
 
     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));
+    port = msg->ms_reply_port;
 
     if ((msg->ms_flags & MSGF_DONE) == 0) {
-       port = msg->ms_reply_port;
        sentabort = 0;
        spin_lock(&port->mpu_spin);
        while ((msg->ms_flags & MSGF_DONE) == 0) {
@@ -803,6 +819,8 @@ lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.
+            *
+            * ms_flags needs atomic op originator vs target MSGF_QUEUED
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
@@ -838,12 +856,11 @@ lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
            _lwkt_pullmsg(port, msg);
        spin_unlock(&port->mpu_spin);
     } else {
+       spin_lock(&port->mpu_spin);
        if (msg->ms_flags & MSGF_QUEUED) {
-           port = msg->ms_reply_port;
-           spin_lock(&port->mpu_spin);
            _lwkt_pullmsg(port, msg);
-           spin_unlock(&port->mpu_spin);
        }
+       spin_unlock(&port->mpu_spin);
     }
     return(msg->ms_error);
 }
@@ -882,12 +899,14 @@ lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
+        *
+        * ms_flags protected by reply port spinlock
         */
        spin_lock(&port->mpu_spin);
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
        dowakeup = 0;
        if (msg->ms_flags & MSGF_WAITING) {
-               atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
+               msg->ms_flags &= ~MSGF_WAITING;
                dowakeup = 1;
        }
        spin_unlock(&port->mpu_spin);
@@ -1068,6 +1087,8 @@ lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
+        *
+        * (both sides synchronized via serialized reply port)
         */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
        wakeup(msg);