dmsg - Stabilization work
author     Matthew Dillon <dillon@apollo.backplane.com>
           Tue, 3 Mar 2015 06:21:23 +0000 (22:21 -0800)
committer  Matthew Dillon <dillon@apollo.backplane.com>
           Tue, 3 Mar 2015 06:21:23 +0000 (22:21 -0800)
* Add a refs field to dmsg_state and kdmsg_state for retention and
  disposal.
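
  As a rough illustration of the retention/disposal pattern (the demo_*
  names below are hypothetical stand-ins; the real helpers are the
  dmsg_state_ref/drop and kdmsg_state_ref/drop calls visible in the diff),
  a state is freed only when its reference count drops to zero and it has
  already been unlinked from every queue:

    /*
     * Hypothetical sketch only: an explicit reference count decides when
     * a state may be disposed of; by then it must be off all queues.
     */
    #include <assert.h>
    #include <stdlib.h>

    typedef struct demo_state {
            int refs;       /* retention count (the new "refs" field) */
            int flags;      /* queue-membership flags (see next sketch) */
    } demo_state_t;

    static void
    demo_state_ref(demo_state_t *state)
    {
            ++state->refs;  /* the real code does this under the iocom lock */
    }

    static void
    demo_state_drop(demo_state_t *state)
    {
            assert(state->refs > 0);
            if (--state->refs == 0) {
                    assert(state->flags == 0);  /* unlinked from everything */
                    free(state);
            }
    }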

* Separate out the tracking of state->subq and the state RB trees.
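
  As an illustration of what the split buys (hypothetical DEMO_* names
  standing in for DMSG_STATE_SUBINSERTED / DMSG_STATE_RBINSERTED and their
  kdmsg equivalents), each kind of linkage gets its own flag and can be
  torn down independently:

    /*
     * Sketch only: subq membership and RB-tree membership are tracked by
     * separate flags, so one linkage can be dropped without the other.
     */
    #include <sys/queue.h>

    #define DEMO_SUBINSERTED        0x0001  /* linked on parent->subq */
    #define DEMO_RBINSERTED         0x0002  /* linked on the iocom RB tree */

    struct demo_state;
    TAILQ_HEAD(demo_subq, demo_state);

    struct demo_state {
            TAILQ_ENTRY(demo_state) entry;      /* on parent->subq */
            struct demo_subq        subq;       /* child transactions */
            struct demo_state       *parent;
            int                     refs;       /* as in the previous sketch */
            int                     flags;
    };

    static void
    demo_unlink_sub(struct demo_state *state)
    {
            if (state->flags & DEMO_SUBINSERTED) {
                    TAILQ_REMOVE(&state->parent->subq, state, entry);
                    state->flags &= ~DEMO_SUBINSERTED;
            }
            /*
             * RB-tree removal is handled separately, guarded by
             * DEMO_RBINSERTED.
             */
    }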

* Greatly simplify the iocom shutdown code and functions related
  to handling communications failures.  When iterating states for
  shutdown, which requires simulating a received failure message,
  we can now simply iterate via state->subq.

* Greatly simplify how the simulated failures are generated and
  handled (see dmsg_simulate_failure(), renamed from
  dmsg_msg_simulate_failure()).  This is probably the most complex
  part of the library.
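
  A minimal sketch of the bottom-up recursion this boils down to,
  continuing the hypothetical demo_state fragments above (compare
  dmsg_simulate_failure() in the msg.c hunk below):

    /*
     * Sketch only: simulate a received failure bottom-up.  Children are
     * handled first so their callbacks run before the parent's; each
     * state is then unlinked from its parent's subq, and a simulated
     * received DELETE/LNK_ERROR carrying `error' would be queued for it.
     */
    static void
    demo_simulate_failure(struct demo_state *state, int error)
    {
            struct demo_state *substate;

            while ((substate = TAILQ_FIRST(&state->subq)) != NULL)
                    demo_simulate_failure(substate, error);

            demo_unlink_sub(state);     /* removes state from parent->subq */
            /* ...queue the simulated DELETE for `state' here... */
    }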

* Fix a memory leak in kern_dmsg.c.

* Replace xdisk's per-softc token with a lockmgr lock.  Atomicity
  has to be guaranteed across blocking conditions in certain cases,
  and it was easier to simply use a lock for everything.

  Clean up the locking.
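
  A minimal sketch of the resulting pattern, using the DragonFly
  lockmgr/lksleep API that the xdisk.c hunk below switches to (demo_sc is
  a hypothetical stand-in for xa_softc); unlike the old per-softc token,
  the lock remains held across blocking waits:

    /*
     * Sketch only: a lockmgr lock can be slept on while held (lksleep),
     * preserving atomicity across the blocking points in the open and
     * attach paths.
     */
    #include <sys/param.h>
    #include <sys/systm.h>
    #include <sys/kernel.h>
    #include <sys/lock.h>

    struct demo_sc {
            struct lock     lk;
            int             serializing;
    };

    static void
    demo_sc_init(struct demo_sc *sc)
    {
            lockinit(&sc->lk, "xalk", 0, 0);
    }

    static void
    demo_wait_serializer(struct demo_sc *sc)
    {
            lockmgr(&sc->lk, LK_EXCLUSIVE);
            while (sc->serializing)
                    lksleep(sc, &sc->lk, 0, "xaopen", hz);  /* lk stays held */
            lockmgr(&sc->lk, LK_RELEASE);
    }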

* Rip out the shutdown check in the I/O path, which can deadlock
  the disk management threads.  This will need to be revisited,
  as it means that /dev/xa* and /dev/serno* devices remain in
  /dev after a link failure even when there are no opens on the
  device.

* Add the B_FAILONDIS flag to struct buf.  This flag allows the disk
  probe code to tell xdisk that it is ok for the I/O to fail, allowing
  xdisk to discard I/Os that would otherwise block or deadlock the
  disk probe code when the related network connection is lost.
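
  The subr_disk* hunks are not included in this excerpt; a hypothetical
  sketch of the intended usage in a probe read (the helper name and the
  exact call sequence are assumptions, only the B_FAILONDIS flag itself
  comes from the patch) might look like:

    /*
     * Hypothetical sketch: a disk-probe read marks its buf with
     * B_FAILONDIS so xdisk is allowed to fail it with ENXIO when the
     * link is lost, instead of requeueing it (see handle_repend in the
     * xdisk.c hunk above).
     */
    #include <sys/param.h>
    #include <sys/systm.h>
    #include <sys/buf.h>
    #include <sys/buf2.h>
    #include <sys/device.h>

    static int
    demo_probe_read(cdev_t dev, struct buf *bp, off_t offset)
    {
            bp->b_bio1.bio_offset = offset;
            bp->b_bio1.bio_done = biodone_sync;
            bp->b_bio1.bio_flags |= BIO_SYNC;
            bp->b_cmd = BUF_CMD_READ;
            bp->b_flags |= B_FAILONDIS;     /* ok for this I/O to fail */
            dev_dstrategy(dev, &bp->b_bio1);
            return (biowait(&bp->b_bio1, "probrd"));
    }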

lib/libdmsg/dmsg.h
lib/libdmsg/msg.c
sys/dev/disk/xdisk/xdisk.c
sys/kern/kern_dmsg.c
sys/kern/subr_diskgpt.c
sys/kern/subr_disklabel32.c
sys/kern/subr_disklabel64.c
sys/kern/subr_diskmbr.c
sys/sys/buf.h
sys/sys/dmsg.h

diff --git a/lib/libdmsg/dmsg.h b/lib/libdmsg/dmsg.h
index 361ffdf..d6c6ec0 100644
@@ -179,13 +179,14 @@ struct dmsg_state {
        dmsg_media_t    *media;
 };
 
-#define DMSG_STATE_INSERTED    0x0001
+#define DMSG_STATE_SUBINSERTED 0x0001
 #define DMSG_STATE_DYNAMIC     0x0002
 #define DMSG_STATE_NODEID      0x0004          /* manages a node id */
 #define DMSG_STATE_UNUSED_0008 0x0008
 #define DMSG_STATE_OPPOSITE    0x0010          /* initiated by other end */
 #define DMSG_STATE_CIRCUIT     0x0020          /* LNK_SPAN special case */
 #define DMSG_STATE_DYING       0x0040          /* indicates circuit failure */
+#define DMSG_STATE_RBINSERTED  0x0080
 #define DMSG_STATE_ROOT                0x8000          /* iocom->state0 */
 
 /*
diff --git a/lib/libdmsg/msg.c b/lib/libdmsg/msg.c
index c00ff4e..e6f6f59 100644
@@ -45,7 +45,7 @@ static int dmsg_state_msgrx(dmsg_msg_t *msg);
 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
 static void dmsg_state_free(dmsg_state_t *state);
-static void dmsg_msg_simulate_failure(dmsg_state_t *state, int error);
+static void dmsg_simulate_failure(dmsg_state_t *state, int error);
 
 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
 
@@ -327,7 +327,8 @@ dmsg_msg_alloc_locked(dmsg_state_t *state,
 
                RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
-               state->flags |= DMSG_STATE_INSERTED;
+               state->flags |= DMSG_STATE_SUBINSERTED |
+                               DMSG_STATE_RBINSERTED;
 
                if (DMsgDebugOpt) {
                        fprintf(stderr,
@@ -661,7 +662,6 @@ dmsg_ioq_read(dmsg_iocom_t *iocom)
 {
        dmsg_ioq_t *ioq = &iocom->ioq_rx;
        dmsg_msg_t *msg;
-       dmsg_state_t *state;
        dmsg_hdr_t *head;
        ssize_t n;
        size_t bytes;
@@ -678,6 +678,11 @@ again:
         */
        if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
+
+               if (msg->state == &iocom->state0) {
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
+                       fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
+               }
                return (msg);
        }
        atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
@@ -1069,9 +1074,7 @@ again:
         *       to update them when breaking out.
         */
        if (ioq->error) {
-               dmsg_state_t *tmp_state;
 skip:
-               fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
                /*
                 * An unrecoverable error causes all active receive
                 * transactions to be terminated with a LNK_ERROR message.
@@ -1081,6 +1084,7 @@ skip:
                 * message, which should cause master processing loops to
                 * terminate.
                 */
+               fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
                assert(ioq->msg == msg);
                if (msg) {
                        dmsg_msg_free(msg);
@@ -1109,24 +1113,7 @@ skip:
                 */
                pthread_mutex_lock(&iocom->mtx);
                dmsg_iocom_drain(iocom);
-
-               tmp_state = NULL;
-               RB_FOREACH(state, dmsg_state_tree, &iocom->staterd_tree) {
-                       atomic_set_int(&state->flags, DMSG_STATE_DYING);
-                       if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
-                               tmp_state = state;
-               }
-               RB_FOREACH(state, dmsg_state_tree, &iocom->statewr_tree) {
-                       atomic_set_int(&state->flags, DMSG_STATE_DYING);
-                       if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
-                               tmp_state = state;
-               }
-
-               if (tmp_state) {
-                       dmsg_msg_simulate_failure(tmp_state, ioq->error);
-               } else {
-                       dmsg_msg_simulate_failure(&iocom->state0, ioq->error);
-               }
+               dmsg_simulate_failure(&iocom->state0, ioq->error);
                pthread_mutex_unlock(&iocom->mtx);
                if (TAILQ_FIRST(&ioq->msgq))
                        goto again;
@@ -1211,6 +1198,10 @@ skip:
                        ioq->error = error;
                        goto skip;
                }
+
+               /*
+                * No error and not routed
+                */
                /* no error, not routed.  Fall through and return msg */
        }
        return (msg);
@@ -1606,19 +1597,22 @@ dmsg_msg_write(dmsg_msg_t *msg)
        pthread_mutex_lock(&iocom->mtx);
        state = msg->state;
 
+#if 0
        /*
         * Make sure the parent transaction is still open in the transmit
         * direction.  If it isn't the message is dead and we have to
         * potentially simulate a rxmsg terminating the transaction.
         */
-       if (state->parent->txcmd & DMSGF_DELETE) {
+       if ((state->parent->txcmd & DMSGF_DELETE) ||
+           (state->parent->rxcmd & DMSGF_DELETE)) {
                fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
-               dmsg_msg_simulate_failure(state, DMSG_ERR_LOSTLINK);
+               dmsg_simulate_failure(state, DMSG_ERR_LOSTLINK);
                dmsg_state_cleanuptx(iocom, msg);
                dmsg_msg_free(msg);
                pthread_mutex_unlock(&iocom->mtx);
                return;
        }
+#endif
 
        /*
         * Process state data into the message as needed, then update the
@@ -1666,44 +1660,45 @@ dmsg_msg_write(dmsg_msg_t *msg)
 }
 
 /*
+ * Simulate reception of a transaction DELETE message when the link goes
+ * bad.  This routine must recurse through state->subq and generate messages
+ * and callbacks bottom-up.
+ *
  * iocom->mtx must be held by caller.
  */
 static
 void
-dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
+dmsg_simulate_failure(dmsg_state_t *state, int error)
 {
-       dmsg_iocom_t *iocom = state->iocom;
+       dmsg_state_t *substate;
+       dmsg_iocom_t *iocom;
        dmsg_msg_t *msg;
 
-       msg = NULL;
+       while ((substate = TAILQ_FIRST(&state->subq)) != NULL) {
+               dmsg_simulate_failure(substate, error);
+       }
 
+       iocom = state->iocom;
        if (state == &iocom->state0) {
                /*
                 * No active local or remote transactions remain.
-                * Generate a final LNK_ERROR and flag EOF.
+                * Generate a final LNK_ERROR.  EOF will be flagged
+                * when the message is returned by dmsg_ioq_read().
                 */
                msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
                                            DMSG_LNK_ERROR,
                                            NULL, NULL);
                msg->any.head.error = error;
-               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
-               fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
        } else if (state->flags & DMSG_STATE_OPPOSITE) {
                /*
                 * Active remote transactions are still present.
                 * Simulate the other end sending us a DELETE.
                 */
-               if (state->rxcmd & DMSGF_DELETE) {
-                       fprintf(stderr,
-                               "iocom: ioq error(rd) %d sleeping "
-                               "state %p rxcmd %08x txcmd %08x "
-                               "func %p\n",
-                               error, state, state->rxcmd,
-                               state->txcmd, state->func);
-                       usleep(100000); /* XXX */
-                       atomic_set_int(&iocom->flags,
-                                      DMSG_IOCOMF_RWORK);
-               } else {
+               if (state->flags & DMSG_STATE_SUBINSERTED) {
+                       TAILQ_REMOVE(&state->parent->subq, state, entry);
+                       state->flags &= ~DMSG_STATE_SUBINSERTED;
+               }
+               if ((state->rxcmd & DMSGF_DELETE) == 0) {
                        fprintf(stderr, "SIMULATE ERROR1\n");
                        msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
                                             DMSG_LNK_ERROR,
@@ -1719,23 +1714,19 @@ dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
                             DMSG_STATE_OPPOSITE) == 0) {
                                msg->any.head.cmd |= DMSGF_REVCIRC;
                        }
+               } else {
+                       msg = NULL;
                }
        } else {
                /*
                 * Active local transactions are still present.
                 * Simulate the other end sending us a DELETE.
                 */
-               if (state->rxcmd & DMSGF_DELETE) {
-                       fprintf(stderr,
-                               "iocom: ioq error(wr) %d sleeping "
-                               "state %p rxcmd %08x txcmd %08x "
-                               "func %p\n",
-                               error, state, state->rxcmd,
-                               state->txcmd, state->func);
-                       usleep(100000); /* XXX */
-                       atomic_set_int(&iocom->flags,
-                                      DMSG_IOCOMF_RWORK);
-               } else {
+               if (state->flags & DMSG_STATE_SUBINSERTED) {
+                       TAILQ_REMOVE(&state->parent->subq, state, entry);
+                       state->flags &= ~DMSG_STATE_SUBINSERTED;
+               }
+               if ((state->rxcmd & DMSGF_DELETE) == 0) {
                        fprintf(stderr, "SIMULATE ERROR1\n");
                        msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
                                             DMSG_LNK_ERROR,
@@ -1754,6 +1745,8 @@ dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
                        }
                        if ((state->rxcmd & DMSGF_CREATE) == 0)
                                msg->any.head.cmd |= DMSGF_CREATE;
+               } else {
+                       msg = NULL;
                }
        }
        if (msg) {
@@ -1973,6 +1966,13 @@ dmsg_state_result(dmsg_state_t *state, uint32_t error)
  *
  * --
  *
+ * The message may be running over a circuit.  If the circuit is half-deleted
+ * the message is typically racing against a link failure and must be thrown
+ * out.  As the circuit deletion propagates, the library will automatically
+ * generate terminations for sub-states.
+ *
+ * --
+ *
  * ABORT sequences work by setting the ABORT flag along with normal message
  * state.  However, ABORTs can also be sent on half-closed messages, that is
  * even if the command or reply side has already sent a DELETE, as long as
@@ -2128,7 +2128,8 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                pthread_mutex_lock(&iocom->mtx);
                RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
-               state->flags |= DMSG_STATE_INSERTED;
+               state->flags |= DMSG_STATE_SUBINSERTED |
+                               DMSG_STATE_RBINSERTED;
 
                /*
                 * If the parent is a relay set up the state handler to
@@ -2428,7 +2429,7 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                state->rxcmd |= DMSGF_DELETE;
 
                if (state->txcmd & DMSGF_DELETE) {
-                       assert(state->flags & DMSG_STATE_INSERTED);
+                       assert(state->flags & DMSG_STATE_RBINSERTED);
                        if (state->rxcmd & DMSGF_REPLY) {
                                assert(msg->any.head.cmd & DMSGF_REPLY);
                                RB_REMOVE(dmsg_state_tree,
@@ -2438,11 +2439,14 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                                RB_REMOVE(dmsg_state_tree,
                                          &iocom->staterd_tree, state);
                        }
-                       pstate = state->parent;
-                       TAILQ_REMOVE(&pstate->subq, state, entry);
-                       state->flags &= ~DMSG_STATE_INSERTED;
+                       state->flags &= ~DMSG_STATE_RBINSERTED;
+                       if (state->flags & DMSG_STATE_SUBINSERTED) {
+                               pstate = state->parent;
+                               TAILQ_REMOVE(&pstate->subq, state, entry);
+                               state->flags &= ~DMSG_STATE_SUBINSERTED;
+                               dmsg_state_drop(pstate);
+                       }
                        state->parent = NULL;
-                       dmsg_state_drop(pstate);
 
                        if (state->relay) {
                                dmsg_state_drop(state->relay);
@@ -2494,7 +2498,7 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                assert((state->txcmd & DMSGF_DELETE) == 0);
                state->txcmd |= DMSGF_DELETE;
                if (state->rxcmd & DMSGF_DELETE) {
-                       assert(state->flags & DMSG_STATE_INSERTED);
+                       assert(state->flags & DMSG_STATE_RBINSERTED);
                        if (state->txcmd & DMSGF_REPLY) {
                                assert(msg->any.head.cmd & DMSGF_REPLY);
                                RB_REMOVE(dmsg_state_tree,
@@ -2504,9 +2508,12 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                                RB_REMOVE(dmsg_state_tree,
                                          &iocom->statewr_tree, state);
                        }
+                       state->flags &= ~DMSG_STATE_RBINSERTED;
                        pstate = state->parent;
-                       TAILQ_REMOVE(&pstate->subq, state, entry);
-                       state->flags &= ~DMSG_STATE_INSERTED;
+                       if (state->flags & DMSG_STATE_SUBINSERTED) {
+                               TAILQ_REMOVE(&pstate->subq, state, entry);
+                               state->flags &= ~DMSG_STATE_SUBINSERTED;
+                       }
                        state->parent = NULL;
                        dmsg_state_drop(pstate);
 
@@ -2547,7 +2554,9 @@ dmsg_state_free(dmsg_state_t *state)
                fprintf(stderr, "terminate state %p id=%08x\n",
                        state, (uint32_t)state->msgid);
        }
-       assert((state->flags & (DMSG_STATE_ROOT | DMSG_STATE_INSERTED)) == 0);
+       assert((state->flags & (DMSG_STATE_ROOT |
+                               DMSG_STATE_SUBINSERTED |
+                               DMSG_STATE_RBINSERTED)) == 0);
        assert(TAILQ_EMPTY(&state->subq));
        assert(state->refs == 0);
        if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
diff --git a/sys/dev/disk/xdisk/xdisk.c b/sys/dev/disk/xdisk/xdisk.c
index 5f5fe57..a12213b 100644
@@ -118,7 +118,7 @@ struct xa_softc {
        TAILQ_HEAD(, bio) bioq;         /* pending BIOs */
        TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
        TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
-       struct lwkt_token tok;
+       struct lock     lk;
 };
 
 typedef struct xa_softc        xa_softc_t;
@@ -189,9 +189,9 @@ static struct dev_ops xa_ops = {
        .d_psize =      xa_size
 };
 
-static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
 static int xdisk_opencount;
 static cdev_t xdisk_dev;
+struct lock xdisk_lk;
 static TAILQ_HEAD(, xa_iocom) xaiocomq;
 
 /*
@@ -204,6 +204,7 @@ xdisk_modevent(module_t mod, int type, void *data)
        case MOD_LOAD:
                TAILQ_INIT(&xaiocomq);
                RB_INIT(&xa_device_tree);
+               lockinit(&xdisk_lk, "xdisk", 0, 0);
                xdisk_dev = make_dev(&xdisk_ops, 0,
                                     UID_ROOT, GID_WHEEL, 0600, "xdisk");
                break;
@@ -238,18 +239,18 @@ xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
 static int
 xdisk_open(struct dev_open_args *ap)
 {
-       lwkt_gettoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        ++xdisk_opencount;
-       lwkt_reltoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_RELEASE);
        return(0);
 }
 
 static int
 xdisk_close(struct dev_close_args *ap)
 {
-       lwkt_gettoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        --xdisk_opencount;
-       lwkt_reltoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_RELEASE);
        return(0);
 }
 
@@ -285,11 +286,10 @@ xdisk_attach(struct xdisk_attach_ioctl *xaioc)
        /*
         * Normalize ioctl params
         */
-       kprintf("xdisk_attach1\n");
        fp = holdfp(curproc->p_fd, xaioc->fd, -1);
        if (fp == NULL)
                return EINVAL;
-       kprintf("xdisk_attach2\n");
+       kprintf("xdisk_attach fp=%p\n", fp);
 
        /*
         * See if the serial number is already present.  If we are
@@ -297,10 +297,9 @@ xdisk_attach(struct xdisk_attach_ioctl *xaioc)
         * duplicate entries not yet removed so we wait a bit and
         * retry.
         */
-       lwkt_gettoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
 
        xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
-       kprintf("xdisk_attach3\n");
        kdmsg_iocom_init(&xaio->iocom, xaio,
                         KDMSG_IOCOMF_AUTOCONN,
                         M_XDISK, xaio_rcvdmsg);
@@ -331,7 +330,8 @@ xdisk_attach(struct xdisk_attach_ioctl *xaioc)
         */
        TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
        kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
-       lwkt_reltoken(&xdisk_token);
+
+       lockmgr(&xdisk_lk, LK_RELEASE);
 
        return 0;
 }
@@ -351,10 +351,10 @@ xaio_exit(kdmsg_iocom_t *iocom)
 {
        xa_iocom_t *xaio = iocom->handle;
 
-       kprintf("xdisk_detach -xaio_exit\n");
-       lwkt_gettoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
+       kprintf("xdisk_detach [xaio_exit()]\n");
        TAILQ_REMOVE(&xaiocomq, xaio, entry);
-       lwkt_reltoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_RELEASE);
 
        kdmsg_iocom_uninit(&xaio->iocom);
 
@@ -375,9 +375,12 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
        xa_iocom_t      *xaio = state->iocom->handle;
        xa_softc_t      *sc;
 
-       kprintf("xdisk_rcvdmsg %08x (state cmd %08x)\n",
-               msg->any.head.cmd, msg->tcmd);
-       lwkt_gettoken(&xdisk_token);
+       if (state) {
+               kprintf("xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
+                       state, state->rxcmd, state->txcmd,
+                       msg->any.head.cmd);
+       }
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
 
        switch(msg->tcmd) {
        case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
@@ -402,9 +405,8 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
                      sizeof(xaio->dummysc.fs_label));
                xaio->dummysc.fs_label[sizeof(xaio->dummysc.fs_label) - 1] = 0;
 
-               kprintf("xdisk: %s LNK_SPAN create state=%p ",
-                       msg->any.lnk_span.fs_label,
-                       msg->state);
+               kprintf("xdisk: LINK_SPAN state %p create for %s\n",
+                       msg->state, msg->any.lnk_span.fs_label);
 
                sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
                if (sc == NULL) {
@@ -415,7 +417,6 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
                        int n;
 
                        sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
-                       kprintf("(not found - create %p)\n", sc);
                        bcopy(msg->any.lnk_span.cl_label, sc->cl_label,
                              sizeof(sc->cl_label));
                        sc->cl_label[sizeof(sc->cl_label) - 1] = 0;
@@ -437,11 +438,13 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
                        sc->unit = unit;
                        sc->serializing = 1;
                        sc->spancnt = 1;
-                       lwkt_token_init(&sc->tok, "xa");
+                       lockinit(&sc->lk, "xalk", 0, 0);
                        TAILQ_INIT(&sc->spanq);
                        TAILQ_INIT(&sc->bioq);
                        TAILQ_INIT(&sc->tag_freeq);
                        TAILQ_INIT(&sc->tag_pendq);
+
+                       lockmgr(&sc->lk, LK_EXCLUSIVE);
                        RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;
@@ -491,8 +494,9 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
                         */
                        disk_setdiskinfo(&sc->disk, &sc->info);
                        xa_restart_deferred(sc);        /* eats serializing */
+                       lockmgr(&sc->lk, LK_RELEASE);
                } else {
-                       kprintf("(found spancnt %d sc=%p)\n", sc->spancnt, sc);
+                       lockmgr(&sc->lk, LK_EXCLUSIVE);
                        ++sc->spancnt;
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;
@@ -500,7 +504,9 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
                                sc->serializing = 1;
                                xa_restart_deferred(sc); /* eats serializing */
                        }
+                       lockmgr(&sc->lk, LK_RELEASE);
                }
+               kprintf("xdisk: sc %p spancnt %d\n", sc, sc->spancnt);
                kdmsg_msg_result(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_DELETE:
@@ -510,20 +516,51 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
                 * Return a final result, closing our end of the transaction.
                 */
                sc = msg->state->any.xa_sc;
-               kprintf("xdisk: %s LNK_SPAN terminate state=%p\n",
-                       sc->fs_label, msg->state);
+               kprintf("xdisk: LINK_SPAN state %p delete for %s (sc=%p)\n",
+                       msg->state, (sc ? sc->fs_label : "(null)"), sc);
+               lockmgr(&sc->lk, LK_EXCLUSIVE);
                msg->state->any.xa_sc = NULL;
                TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
                --sc->spancnt;
-               xa_terminate_check(sc);
+
+               kprintf("xdisk: sc %p spancnt %d\n", sc, sc->spancnt);
+
+               /*
+                * Spans can come and go as the graph stabilizes, so if
+                * we lose a span along with sc->open_tag we may be able
+                * to restart the I/Os on a different span.
+                */
+               if (sc->spancnt &&
+                   sc->serializing == 0 && sc->open_tag == NULL) {
+                       sc->serializing = 1;
+                       xa_restart_deferred(sc);
+               }
+               lockmgr(&sc->lk, LK_RELEASE);
                kdmsg_msg_reply(msg, 0);
+
+#if 0
+               /*
+                * Termination
+                */
+               if (sc->spancnt == 0)
+                       xa_terminate_check(sc);
+#endif
                break;
        case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
+               /*
+                * Ignore unimplemented streaming replies on our LNK_SPAN
+                * transaction.
+                */
+               kprintf("xdisk: LINK_SPAN state %p delete+reply\n",
+                       msg->state);
+               break;
        case DMSG_LNK_SPAN | DMSGF_REPLY:
                /*
                 * Ignore unimplemented streaming replies on our LNK_SPAN
                 * transaction.
                 */
+               kprintf("xdisk: LINK_SPAN state %p reply\n",
+                       msg->state);
                break;
        case DMSG_DBG_SHELL:
                /*
@@ -566,7 +603,7 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
                        kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        }
-       lwkt_reltoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_RELEASE);
 
        return 0;
 }
@@ -574,7 +611,7 @@ xaio_rcvdmsg(kdmsg_msg_t *msg)
 /*
  * Determine if we can destroy the xa_softc.
  *
- * Called with xdisk_token held.
+ * Called with xdisk_lk held.
  */
 static
 void
@@ -594,18 +631,25 @@ xa_terminate_check(struct xa_softc *sc)
                kprintf("(leave intact)\n");
                return;
        }
-       kprintf("(remove from tree)\n");
-       sc->serializing = 1;
-       KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
 
+       /*
+        * Remove from the device tree; a race with a new incoming span
+        * will create a new softc and disk.
+        */
        RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
 
+       /*
+        * Device has to go first to prevent device ops races.
+        */
        if (sc->dev) {
                disk_destroy(&sc->disk);
                devstat_remove_entry(&sc->stats);
                sc->dev->si_drv1 = NULL;
                sc->dev = NULL;
        }
+
+       kprintf("(remove from tree)\n");
+       sc->serializing = 1;
        KKASSERT(sc->opencnt == 0);
        KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
 
@@ -636,11 +680,11 @@ xa_open(struct dev_open_args *ap)
         * Interlock open with opencnt, wait for attachment operations
         * to finish.
         */
-       lwkt_gettoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
 again:
        sc = dev->si_drv1;
        if (sc == NULL) {
-               lwkt_reltoken(&xdisk_token);
+               lockmgr(&xdisk_lk, LK_RELEASE);
                return ENXIO;   /* raced destruction */
        }
        if (sc->serializing) {
@@ -655,10 +699,9 @@ again:
        if (sc->opencnt++ > 0) {
                sc->serializing = 0;
                wakeup(sc);
-               lwkt_reltoken(&xdisk_token);
+               lockmgr(&xdisk_lk, LK_RELEASE);
                return(0);
        }
-       lwkt_reltoken(&xdisk_token);
 
        /*
         * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
@@ -669,13 +712,14 @@ again:
                sc->serializing = 0;
                wakeup(sc);
        }
+       lockmgr(&xdisk_lk, LK_RELEASE);
 
        /*
         * Wait for completion of the BLK_OPEN
         */
-       lwkt_gettoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        while (sc->serializing)
-               tsleep(sc, 0, "xaopen", hz);
+               lksleep(sc, &xdisk_lk, 0, "xaopen", hz);
 
        error = sc->last_error;
        if (error) {
@@ -684,7 +728,7 @@ again:
                xa_terminate_check(sc);
                sc = NULL;      /* sc may be invalid now */
        }
-       lwkt_reltoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_RELEASE);
 
        return (error);
 }
@@ -699,8 +743,8 @@ xa_close(struct dev_close_args *ap)
        sc = dev->si_drv1;
        if (sc == NULL)
                return ENXIO;   /* raced destruction */
-       lwkt_gettoken(&xdisk_token);
-       lwkt_gettoken(&sc->tok);
+       lockmgr(&xdisk_lk, LK_EXCLUSIVE);
+       lockmgr(&sc->lk, LK_EXCLUSIVE);
 
        /*
         * NOTE: Clearing open_tag allows a concurrent open to re-open
@@ -709,14 +753,16 @@ xa_close(struct dev_close_args *ap)
        if (sc->opencnt == 1 && sc->open_tag) {
                tag = sc->open_tag;
                sc->open_tag = NULL;
+               lockmgr(&sc->lk, LK_RELEASE);
                kdmsg_state_reply(tag->state, 0);       /* close our side */
                xa_wait(tag);                           /* wait on remote */
+       } else {
+               lockmgr(&sc->lk, LK_RELEASE);
        }
-       lwkt_reltoken(&sc->tok);
        KKASSERT(sc->opencnt > 0);
        --sc->opencnt;
        xa_terminate_check(sc);
-       lwkt_reltoken(&xdisk_token);
+       lockmgr(&xdisk_lk, LK_RELEASE);
 
        return(0);
 }
@@ -728,25 +774,16 @@ xa_strategy(struct dev_strategy_args *ap)
        xa_tag_t *tag;
        struct bio *bio = ap->a_bio;
 
-       /*
-        * Only BUF_CMD_READ is allowed (for probes) if opencnt is zero.
-        * Otherwise a BLK_OPEN transaction is required.
-        */
-       if (sc->opencnt == 0 && bio->bio_buf->b_cmd != BUF_CMD_READ) {
-               bio->bio_buf->b_error = ENXIO;
-               bio->bio_buf->b_flags |= B_ERROR;
-               biodone(bio);
-               return(0);
-       }
        devstat_start_transaction(&sc->stats);
        atomic_add_int(&xa_active, 1);
        xa_last = bio->bio_offset;
 
-       lwkt_gettoken(&sc->tok);
+       lockmgr(&sc->lk, LK_EXCLUSIVE);
        tag = xa_setup_cmd(sc, bio);
        if (tag)
                xa_start(tag, NULL, 1);
-       lwkt_reltoken(&sc->tok);
+       lockmgr(&sc->lk, LK_RELEASE);
+
        return(0);
 }
 
@@ -772,6 +809,7 @@ xa_size(struct dev_psize_args *ap)
  ************************************************************************
  *
  * Implement tag/msg setup and related functions.
+ * Called with sc->lk held.
  */
 static xa_tag_t *
 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
@@ -781,7 +819,6 @@ xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
        /*
         * Only get a tag if we have a valid virtual circuit to the server.
         */
-       lwkt_gettoken(&sc->tok);
        if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
                tag->bio = bio;
@@ -794,11 +831,13 @@ xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
        if (tag == NULL && bio) {
                TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
        }
-       lwkt_reltoken(&sc->tok);
 
        return (tag);
 }
 
+/*
+ * Called with sc->lk held
+ */
 static void
 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
 {
@@ -810,6 +849,18 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
        if (msg == NULL) {
                struct bio *bio;
                struct buf *bp;
+               kdmsg_state_t *trans;
+
+               if (sc->opencnt == 0 || sc->open_tag == NULL) {
+                       TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
+                               if ((trans->rxcmd & DMSGF_DELETE) == 0)
+                                       break;
+                       }
+               } else {
+                       trans = sc->open_tag->state;
+               }
+               if (trans == NULL)
+                       goto skip;
 
                KKASSERT(tag->bio);
                bio = tag->bio;
@@ -817,33 +868,17 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
 
                switch(bp->b_cmd) {
                case BUF_CMD_READ:
-                       if (sc->opencnt == 0 || sc->open_tag == NULL) {
-                               kdmsg_state_t *span;
-
-                               TAILQ_FOREACH(span, &sc->spanq, user_entry) {
-                                       if ((span->rxcmd & DMSGF_DELETE) == 0)
-                                               break;
-                               }
-                               if (span == NULL)
-                                       break;
-                               msg = kdmsg_msg_alloc(span,
-                                                     DMSG_BLK_READ |
-                                                     DMSGF_CREATE |
-                                                     DMSGF_DELETE,
-                                                     xa_bio_completion, tag);
-                       } else {
-                               msg = kdmsg_msg_alloc(sc->open_tag->state,
-                                                     DMSG_BLK_READ |
-                                                     DMSGF_CREATE |
-                                                     DMSGF_DELETE,
-                                                     xa_bio_completion, tag);
-                       }
+                       msg = kdmsg_msg_alloc(trans,
+                                             DMSG_BLK_READ |
+                                             DMSGF_CREATE |
+                                             DMSGF_DELETE,
+                                             xa_bio_completion, tag);
                        msg->any.blk_read.keyid = sc->keyid;
                        msg->any.blk_read.offset = bio->bio_offset;
                        msg->any.blk_read.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_WRITE:
-                       msg = kdmsg_msg_alloc(sc->open_tag->state,
+                       msg = kdmsg_msg_alloc(trans,
                                              DMSG_BLK_WRITE |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -854,7 +889,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
                        msg->aux_size = bp->b_bcount;
                        break;
                case BUF_CMD_FLUSH:
-                       msg = kdmsg_msg_alloc(sc->open_tag->state,
+                       msg = kdmsg_msg_alloc(trans,
                                              DMSG_BLK_FLUSH |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -863,7 +898,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
                        msg->any.blk_flush.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_FREEBLKS:
-                       msg = kdmsg_msg_alloc(sc->open_tag->state,
+                       msg = kdmsg_msg_alloc(trans,
                                              DMSG_BLK_FREEBLKS |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -882,12 +917,18 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
                }
        }
 
+       /*
+        * If no msg was allocated this is a failure.
+        */
+skip:
        if (msg) {
                tag->state = msg->state;
                kdmsg_msg_write(msg);
        } else {
+               lockmgr(&sc->lk, LK_RELEASE);
                tag->status.head.error = DMSG_ERR_IO;
                xa_done(tag, 1);
+               lockmgr(&sc->lk, LK_EXCLUSIVE);
        }
 }
 
@@ -897,13 +938,12 @@ xa_wait(xa_tag_t *tag)
        xa_softc_t *sc = tag->sc;
        uint32_t error;
 
-       kprintf("xdisk: xa_wait  %p\n", tag);
-
-       lwkt_gettoken(&sc->tok);
+       lockmgr(&sc->lk, LK_EXCLUSIVE);
        tag->waiting = 1;
        while (tag->done == 0)
-               tsleep(tag, 0, "xawait", 0);
-       lwkt_reltoken(&sc->tok);
+               lksleep(tag, &sc->lk, 0, "xawait", 0);
+       lockmgr(&sc->lk, LK_RELEASE);
+
        error = tag->status.head.error;
        tag->waiting = 0;
        xa_release(tag, 0);
@@ -936,18 +976,29 @@ xa_release(xa_tag_t *tag, int wasbio)
        xa_softc_t *sc = tag->sc;
        struct bio *bio;
 
-       lwkt_gettoken(&sc->tok);
+       if ((bio = tag->bio) != NULL) {
+               struct buf *bp = bio->bio_buf;
+
+               bp->b_error = EIO;
+               bp->b_flags |= B_ERROR;
+               devstat_end_transaction_buf(&sc->stats, bp);
+               atomic_add_int(&xa_active, -1);
+               biodone(bio);
+               tag->bio = NULL;
+       }
+
+       lockmgr(&sc->lk, LK_EXCLUSIVE);
+
        if (wasbio && sc->open_tag &&
            (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
                TAILQ_REMOVE(&sc->bioq, bio, bio_act);
                tag->bio = bio;
                xa_start(tag, NULL, 1);
-               lwkt_reltoken(&sc->tok);
        } else {
                TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
                TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
-               lwkt_reltoken(&sc->tok);
        }
+       lockmgr(&sc->lk, LK_RELEASE);
 }
 
 /*
@@ -965,6 +1016,9 @@ xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
         * of the transaction and we are waiting for the other side to
         * close.
         */
+       kprintf("xa_sync_completion: tag %p msg %08x state %p\n",
+               tag, msg->any.head.cmd, msg->state);
+
        if (tag == NULL) {
                if (msg->any.head.cmd & DMSGF_CREATE)
                        kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
@@ -975,7 +1029,7 @@ xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
        /*
         * Validate the tag
         */
-       lwkt_gettoken(&sc->tok);
+       lockmgr(&sc->lk, LK_EXCLUSIVE);
 
        /*
         * Handle initial response to our open and restart any deferred
@@ -1027,7 +1081,8 @@ xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
                        xa_done(tag, 0);
                }
        }
-       lwkt_reltoken(&sc->tok);
+       lockmgr(&sc->lk, LK_RELEASE);
+
        return (0);
 }
 
@@ -1136,11 +1191,23 @@ handle_done:
         * Handle the case where the transaction failed due to a
         * connectivity issue.  The tag is put away with wasbio=0
         * and we put the BIO back onto the bioq for a later restart.
+        *
+        * Probe I/Os (where the device is not open) will be failed
+        * instead of requeued.
         */
 handle_repend:
-       lwkt_gettoken(&sc->tok);
-       kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
        tag->bio = NULL;
+       if (bio->bio_buf->b_flags & B_FAILONDIS) {
+               kprintf("xa_strategy: disconnected, fail bp %p\n",
+                       bio->bio_buf);
+               bio->bio_buf->b_error = ENXIO;
+               bio->bio_buf->b_flags |= B_ERROR;
+               biodone(bio);
+               kprintf("BIO CIRC FAILURE, FAIL BIO %p\n", bio);
+               bio = NULL;
+       } else {
+               kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
+       }
        xa_done(tag, 0);
        if ((state->txcmd & DMSGF_DELETE) == 0)
                kdmsg_msg_reply(msg, 0);
@@ -1148,9 +1215,11 @@ handle_repend:
        /*
         * Requeue the bio
         */
-       TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
-
-       lwkt_reltoken(&sc->tok);
+       if (bio) {
+               lockmgr(&sc->lk, LK_EXCLUSIVE);
+               TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
+               lockmgr(&sc->lk, LK_RELEASE);
+       }
        return (0);
 }
 
@@ -1158,7 +1227,7 @@ handle_repend:
  * Restart as much deferred I/O as we can.  The serializer is set and we
  * eat it (clear it) when done.
  *
- * Called with sc->tok held
+ * Called with sc->lk held
  */
 static
 void
@@ -1201,13 +1270,14 @@ xa_restart_deferred(xa_softc_t *sc)
                                error = ENXIO;
                }
                if (error == 0) {
-                       kprintf("xdisk: BLK_OPEN\n");
                        sc->open_tag = tag;
                        msg = kdmsg_msg_alloc(span,
                                              DMSG_BLK_OPEN |
                                              DMSGF_CREATE,
                                              xa_sync_completion, tag);
                        msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
+                       kprintf("xdisk: BLK_OPEN tag %p state %p span-state %p\n",
+                               tag, msg->state, span);
                        xa_start(tag, msg, 0);
                }
                if (error) {
diff --git a/sys/kern/kern_dmsg.c b/sys/kern/kern_dmsg.c
index db95ad1..ac72513 100644
@@ -62,9 +62,22 @@ static int kdmsg_state_msgrx(kdmsg_msg_t *msg);
 static int kdmsg_state_msgtx(kdmsg_msg_t *msg);
 static void kdmsg_state_cleanuprx(kdmsg_msg_t *msg);
 static void kdmsg_state_cleanuptx(kdmsg_msg_t *msg);
+static void kdmsg_simulate_failure(kdmsg_state_t *state, int meto, int error);
 static void kdmsg_state_abort(kdmsg_state_t *state);
 static void kdmsg_state_free(kdmsg_state_t *state);
 
+#ifdef KDMSG_DEBUG
+#define KDMSG_DEBUG_ARGS       , const char *file, int line
+#define kdmsg_state_ref(state) _kdmsg_state_ref(state, __FILE__, __LINE__)
+#define kdmsg_state_drop(state)        _kdmsg_state_drop(state, __FILE__, __LINE__)
+#else
+#define KDMSG_DEBUG_ARGS
+#define kdmsg_state_ref(state) _kdmsg_state_ref(state)
+#define kdmsg_state_drop(state)        _kdmsg_state_drop(state)
+#endif
+static void _kdmsg_state_ref(kdmsg_state_t *state KDMSG_DEBUG_ARGS);
+static void _kdmsg_state_drop(kdmsg_state_t *state KDMSG_DEBUG_ARGS);
+
 static void kdmsg_iocom_thread_rd(void *arg);
 static void kdmsg_iocom_thread_wr(void *arg);
 static int kdmsg_autorxmsg(kdmsg_msg_t *msg);
@@ -157,6 +170,7 @@ kdmsg_iocom_autoinitiate(kdmsg_iocom_t *iocom,
        iocom->auto_lnk_conn.head = msg->any.head;
        msg->any.lnk_conn = iocom->auto_lnk_conn;
        iocom->conn_state = msg->state;
+       kdmsg_state_ref(msg->state);    /* iocom->conn_state */
        kdmsg_msg_write(msg);
 }
 
@@ -194,6 +208,11 @@ kdmsg_lnk_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
 
        if ((state->txcmd & DMSGF_DELETE) == 0 &&
            (msg->any.head.cmd & DMSGF_DELETE)) {
+               /*
+                * iocom->conn_state has a state ref, drop it when clearing.
+                */
+               if (iocom->conn_state)
+                       kdmsg_state_drop(iocom->conn_state);
                iocom->conn_state = NULL;
                kdmsg_msg_reply(msg, 0);
        }
@@ -245,12 +264,12 @@ kdmsg_iocom_uninit(kdmsg_iocom_t *iocom)
         */
        if ((state = iocom->freerd_state) != NULL) {
                iocom->freerd_state = NULL;
-               kdmsg_state_free(state);
+               kdmsg_state_drop(state);
        }
 
        if ((state = iocom->freewr_state) != NULL) {
                iocom->freewr_state = NULL;
-               kdmsg_state_free(state);
+               kdmsg_state_drop(state);
        }
 
        /*
@@ -371,13 +390,11 @@ kdmsg_iocom_thread_wr(void *arg)
 {
        kdmsg_iocom_t *iocom = arg;
        kdmsg_msg_t *msg;
-       kdmsg_state_t *state;
        ssize_t res;
        size_t abytes;
        int error = 0;
        int save_ticks;
        int didwarn;
-       int didwork;
 
        /*
         * Transmit loop
@@ -404,18 +421,15 @@ kdmsg_iocom_thread_wr(void *arg)
                         * persist and half-closed state handling.
                         */
                        TAILQ_REMOVE(&iocom->msgq, msg, qentry);
-                       lockmgr(&iocom->msglk, LK_RELEASE);
 
                        error = kdmsg_state_msgtx(msg);
                        if (error == EALREADY) {
                                error = 0;
                                kdmsg_msg_free(msg);
-                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                                continue;
                        }
                        if (error) {
                                kdmsg_msg_free(msg);
-                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                                break;
                        }
 
@@ -425,13 +439,14 @@ kdmsg_iocom_thread_wr(void *arg)
                         * We have to clean up the message as if the transmit
                         * succeeded even if it failed.
                         */
+                       lockmgr(&iocom->msglk, LK_RELEASE);
                        error = fp_write(iocom->msg_fp, &msg->any,
                                         msg->hdr_size, &res, UIO_SYSSPACE);
                        if (error || res != msg->hdr_size) {
                                if (error == 0)
                                        error = EINVAL;
-                               kdmsg_state_cleanuptx(msg);
                                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                               kdmsg_state_cleanuptx(msg);
                                break;
                        }
                        if (msg->aux_size) {
@@ -442,13 +457,13 @@ kdmsg_iocom_thread_wr(void *arg)
                                if (error || res != abytes) {
                                        if (error == 0)
                                                error = EINVAL;
-                                       kdmsg_state_cleanuptx(msg);
                                        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                                       kdmsg_state_cleanuptx(msg);
                                        break;
                                }
                        }
-                       kdmsg_state_cleanuptx(msg);
                        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                       kdmsg_state_cleanuptx(msg);
                }
        }
 
@@ -477,6 +492,10 @@ kdmsg_iocom_thread_wr(void *arg)
        }
 
        /*
+        * We can no longer receive new messages.  We must drain the transmit
+        * message queue and simulate received messages to close any remaining
+        * states.
+        *
         * Loop until all the states are gone and there are no messages
         * pending transmit.
         */
@@ -487,60 +506,29 @@ kdmsg_iocom_thread_wr(void *arg)
               RB_ROOT(&iocom->staterd_tree) ||
               RB_ROOT(&iocom->statewr_tree)) {
                /*
-                * Drain the transmit msgq.
+                *
                 */
                kdmsg_drain_msgq(iocom);
+               kprintf("simulate failure for all substates of state0\n");
+               kdmsg_simulate_failure(&iocom->state0, 0, DMSG_ERR_LOSTLINK);
 
-               /*
-                * Flag any pending states that we are in the middle of
-                * a shutdown.
-                */
-               RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree)
-                       atomic_set_int(&state->flags, KDMSG_STATE_DYING);
-               RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree)
-                       atomic_set_int(&state->flags, KDMSG_STATE_DYING);
-
-               /*
-                * Simulate received message completions (with error) for
-                * all remaining states.  The transmit side is the
-                * responsibility of the service code so we may end up
-                * looping until things like e.g. I/O requests are complete.
-                */
-               didwork = 0;
-               RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree) {
-                       if ((state->rxcmd & DMSGF_DELETE) == 0) {
-                               lockmgr(&iocom->msglk, LK_RELEASE);
-                               kdmsg_state_abort(state);
-                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-                               didwork = 1;
-                               break;
-                       }
-               }
-
-               RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
-                       if ((state->rxcmd & DMSGF_DELETE) == 0) {
-                               lockmgr(&iocom->msglk, LK_RELEASE);
-                               kdmsg_state_abort(state);
-                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-                               didwork = 1;
-                               break;
-                       }
-               }
-
-               /*
-                * If we've gone through the whole thing sleep for ~1 second.
-                * States may not go away completely until service side actions
-                * finish handling any in-progress operations.
-                */
-               if (didwork == 0)
-                       lksleep(iocom, &iocom->msglk, 0, "clstrtk", hz);
+               lksleep(iocom, &iocom->msglk, 0, "clstrtk", hz / 2);
 
                if ((int)(ticks - save_ticks) > hz*2 && didwarn == 0) {
                        didwarn = 1;
                        kprintf("kdmsg: warning, write thread on %p still "
                                "terminating\n", iocom);
                }
+               if ((int)(ticks - save_ticks) > hz*15 && didwarn == 1) {
+                       didwarn = 2;
+                       kprintf("kdmsg: warning, write thread on %p still "
+                               "terminating\n", iocom);
+               }
                if ((int)(ticks - save_ticks) > hz*60) {
+                       kprintf("kdmsg: msgq %p rd_tree %p wr_tree %p\n",
+                               TAILQ_FIRST(&iocom->msgq),
+                               RB_ROOT(&iocom->staterd_tree),
+                               RB_ROOT(&iocom->statewr_tree));
                        panic("kdmsg: write thread on %p could not terminate\n",
                              iocom);
                }
@@ -580,7 +568,7 @@ kdmsg_iocom_thread_wr(void *arg)
  * This cleans out the pending transmit message queue, adjusting any
  * persistent states properly in the process.
  *
- * Caller must hold pmp->iocom.msglk
+ * Called with iocom locked.
  */
 void
 kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
@@ -595,18 +583,18 @@ kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
         */
        while ((msg = TAILQ_FIRST(&iocom->msgq)) != NULL) {
                TAILQ_REMOVE(&iocom->msgq, msg, qentry);
-               lockmgr(&iocom->msglk, LK_RELEASE);
                if (kdmsg_state_msgtx(msg))
                        kdmsg_msg_free(msg);
                else
                        kdmsg_state_cleanuptx(msg);
-               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        }
 }
 
 /*
  * Do all processing required to handle a freshly received message
  * after its low level header has been validated.
+ *
+ * iocom is not locked.
  */
 static
 int
@@ -615,6 +603,23 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
        kdmsg_iocom_t *iocom = msg->state->iocom;
        int error;
 
+#if 0
+       /*
+        * If sub-states exist and we are deleting (typically due to a
+        * disconnect), we may not receive deletes for any of the substates
+        * and must simulate associated failures.
+        */
+       if (msg->state &&
+           (msg->any.head.cmd & DMSGF_DELETE) &&
+           TAILQ_FIRST(&msg->state->subq)) {
+               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+               kprintf("simulate failure for substates of cmd %08x/%08x\n",
+                       msg->state->rxcmd, msg->state->txcmd);
+               kdmsg_simulate_failure(msg->state, 0, DMSG_ERR_LOSTLINK);
+               lockmgr(&iocom->msglk, LK_RELEASE);
+       }
+#endif
+
        /*
         * State machine tracking, state assignment for msg,
         * returns error and discard status.  Errors are fatal
@@ -732,9 +737,11 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
                state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
                state->flags = KDMSG_STATE_DYNAMIC;
                state->iocom = iocom;
+               state->refs = 1;
                TAILQ_INIT(&state->subq);
                iocom->freerd_state = state;
        }
+       state = NULL;   /* safety */
 
        /*
         * Lock RB tree and locate existing persistent state, if any.
@@ -745,25 +752,36 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
 
 again:
-       sdummy.msgid = msg->any.head.msgid;
-       sdummy.iocom = iocom;
-       if (msg->any.head.cmd & DMSGF_REVTRANS) {
-               state = RB_FIND(kdmsg_state_tree, &iocom->statewr_tree,
-                               &sdummy);
+       if (msg->state == &iocom->state0) {
+               sdummy.msgid = msg->any.head.msgid;
+               sdummy.iocom = iocom;
+               if (msg->any.head.cmd & DMSGF_REVTRANS) {
+                       state = RB_FIND(kdmsg_state_tree, &iocom->statewr_tree,
+                                       &sdummy);
+               } else {
+                       state = RB_FIND(kdmsg_state_tree, &iocom->staterd_tree,
+                                       &sdummy);
+               }
+
+               /*
+                * Set message state unconditionally.  If this is a CREATE
+                * message this state will become the parent state and new
+                * state will be allocated for the message state.
+                */
+               if (state == NULL)
+                       state = &iocom->state0;
+               if (state->flags & KDMSG_STATE_INTERLOCK) {
+                       state->flags |= KDMSG_STATE_SIGNAL;
+                       lksleep(state, &iocom->msglk, 0, "dmrace", hz);
+                       goto again;
+               }
+               kdmsg_state_ref(state);
+               kdmsg_state_drop(msg->state);   /* iocom->state0 */
+               msg->state = state;
        } else {
-               state = RB_FIND(kdmsg_state_tree, &iocom->staterd_tree,
-                               &sdummy);
-       }
-       if (state == NULL)
-               state = &iocom->state0;
-       if (state->flags & KDMSG_STATE_INTERLOCK) {
-               state->flags |= KDMSG_STATE_SIGNAL;
-               lksleep(state, &iocom->msglk, 0, "dmrace", hz);
-               goto again;
+               state = msg->state;
        }
 
-       msg->state = state;
-
        /*
         * Short-cut one-off or mid-stream messages.
         */
@@ -817,16 +835,24 @@ again:
                }
 
                /*
-                * Allocate new state
+                * Allocate new state.
+                *
+                * msg->state becomes the owner of the ref we inherit from
+                * freerd_state.
                 */
+               kdmsg_state_drop(state);
                state = iocom->freerd_state;
                iocom->freerd_state = NULL;
 
-               msg->state = state;
+               msg->state = state;             /* inherits freerd ref */
                state->parent = pstate;
                KKASSERT(state->iocom == iocom);
-               state->flags |= KDMSG_STATE_INSERTED |
+               state->flags |= KDMSG_STATE_RBINSERTED |
+                               KDMSG_STATE_SUBINSERTED |
                                KDMSG_STATE_OPPOSITE;
+               kdmsg_state_ref(pstate);        /* states on pstate->subq */
+               kdmsg_state_ref(state);         /* state on pstate->subq */
+               kdmsg_state_ref(state);         /* state on rbtree */
                state->icmd = msg->any.head.cmd & DMSGF_BASECMDMASK;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->txcmd = DMSGF_REPLY;
@@ -1108,7 +1134,7 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
                KKASSERT((state->rxcmd & DMSGF_DELETE) == 0);
                state->rxcmd |= DMSGF_DELETE;
                if (state->txcmd & DMSGF_DELETE) {
-                       KKASSERT(state->flags & KDMSG_STATE_INSERTED);
+                       KKASSERT(state->flags & KDMSG_STATE_RBINSERTED);
                        if (state->rxcmd & DMSGF_REPLY) {
                                KKASSERT(msg->any.head.cmd &
                                         DMSGF_REPLY);
@@ -1120,18 +1146,19 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
                                RB_REMOVE(kdmsg_state_tree,
                                          &iocom->staterd_tree, state);
                        }
+                       state->flags &= ~KDMSG_STATE_RBINSERTED;
                        pstate = state->parent;
-                       TAILQ_REMOVE(&pstate->subq, state, entry);
-                       if (pstate != &pstate->iocom->state0 &&
-                           TAILQ_EMPTY(&pstate->subq) &&
-                           (pstate->flags & KDMSG_STATE_INSERTED) == 0) {
-                               kdmsg_state_free(pstate);
+                       if (state->flags & KDMSG_STATE_SUBINSERTED) {
+                               TAILQ_REMOVE(&pstate->subq, state, entry);
+                               state->flags &= ~KDMSG_STATE_SUBINSERTED;
+                               kdmsg_state_drop(pstate);  /* pstate->subq */
+                               kdmsg_state_drop(state);   /* pstate->subq */
+                               state->parent = NULL;
+                       } else {
+                               KKASSERT(state->parent == NULL);
                        }
-                       state->flags &= ~KDMSG_STATE_INSERTED;
-                       state->parent = NULL;
                        kdmsg_msg_free(msg);
-                       if (TAILQ_EMPTY(&state->subq))
-                               kdmsg_state_free(state);
+                       kdmsg_state_drop(state);        /* state on rbtree */
                        lockmgr(&iocom->msglk, LK_RELEASE);
                } else {
                        kdmsg_msg_free(msg);
@@ -1151,7 +1178,24 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
  *
  * This is used when the other end of the link is dead so the device driver
  * gets a completed transaction for all pending states.
+ *
+ * Called with iocom locked.
  */
+static
+void
+kdmsg_simulate_failure(kdmsg_state_t *state, int meto, int error)
+{
+       kdmsg_state_t *substate;
+
+       kdmsg_state_ref(state);                 /* aborting */
+       while ((substate = TAILQ_FIRST(&state->subq)) != NULL) {
+               kdmsg_simulate_failure(substate, 1, error);
+       }
+       if (meto)
+               kdmsg_state_abort(state);
+       kdmsg_state_drop(state);                /* aborting */
+}
+
 static
 void
 kdmsg_state_abort(kdmsg_state_t *state)
@@ -1164,6 +1208,7 @@ kdmsg_state_abort(kdmsg_state_t *state)
         * around and tries to reply to a broken circuit which then calls
         * the state abort code again.
         */
+       KKASSERT((state->flags & KDMSG_STATE_ABORTING) == 0);
        if (state->flags & KDMSG_STATE_ABORTING)
                return;
        state->flags |= KDMSG_STATE_ABORTING;
@@ -1175,13 +1220,33 @@ kdmsg_state_abort(kdmsg_state_t *state)
         *       (vs a message generated by the other side using its state),
         *       so we must invert DMSGF_REVTRANS and DMSGF_REVCIRC.
         */
-       msg = kdmsg_msg_alloc(state, DMSG_LNK_ERROR, NULL, NULL);
-       if ((state->rxcmd & DMSGF_CREATE) == 0)
-               msg->any.head.cmd |= DMSGF_CREATE;
-       msg->any.head.cmd |= DMSGF_DELETE | (state->rxcmd & DMSGF_REPLY);
-       msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC);
-       msg->any.head.error = DMSG_ERR_LOSTLINK;
-       kdmsg_msg_receive_handling(msg);
+       if ((state->rxcmd & DMSGF_DELETE) == 0) {
+               msg = kdmsg_msg_alloc(state, DMSG_LNK_ERROR, NULL, NULL);
+               if ((state->rxcmd & DMSGF_CREATE) == 0)
+                       msg->any.head.cmd |= DMSGF_CREATE;
+               msg->any.head.cmd |= DMSGF_DELETE |
+                                    (state->rxcmd & DMSGF_REPLY);
+               msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC);
+               msg->any.head.error = DMSG_ERR_LOSTLINK;
+               lockmgr(&state->iocom->msglk, LK_RELEASE);
+               kdmsg_msg_receive_handling(msg);
+               lockmgr(&state->iocom->msglk, LK_EXCLUSIVE);
+               msg = NULL;
+       }
+
+       /*
+        * If the state still has a parent association we must remove it
+        * now even if it is not fully closed or the simulation loop will
+        * livelock.
+        */
+       if (state->flags & KDMSG_STATE_SUBINSERTED) {
+               KKASSERT(state->flags & KDMSG_STATE_RBINSERTED);
+               TAILQ_REMOVE(&state->parent->subq, state, entry);
+               state->flags &= ~KDMSG_STATE_SUBINSERTED;
+               kdmsg_state_drop(state->parent);  /* pstate->subq */
+               kdmsg_state_drop(state);          /* pstate->subq */
+               state->parent = NULL;
+       }
 }
 
 /*
@@ -1215,6 +1280,8 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
                state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
                state->flags = KDMSG_STATE_DYNAMIC;
                state->iocom = iocom;
+               state->refs = 1;
+               TAILQ_INIT(&state->subq);
                iocom->freewr_state = state;
        }
 
@@ -1222,7 +1289,6 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
         * Lock RB tree.  If persistent state is present it will have already
         * been assigned to msg.
         */
-       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        state = msg->state;
 
        /*
@@ -1230,7 +1296,6 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
         */
        if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
                                  DMSGF_ABORT)) == 0) {
-               lockmgr(&iocom->msglk, LK_RELEASE);
                return(0);
        }
 
@@ -1385,11 +1450,12 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
        if (state && error == 0)
                state->flags |= KDMSG_STATE_INTERLOCK;
 
-       lockmgr(&iocom->msglk, LK_RELEASE);
-
        return (error);
 }
 
+/*
+ * Called with iocom locked.
+ */
 static
 void
 kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
@@ -1403,8 +1469,6 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                return;
        }
 
-       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-
        /*
         * Clear interlock (XXX hack) in case the send side blocks and a
         * response is returned in the other thread before
@@ -1421,7 +1485,7 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                KKASSERT((state->txcmd & DMSGF_DELETE) == 0);
                state->txcmd |= DMSGF_DELETE;
                if (state->rxcmd & DMSGF_DELETE) {
-                       KKASSERT(state->flags & KDMSG_STATE_INSERTED);
+                       KKASSERT(state->flags & KDMSG_STATE_RBINSERTED);
                        if (state->txcmd & DMSGF_REPLY) {
                                KKASSERT(msg->any.head.cmd &
                                         DMSGF_REPLY);
@@ -1433,25 +1497,47 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                                RB_REMOVE(kdmsg_state_tree,
                                          &iocom->statewr_tree, state);
                        }
+                       state->flags &= ~KDMSG_STATE_RBINSERTED;
                        pstate = state->parent;
-                       TAILQ_REMOVE(&pstate->subq, state, entry);
-                       if (pstate != &pstate->iocom->state0 &&
-                           TAILQ_EMPTY(&pstate->subq) &&
-                           (pstate->flags & KDMSG_STATE_INSERTED) == 0) {
-                               kdmsg_state_free(pstate);
+                       if (state->flags & KDMSG_STATE_SUBINSERTED) {
+                               TAILQ_REMOVE(&pstate->subq, state, entry);
+                               state->flags &= ~KDMSG_STATE_SUBINSERTED;
+                               kdmsg_state_drop(pstate);  /* pstate->subq */
+                               kdmsg_state_drop(state);   /* pstate->subq */
+                               state->parent = NULL;
+                       } else {
+                               KKASSERT(state->parent == NULL);
                        }
-                       state->flags &= ~KDMSG_STATE_INSERTED;
-                       state->parent = NULL;
                        kdmsg_msg_free(msg);
-                       if (TAILQ_EMPTY(&state->subq))
-                               kdmsg_state_free(state);
+                       kdmsg_state_drop(state);   /* state on rbtree */
                } else {
                        kdmsg_msg_free(msg);
                }
        } else {
                kdmsg_msg_free(msg);
        }
-       lockmgr(&iocom->msglk, LK_RELEASE);
+}
+
+static
+void
+_kdmsg_state_ref(kdmsg_state_t *state KDMSG_DEBUG_ARGS)
+{
+       atomic_add_int(&state->refs, 1);
+#if KDMSG_DEBUG
+       kprintf("state %p +%d\t%s:%d\n", state, state->refs, file, line);
+#endif
+}
+
+static
+void
+_kdmsg_state_drop(kdmsg_state_t *state KDMSG_DEBUG_ARGS)
+{
+       KKASSERT(state->refs > 0);
+#if KDMSG_DEBUG
+       kprintf("state %p -%d\t%s:%d\n", state, state->refs, file, line);
+#endif
+       if (atomic_fetchadd_int(&state->refs, -1) == 1)
+               kdmsg_state_free(state);
 }
 
 static
@@ -1460,8 +1546,12 @@ kdmsg_state_free(kdmsg_state_t *state)
 {
        kdmsg_iocom_t *iocom = state->iocom;
 
-       KKASSERT((state->flags & KDMSG_STATE_INSERTED) == 0);
-       kfree(state, iocom->mmsg);
+       KKASSERT((state->flags & KDMSG_STATE_RBINSERTED) == 0);
+       KKASSERT((state->flags & KDMSG_STATE_SUBINSERTED) == 0);
+       KKASSERT(TAILQ_EMPTY(&state->subq));
+
+       if (state != &state->iocom->state0)
+               kfree(state, iocom->mmsg);
 }
 
 kdmsg_msg_t *
@@ -1499,10 +1589,15 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd,
                if (RB_INSERT(kdmsg_state_tree, &iocom->statewr_tree, state))
                        panic("duplicate msgid allocated");
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
-               state->flags |= KDMSG_STATE_INSERTED;
+               state->flags |= KDMSG_STATE_RBINSERTED |
+                               KDMSG_STATE_SUBINSERTED;
+               kdmsg_state_ref(pstate);        /* pstate->subq */
+               kdmsg_state_ref(state);         /* pstate->subq */
+               kdmsg_state_ref(state);         /* state on rbtree */
                lockmgr(&iocom->msglk, LK_RELEASE);
        } else {
                pstate = state->parent;
+               KKASSERT(pstate != NULL);
        }
 
        if (state->flags & KDMSG_STATE_OPPOSITE)
@@ -1515,6 +1610,7 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd,
        msg->any.head.msgid = state->msgid;
        msg->any.head.circuit = pstate->msgid;
        msg->state = state;
+       kdmsg_state_ref(state);                 /* msg->state */
 
        return (msg);
 }
@@ -1523,13 +1619,17 @@ void
 kdmsg_msg_free(kdmsg_msg_t *msg)
 {
        kdmsg_iocom_t *iocom = msg->state->iocom;
+       kdmsg_state_t *state;
 
        if ((msg->flags & KDMSG_FLAG_AUXALLOC) &&
            msg->aux_data && msg->aux_size) {
                kfree(msg->aux_data, iocom->mmsg);
                msg->flags &= ~KDMSG_FLAG_AUXALLOC;
        }
-       msg->state = NULL;
+       if ((state = msg->state) != NULL) {
+               msg->state = NULL;
+               kdmsg_state_drop(state);        /* msg->state */
+       }
        msg->aux_data = NULL;
        msg->aux_size = 0;
 
@@ -1635,6 +1735,52 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                      "terminated iocom\n");
        }
 
+       /*
+        * For stateful messages, if the circuit is dead we have to abort
+        * the state and discard the message.
+        *
+        * - We must discard the message because the other end will not
+        *   be expecting any more messages over the dead circuit and might
+        *   not be able to receive them.
+        *
+        * - We abort the state by simulating a failure to generate a fake
+        *   incoming DELETE.  This will trigger the state callback and allow
+        *   the device to clean things up and reply, closing the outgoing
+        *   direction and terminating the state.
+        *
+        * - Because there are numerous races, it is possible that an abort
+        *   has already been initiated on this state.
+        *
+        * - For now, don't bother checking to see if this is a CREATE
+        *   message, though we could probably add that as a restriction.
+        *   Any pre-existing state will probably have already had an abort
+        *   initiated on it.
+        *
+        * This race occurs quite often, particularly as SPANs stabilize.
+        * End-points must do the right thing.
+        */
+       if (state) {
+               KKASSERT((state->txcmd & DMSGF_DELETE) == 0);
+               if ((state->parent->txcmd & DMSGF_DELETE) ||
+                   (state->parent->flags & KDMSG_STATE_ABORTING)) {
+                       kprintf("kdmsg_msg_write: Write to dying circuit "
+                               "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
+                               state->parent->txcmd,
+                               state->parent->rxcmd,
+                               state->parent->flags);
+                       kdmsg_state_ref(state);
+                       kdmsg_state_msgtx(msg);
+                       kdmsg_state_cleanuptx(msg);
+                       if ((state->flags & KDMSG_STATE_ABORTING) == 0) {
+                               kdmsg_simulate_failure(state, 1,
+                                                      DMSG_ERR_LOSTLINK);
+                       }
+                       kdmsg_state_drop(state);
+                       lockmgr(&iocom->msglk, LK_RELEASE);
+                       return;
+               }
+       }
+
        /*
         * Finish up the msg fields.  Note that msg->aux_size and the
         * aux_bytes stored in the message header represent the unaligned
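
(Illustrative note, not part of the patch.)  The kern_dmsg.c changes above
establish the following reference accounting for a kdmsg_state: one ref for
RB-tree membership (KDMSG_STATE_RBINSERTED), one for membership on the
parent's subq (KDMSG_STATE_SUBINSERTED), one ref held by the parent for each
child on its subq, and one ref for every kdmsg_msg whose msg->state points
at the state.  A minimal sketch of the resulting lower bound, using a
hypothetical helper (kdmsg_state_check_refs does not exist in the tree):

static void
kdmsg_state_check_refs(kdmsg_state_t *state, int nmsgs)
{
	int expect = 0;

	if (state->flags & KDMSG_STATE_RBINSERTED)
		expect += 1;		/* on staterd_tree or statewr_tree */
	if (state->flags & KDMSG_STATE_SUBINSERTED)
		expect += 1;		/* entry on parent->subq */
	expect += nmsgs;		/* one ref per msg->state pointer */

	/*
	 * Transient holds (e.g. the "aborting" refs taken in
	 * kdmsg_simulate_failure) may push refs above this floor.
	 */
	KKASSERT(state->refs >= expect);
}
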
index e51d634..9247371 100644 (file)
@@ -90,6 +90,7 @@ gptinit(cdev_t dev, struct disk_info *info, struct diskslices **sspp)
        bp1->b_bio1.bio_flags |= BIO_SYNC;
        bp1->b_bcount = info->d_media_blksize;
        bp1->b_cmd = BUF_CMD_READ;
+       bp1->b_flags |= B_FAILONDIS;
        dev_dstrategy(wdev, &bp1->b_bio1);
        if (biowait(&bp1->b_bio1, "gptrd") != 0) {
                kprintf("%s: reading GPT @ block 1: error %d\n",
@@ -143,6 +144,7 @@ gptinit(cdev_t dev, struct disk_info *info, struct diskslices **sspp)
        bp2->b_bio1.bio_flags |= BIO_SYNC;
        bp2->b_bcount = table_blocks * info->d_media_blksize;
        bp2->b_cmd = BUF_CMD_READ;
+       bp2->b_flags |= B_FAILONDIS;
        dev_dstrategy(wdev, &bp2->b_bio1);
        if (biowait(&bp2->b_bio1, "gptrd") != 0) {
                kprintf("%s: reading GPT partition table @ %lld: error %d\n",
index 1a172af..254d42c 100644 (file)
@@ -184,6 +184,7 @@ l32_readdisklabel(cdev_t dev, struct diskslice *sp, disklabel_t *lpp,
        bp->b_bcount = secsize;
        bp->b_flags &= ~B_INVAL;
        bp->b_cmd = BUF_CMD_READ;
+       bp->b_flags |= B_FAILONDIS;
        dev_dstrategy(dev, &bp->b_bio1);
        if (biowait(&bp->b_bio1, "labrd"))
                msg = "I/O error";
@@ -312,6 +313,7 @@ l32_writedisklabel(cdev_t dev, struct diskslices *ssp, struct diskslice *sp,
        bp->b_bio1.bio_done = biodone_sync;
        bp->b_bio1.bio_flags |= BIO_SYNC;
        bp->b_bcount = lp->d_secsize;
+       bp->b_flags |= B_FAILONDIS;
 
 #if 1
        /*
index 8bf14f5..84e9039 100644 (file)
@@ -150,6 +150,7 @@ l64_readdisklabel(cdev_t dev, struct diskslice *sp, disklabel_t *lpp,
        bp->b_bio1.bio_flags |= BIO_SYNC;
        bp->b_bcount = bpsize;
        bp->b_flags &= ~B_INVAL;
+       bp->b_flags |= B_FAILONDIS;
        bp->b_cmd = BUF_CMD_READ;
        dev_dstrategy(dev, &bp->b_bio1);
 
@@ -324,6 +325,7 @@ l64_writedisklabel(cdev_t dev, struct diskslices *ssp,
        bp->b_bio1.bio_done = biodone_sync;
        bp->b_bio1.bio_flags |= BIO_SYNC;
        bp->b_bcount = bpsize;
+       bp->b_flags |= B_FAILONDIS;
 
        /*
         * Because our I/O is larger then the label, and because we do not
index e24f288..54f582a 100644 (file)
@@ -131,6 +131,7 @@ reread_mbr:
        bp->b_bio1.bio_flags |= BIO_SYNC;
        bp->b_bcount = info->d_media_blksize;
        bp->b_cmd = BUF_CMD_READ;
+       bp->b_flags |= B_FAILONDIS;
        dev_dstrategy(wdev, &bp->b_bio1);
        if (biowait(&bp->b_bio1, "mbrrd") != 0) {
                if ((info->d_dsflags & DSO_MBRQUIET) == 0) {
@@ -438,6 +439,7 @@ mbr_extended(cdev_t dev, struct disk_info *info, struct diskslices *ssp,
        bp->b_bio1.bio_flags |= BIO_SYNC;
        bp->b_bcount = info->d_media_blksize;
        bp->b_cmd = BUF_CMD_READ;
+       bp->b_flags |= B_FAILONDIS;
        dev_dstrategy(dev, &bp->b_bio1);
        if (biowait(&bp->b_bio1, "mbrrd") != 0) {
                diskerr(&bp->b_bio1, dev,
index 70b6910..e1ed51c 100644 (file)
@@ -312,7 +312,7 @@ struct buf {
 #define        B_HEAVY         0x00100000      /* Heavy-weight buffer */
 #define        B_DIRTY         0x00200000      /* Needs writing later. */
 #define        B_RELBUF        0x00400000      /* Release VMIO buffer. */
-#define        B_UNUSED23      0x00800000      /* Request wakeup on done */
+#define        B_FAILONDIS     0x00800000      /* Fail on disconnect */
 #define        B_VNCLEAN       0x01000000      /* On vnode clean list */
 #define        B_VNDIRTY       0x02000000      /* On vnode dirty list */
 #define        B_PAGING        0x04000000      /* volatile paging I/O -- bypass VMIO */
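
(Illustrative note, not part of the patch.)  The disk probe paths above only
tag their buffers with B_FAILONDIS; it is up to the transport driver to honor
the flag.  A minimal sketch of the driver-side check, assuming a strategy
routine and a link-state flag in the softc (xa_strategy_sketch, struct
xa_softc_sketch and sc->link_down are made-up names; the actual xdisk.c
change is not shown in this excerpt):

static int
xa_strategy_sketch(struct dev_strategy_args *ap)
{
	struct xa_softc_sketch *sc = ap->a_head.a_dev->si_drv1;
	struct bio *bio = ap->a_bio;
	struct buf *bp = bio->bio_buf;

	if (sc->link_down && (bp->b_flags & B_FAILONDIS)) {
		/*
		 * The probe code marked this I/O as allowed to fail on
		 * disconnect, so fail it immediately rather than queue
		 * it and risk deadlocking the disk probe threads.
		 */
		bp->b_error = ENXIO;
		bp->b_flags |= B_ERROR;
		biodone(bio);
		return (0);
	}
	/* ...queue the bio normally when the link is up... */
	return (0);
}
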
index bfb2dc4..75e53f0 100644 (file)
  * sub-transaction.  Note that msgids must still be unique on an
  * iocom-by-iocom basis.
  *
+ * Messages can race against closing circuits.  When a circuit is lost,
+ * received failure messages are simulated to delete any sub-transactions.
+ *
  *                         MESSAGE TRANSACTIONAL STATES
  *
  * Message transactions are handled by the CREATE, DELETE, REPLY, ABORT, and
@@ -724,6 +727,7 @@ struct kdmsg_state {
        TAILQ_ENTRY(kdmsg_state) user_entry;    /* available to devices */
        struct kdmsg_iocom *iocom;
        struct kdmsg_state *parent;
+       int             refs;                   /* reference count */
        uint32_t        icmd;                   /* record cmd creating state */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
@@ -739,14 +743,15 @@ struct kdmsg_state {
        } any;
 };
 
-#define KDMSG_STATE_INSERTED   0x0001
+#define KDMSG_STATE_SUBINSERTED        0x0001
 #define KDMSG_STATE_DYNAMIC    0x0002
 #define KDMSG_STATE_DELPEND    0x0004          /* transmit delete pending */
 #define KDMSG_STATE_ABORTING   0x0008          /* avoids recursive abort */
 #define KDMSG_STATE_OPPOSITE   0x0010          /* opposite direction */
 #define KDMSG_STATE_DYING      0x0020          /* indicates circuit failure */
 #define KDMSG_STATE_INTERLOCK  0x0040
-#define KDMSG_STATE_SIGNAL     0x0080
+#define KDMSG_STATE_RBINSERTED 0x0080
+#define KDMSG_STATE_SIGNAL     0x0400
 
 struct kdmsg_msg {
        TAILQ_ENTRY(kdmsg_msg) qentry;          /* serialized queue */