kernel - work on dmsg disk exports
author Matthew Dillon <dillon@apollo.backplane.com>
Sun, 1 Mar 2015 19:41:41 +0000 (11:41 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Sun, 1 Mar 2015 19:41:41 +0000 (11:41 -0800)
* Refactor and clean up the kdmsg thread cleanup code.

* Add a workaround for a possible tx/rx race.  The transmit side does not
  retire a kdmsg until after writing it, and that write can block.  While
  it is blocked, a received response can be processed before the related
  transmitted kdmsg has been cleaned up.

* Add sysctls to track operations in progress.

* Code documentation.

sys/kern/kern_dmsg.c
sys/kern/subr_diskiocom.c
sys/sys/dmsg.h

index 51ca941..db95ad1 100644 (file)
@@ -32,7 +32,7 @@
  * SUCH DAMAGE.
  */
 /*
- * TODO: txcmd CREATE state is deferred by txmsgq, need to calculate
+ * TODO: txcmd CREATE state is deferred by tx msgq, need to calculate
  *      a streaming response.  See subr_diskiocom()'s diskiodone().
  */
 #include <sys/param.h>
@@ -107,7 +107,7 @@ kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
         * Destroy the current connection
         */
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLRX);
        while (iocom->msgrd_td || iocom->msgwr_td) {
                wakeup(&iocom->msg_ctl);
                lksleep(iocom, &iocom->msglk, 0, "clstrkl", hz);
@@ -233,7 +233,7 @@ kdmsg_iocom_uninit(kdmsg_iocom_t *iocom)
         * Ask the cluster controller to go away
         */
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLRX);
 
        while (iocom->msgrd_td || iocom->msgwr_td) {
                wakeup(&iocom->msg_ctl);
@@ -279,7 +279,7 @@ kdmsg_iocom_thread_rd(void *arg)
        size_t abytes;
        int error = 0;
 
-       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0) {
+       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILLRX) == 0) {
                /*
                 * Retrieve the message from the pipe or socket.
                 */
@@ -338,46 +338,28 @@ kdmsg_iocom_thread_rd(void *arg)
                msg = NULL;
        }
 
-       if (error)
-               kprintf("kdmsg: read failed error %d\n", error);
+       kprintf("kdmsg: read thread terminating error=%d\n", error);
 
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        if (msg)
                kdmsg_msg_free(msg);
 
        /*
-        * Shutdown the socket before waiting for the transmit side.
-        *
-        * If we are dying due to e.g. a socket disconnect verses being
-        * killed explicity we have to set KILL in order to kick the tx
-        * side when it might not have any other work to do.  KILL might
-        * already be set if we are in an unmount or reconnect.
+        * Shutdown the socket and set KILLRX for consistency in case the
+        * shutdown was not commanded.  Signal the transmit side to shutdown
+        * by setting KILLTX and waking it up.
         */
        fp_shutdown(iocom->msg_fp, SHUT_RDWR);
-
-       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
-       wakeup(&iocom->msg_ctl);
-
-       /*
-        * Wait for the transmit side to drain remaining messages
-        * before cleaning up the rx state.  The transmit side will
-        * set KILLTX and wait for the rx side to completely finish
-        * (set msgrd_td to NULL) before cleaning up any remaining
-        * tx states.
-        */
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLRX |
+                                       KDMSG_CLUSTERCTL_KILLTX);
+       iocom->msgrd_td = NULL;
        lockmgr(&iocom->msglk, LK_RELEASE);
-       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLRX);
        wakeup(&iocom->msg_ctl);
-       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILLTX) == 0) {
-               wakeup(&iocom->msg_ctl);
-               tsleep(iocom, 0, "clstrkw", hz);
-       }
-
-       iocom->msgrd_td = NULL;
 
        /*
-        * iocom can be ripped out from under us at this point but
-        * wakeup() is safe.
+        * iocom can be ripped out at any time once the lock is
+        * released with msgrd_td set to NULL.  The wakeup()s are safe but
+        * that is all.
         */
        wakeup(iocom);
        lwkt_exit();
@@ -393,7 +375,9 @@ kdmsg_iocom_thread_wr(void *arg)
        ssize_t res;
        size_t abytes;
        int error = 0;
-       int retries = 20;
+       int save_ticks;
+       int didwarn;
+       int didwork;
 
        /*
         * Transmit loop
@@ -401,7 +385,7 @@ kdmsg_iocom_thread_wr(void *arg)
        msg = NULL;
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
 
-       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0 && error == 0) {
+       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILLTX) == 0 && error == 0) {
                /*
                 * Sleep if no messages pending.  Interlock with flag while
                 * holding msglk.
@@ -468,81 +452,104 @@ kdmsg_iocom_thread_wr(void *arg)
                }
        }
 
-       /*
-        * Cleanup messages pending transmission and release msgq lock.
-        */
-       if (error)
-               kprintf("kdmsg: write failed error %d\n", error);
-       kprintf("thread_wr: Terminating iocom\n");
+       kprintf("kdmsg: write thread terminating error=%d\n", error);
 
        /*
-        * Shutdown the socket.  This will cause the rx thread to get an
-        * EOF and ensure that both threads get to a termination state.
+        * Shutdown the socket and set KILLTX for consistency in case the
+        * shutdown was not commanded.  Signal the receive side to shutdown
+        * by setting KILLRX and waking it up.
         */
        fp_shutdown(iocom->msg_fp, SHUT_RDWR);
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLRX |
+                                       KDMSG_CLUSTERCTL_KILLTX);
+       wakeup(&iocom->msg_ctl);
 
        /*
-        * Set KILLTX (which the rx side waits for), then wait for the RX
-        * side to completely finish before we clean out any remaining
-        * command states.
+        * The transmit thread is responsible for final cleanups, wait
+        * for the receive side to terminate to prevent new received
+        * states from interfering with our cleanup.
+        *
+        * Do not set msgwr_td to NULL until we actually exit.
         */
-       lockmgr(&iocom->msglk, LK_RELEASE);
-       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLTX);
-       wakeup(&iocom->msg_ctl);
        while (iocom->msgrd_td) {
                wakeup(&iocom->msg_ctl);
-               tsleep(iocom, 0, "clstrkw", hz);
+               lksleep(iocom, &iocom->msglk, 0, "clstrkt", hz);
        }
-       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
 
        /*
-        * Simulate received MSGF_DELETE's for any remaining states.
-        * (For remote masters).
-        *
-        * Drain the message queue to handle any device initiated writes
-        * due to state callbacks.
+        * Loop until all the states are gone and there are no messages
+        * pending transmit.
         */
-cleanuprd:
-       RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree)
-               atomic_set_int(&state->flags, KDMSG_STATE_DYING);
-       RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree)
-               atomic_set_int(&state->flags, KDMSG_STATE_DYING);
-       kdmsg_drain_msgq(iocom);
-       RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree) {
-               if ((state->rxcmd & DMSGF_DELETE) == 0) {
-                       lockmgr(&iocom->msglk, LK_RELEASE);
-                       kdmsg_state_abort(state);
-                       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-                       goto cleanuprd;
+       save_ticks = ticks;
+       didwarn = 0;
+
+       while (TAILQ_FIRST(&iocom->msgq) ||
+              RB_ROOT(&iocom->staterd_tree) ||
+              RB_ROOT(&iocom->statewr_tree)) {
+               /*
+                * Drain the transmit msgq.
+                */
+               kdmsg_drain_msgq(iocom);
+
+               /*
+                * Flag any pending states that we are in the middle of
+                * a shutdown.
+                */
+               RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree)
+                       atomic_set_int(&state->flags, KDMSG_STATE_DYING);
+               RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree)
+                       atomic_set_int(&state->flags, KDMSG_STATE_DYING);
+
+               /*
+                * Simulate received message completions (with error) for
+                * all remaining states.  The transmit side is the
+                * responsibility of the service code so we may end up
+                * looping until things like e.g. I/O requests are complete.
+                */
+               didwork = 0;
+               RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree) {
+                       if ((state->rxcmd & DMSGF_DELETE) == 0) {
+                               lockmgr(&iocom->msglk, LK_RELEASE);
+                               kdmsg_state_abort(state);
+                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                               didwork = 1;
+                               break;
+                       }
                }
-       }
 
-       /*
-        * Simulate received MSGF_DELETE's for any remaining states.
-        * (For local masters).
-        */
-       kdmsg_drain_msgq(iocom);
-       RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
-               if ((state->rxcmd & DMSGF_DELETE) == 0) {
-                       lockmgr(&iocom->msglk, LK_RELEASE);
-                       kdmsg_state_abort(state);
-                       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-                       goto cleanuprd;
+               RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
+                       if ((state->rxcmd & DMSGF_DELETE) == 0) {
+                               lockmgr(&iocom->msglk, LK_RELEASE);
+                               kdmsg_state_abort(state);
+                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                               didwork = 1;
+                               break;
+                       }
+               }
+
+               /*
+                * If we've gone through the whole thing sleep for ~1 second.
+                * States may not go away completely until service side actions
+                * finish handling any in-progress operations.
+                */
+               if (didwork == 0)
+                       lksleep(iocom, &iocom->msglk, 0, "clstrtk", hz);
+
+               if ((int)(ticks - save_ticks) > hz*2 && didwarn == 0) {
+                       didwarn = 1;
+                       kprintf("kdmsg: warning, write thread on %p still "
+                               "terminating\n", iocom);
+               }
+               if ((int)(ticks - save_ticks) > hz*60) {
+                       panic("kdmsg: write thread on %p could not terminate\n",
+                             iocom);
                }
        }
 
        /*
-        * Retry until all work is done
+        * Exit handling is done by the write thread.
         */
-       if (--retries == 0)
-               panic("kdmsg: comm thread shutdown couldn't drain");
-       if (TAILQ_FIRST(&iocom->msgq) ||
-           RB_ROOT(&iocom->staterd_tree) ||
-           RB_ROOT(&iocom->statewr_tree)) {
-               goto cleanuprd;
-       }
        iocom->flags |= KDMSG_IOCOMF_EXITNOACC;
-
        lockmgr(&iocom->msglk, LK_RELEASE);
 
        /*
@@ -737,6 +744,7 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
         */
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
 
+again:
        sdummy.msgid = msg->any.head.msgid;
        sdummy.iocom = iocom;
        if (msg->any.head.cmd & DMSGF_REVTRANS) {
@@ -748,6 +756,12 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
        }
        if (state == NULL)
                state = &iocom->state0;
+       if (state->flags & KDMSG_STATE_INTERLOCK) {
+               state->flags |= KDMSG_STATE_SIGNAL;
+               lksleep(state, &iocom->msglk, 0, "dmrace", hz);
+               goto again;
+       }
+
        msg->state = state;
 
        /*
@@ -1362,7 +1376,17 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
                error = 0;
                break;
        }
+
+       /*
+        * Set interlock (XXX hack) in case the send side blocks and a
+        * response is returned before kdmsg_state_cleanuptx() can be
+        * run.
+        */
+       if (state && error == 0)
+               state->flags |= KDMSG_STATE_INTERLOCK;
+
        lockmgr(&iocom->msglk, LK_RELEASE);
+
        return (error);
 }
 
@@ -1376,8 +1400,24 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
 
        if ((state = msg->state) == NULL) {
                kdmsg_msg_free(msg);
-       } else if (msg->any.head.cmd & DMSGF_DELETE) {
-               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+               return;
+       }
+
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+
+       /*
+        * Clear interlock (XXX hack) in case the send side blocks and a
+        * response is returned in the other thread before
+        * kdmsg_state_cleanuptx() can be run.  We maintain our hold on
+        * iocom->msglk so we can do this before completing our task.
+        */
+       if (state->flags & KDMSG_STATE_SIGNAL) {
+               kprintf("kdmsg: state %p interlock!\n", state);
+               wakeup(state);
+       }
+       state->flags &= ~(KDMSG_STATE_INTERLOCK | KDMSG_STATE_SIGNAL);
+
+       if (msg->any.head.cmd & DMSGF_DELETE) {
                KKASSERT((state->txcmd & DMSGF_DELETE) == 0);
                state->txcmd |= DMSGF_DELETE;
                if (state->rxcmd & DMSGF_DELETE) {
@@ -1405,14 +1445,13 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                        kdmsg_msg_free(msg);
                        if (TAILQ_EMPTY(&state->subq))
                                kdmsg_state_free(state);
-                       lockmgr(&iocom->msglk, LK_RELEASE);
                } else {
                        kdmsg_msg_free(msg);
-                       lockmgr(&iocom->msglk, LK_RELEASE);
                }
        } else {
                kdmsg_msg_free(msg);
        }
+       lockmgr(&iocom->msglk, LK_RELEASE);
 }
 
 static
@@ -1569,7 +1608,6 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                 */
                state = msg->state;
                msg->any.head.msgid = state->msgid;
-               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        } else {
                /*
                 * One-off message (always uses msgid 0 to distinguish
@@ -1578,12 +1616,13 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                 */
                state = NULL;
                msg->any.head.msgid = 0;
-               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        }
 
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+
        /*
         * This flag is not set until after the tx thread has drained
-        * the txmsgq and simulated responses.  After that point the
+        * the tx msgq and simulated responses.  After that point the
         * txthread is dead and can no longer simulate responses.
         *
         * Device drivers should never try to send a message once this
index 3cc5ee4..a785127 100644 (file)
@@ -72,6 +72,10 @@ struct dios_io {
 
 static MALLOC_DEFINE(M_DMSG_DISK, "dmsg_disk", "disk dmsg");
 
+static int blk_active;
+SYSCTL_INT(_debug, OID_AUTO, blk_active, CTLFLAG_RW, &blk_active, 0,
+           "Number of active iocom IOs");
+
 static int disk_iocom_reconnect(struct disk *dp, struct file *fp);
 static int disk_rcvdmsg(kdmsg_msg_t *msg);
 
@@ -354,8 +358,9 @@ disk_blk_read(struct disk *dp, kdmsg_msg_t *msg)
                bio->bio_offset = msg->any.blk_read.offset;
                bio->bio_caller_info1.ptr = msg->state;
                bio->bio_done = diskiodone;
-               /* kdmsg_state_hold(msg->state); */
 
+               /* kdmsg_state_hold(msg->state); */
+               atomic_add_int(&blk_active, 1);
                atomic_add_int(&iost->count, 1);
                if (msg->any.head.cmd & DMSGF_DELETE)
                        iost->eof = 1;
@@ -428,8 +433,9 @@ disk_blk_write(struct disk *dp, kdmsg_msg_t *msg)
                bio->bio_offset = msg->any.blk_write.offset;
                bio->bio_caller_info1.ptr = msg->state;
                bio->bio_done = diskiodone;
-               /* kdmsg_state_hold(msg->state); */
 
+               /* kdmsg_state_hold(msg->state); */
+               atomic_add_int(&blk_active, 1);
                atomic_add_int(&iost->count, 1);
                if (msg->any.head.cmd & DMSGF_DELETE)
                        iost->eof = 1;
@@ -479,8 +485,9 @@ disk_blk_flush(struct disk *dp, kdmsg_msg_t *msg)
                bio->bio_offset = msg->any.blk_flush.offset;
                bio->bio_caller_info1.ptr = msg->state;
                bio->bio_done = diskiodone;
-               /* kdmsg_state_hold(msg->state); */
 
+               /* kdmsg_state_hold(msg->state); */
+               atomic_add_int(&blk_active, 1);
                atomic_add_int(&iost->count, 1);
                if (msg->any.head.cmd & DMSGF_DELETE)
                        iost->eof = 1;
@@ -529,8 +536,9 @@ disk_blk_freeblks(struct disk *dp, kdmsg_msg_t *msg)
                bio->bio_offset = msg->any.blk_freeblks.offset;
                bio->bio_caller_info1.ptr = msg->state;
                bio->bio_done = diskiodone;
-               /* kdmsg_state_hold(msg->state); */
 
+               /* kdmsg_state_hold(msg->state); */
+               atomic_add_int(&blk_active, 1);
                atomic_add_int(&iost->count, 1);
                if (msg->any.head.cmd & DMSGF_DELETE)
                        iost->eof = 1;
@@ -618,6 +626,7 @@ diskiodone(struct bio *bio)
        } else {
                atomic_add_int(&iost->count, -1);
        }
+       atomic_add_int(&blk_active, -1);
        cmd |= DMSGF_REPLY;
 
        /*
index f7f2a41..bfb2dc4 100644 (file)
@@ -705,7 +705,7 @@ struct kdmsg_data;
 /*
  * msg_ctl flags (atomic)
  */
-#define KDMSG_CLUSTERCTL_KILL          0x00000001
+#define KDMSG_CLUSTERCTL_UNUSED01      0x00000001
 #define KDMSG_CLUSTERCTL_KILLRX                0x00000002 /* staged helper exit */
 #define KDMSG_CLUSTERCTL_KILLTX                0x00000004 /* staged helper exit */
 #define KDMSG_CLUSTERCTL_SLEEPING      0x00000008 /* interlocked w/msglk */
@@ -745,6 +745,8 @@ struct kdmsg_state {
 #define KDMSG_STATE_ABORTING   0x0008          /* avoids recursive abort */
 #define KDMSG_STATE_OPPOSITE   0x0010          /* opposite direction */
 #define KDMSG_STATE_DYING      0x0020          /* indicates circuit failure */
+#define KDMSG_STATE_INTERLOCK  0x0040
+#define KDMSG_STATE_SIGNAL     0x0080
 
 struct kdmsg_msg {
        TAILQ_ENTRY(kdmsg_msg) qentry;          /* serialized queue */