hammer2 - Retool dmsg mechanics to improve virtual circuit design 2/2
author Matthew Dillon <dillon@apollo.backplane.com>
Mon, 5 May 2014 02:52:05 +0000 (19:52 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Mon, 5 May 2014 03:03:29 +0000 (20:03 -0700)
* Use transaction stacking to connect resources to consumers.  Issuing
  a transaction over a received SPAN effectively creates a virtual
  circuit back to the originator.

* The hammer2 service daemon (userland) is currently being used to route,
  but direct kernel<->kernel socket connections are now theoretically
  possible for later performance work.  Current performance will be
  low due to the extra layering.

* Get subr_diskiocom and the xdisk driver mostly working again.  The
  code is a lot cleaner but still needs considerable stability work and
  better unwinding during failure conditions.

* Start work on hammer2<->hammer2 communications.

14 files changed:
lib/libdmsg/dmsg.h
lib/libdmsg/dmsg_local.h
lib/libdmsg/msg.c
lib/libdmsg/msg_lnk.c
lib/libdmsg/service.c
lib/libdmsg/subs.c
sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sys/dev/disk/xdisk/xdisk.c
sys/kern/kern_dmsg.c
sys/kern/subr_diskiocom.c
sys/sys/dmsg.h
sys/sys/xdiskioctl.h

index 0767946..21dfd50 100644 (file)
@@ -156,11 +156,12 @@ typedef struct dmsg_media dmsg_media_t;
  * directions.
  */
 struct dmsg_state {
-       RB_ENTRY(dmsg_state) rbnode;            /* indexed by msgid */
+       RB_ENTRY(dmsg_state) rbnode;            /* by state->msgid */
        TAILQ_HEAD(, dmsg_state) subq;          /* active stacked states */
        TAILQ_ENTRY(dmsg_state) entry;          /* on parent subq */
        struct dmsg_iocom *iocom;
        struct dmsg_state *parent;              /* transaction stacking */
+       struct dmsg_state *relay;               /* routing */
        uint32_t        icmd;                   /* command creating state */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
@@ -181,8 +182,10 @@ struct dmsg_state {
 #define DMSG_STATE_INSERTED    0x0001
 #define DMSG_STATE_DYNAMIC     0x0002
 #define DMSG_STATE_NODEID      0x0004          /* manages a node id */
-#define DMSG_STATE_ROUTED      0x0008          /* message is routed */
+#define DMSG_STATE_UNUSED_0008 0x0008
 #define DMSG_STATE_OPPOSITE    0x0010          /* initiated by other end */
+#define DMSG_STATE_CIRCUIT     0x0020          /* LNK_SPAN special case */
+#define DMSG_STATE_ROOT                0x8000          /* iocom->state0 */
 
 /*
  * This is the core in-memory representation of a message structure.
@@ -260,7 +263,7 @@ typedef struct dmsg_ioq dmsg_ioq_t;
 #define DMSG_IOQ_ERROR_IVWRAP          18      /* IVs exhaused */
 #define DMSG_IOQ_ERROR_MACFAIL         19      /* MAC of encr alg failed */
 #define DMSG_IOQ_ERROR_ALGO            20      /* Misc. encr alg error */
-#define DMSG_IOQ_ERROR_ROUTED          21      /* ignore routed message */
+#define DMSG_IOQ_ERROR_UNUSED21                21
 #define DMSG_IOQ_ERROR_BAD_CIRCUIT     22      /* unconfigured circuit */
 #define DMSG_IOQ_ERROR_UNUSED23                23
 #define DMSG_IOQ_ERROR_ASSYM           24      /* Assymetric path */
@@ -289,7 +292,6 @@ struct dmsg_iocom {
        void    (*altmsg_callback)(struct dmsg_iocom *);
        void    (*rcvmsg_callback)(dmsg_msg_t *msg);
        void    (*usrmsg_callback)(dmsg_msg_t *msg, int unmanaged);
-       void    (*node_handler)(void **opaquep, dmsg_msg_t *msg, int op);
        dmsg_msg_queue_t txmsgq;                /* tx msgq from remote */
        struct h2span_conn *conn;               /* if LNK_CONN active */
        uint64_t        conn_msgid;             /* LNK_CONN circuit */
@@ -338,7 +340,6 @@ struct dmsg_master_service_info {
        void    *handle;
        void    (*altmsg_callback)(dmsg_iocom_t *iocom);
        void    (*usrmsg_callback)(dmsg_msg_t *msg, int unmanaged);
-       void    (*node_handler)(void **opaquep, dmsg_msg_t *msg, int op);
        void    (*exit_callback)(void *handle);
 };
 
@@ -405,6 +406,7 @@ void dmsg_iocom_drain(dmsg_iocom_t *iocom);
 void dmsg_iocom_flush1(dmsg_iocom_t *iocom);
 void dmsg_iocom_flush2(dmsg_iocom_t *iocom);
 
+void dmsg_state_relay(dmsg_msg_t *msg);
 void dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
 void dmsg_state_free(dmsg_state_t *state);
 
index 99aac89..c3b4533 100644 (file)
@@ -44,6 +44,8 @@
 #include <sys/uio.h>
 
 #include <netinet/in.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
 #include <arpa/inet.h>
 
 #include <assert.h>
index cb0083e..7ca010b 100644 (file)
@@ -38,7 +38,6 @@
 int DMsgDebugOpt;
 
 static int dmsg_state_msgrx(dmsg_msg_t *msg);
-static int dmsg_state_routedrx(dmsg_state_t *state, dmsg_msg_t *msg);
 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
 
@@ -128,6 +127,7 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
        dmsg_ioq_init(iocom, &iocom->ioq_tx);
        iocom->state0.iocom = iocom;
        iocom->state0.parent = &iocom->state0;
+       iocom->state0.flags = DMSG_STATE_ROOT;
        TAILQ_INIT(&iocom->state0.subq);
 
        if (pipe(iocom->wakeupfds) < 0)
@@ -248,7 +248,7 @@ dmsg_iocom_done(dmsg_iocom_t *iocom)
  * Allocate a new message using the specified transaction state.
  *
  * If CREATE is set a new transaction is allocated relative to the passed-in
- * transaction.
+ * transaction (the 'state' argument becomes pstate).
  *
  * If CREATE is not set the message is associated with the passed-in
  * transaction.
@@ -291,6 +291,7 @@ dmsg_msg_alloc(dmsg_state_t *state,
                state = malloc(sizeof(*state));
                bzero(state, sizeof(*state));
                TAILQ_INIT(&state->subq);
+               atomic_add_int(&pstate->refs, 1);
                state->parent = pstate;
                state->iocom = iocom;
                state->flags = DMSG_STATE_DYNAMIC;
@@ -300,6 +301,7 @@ dmsg_msg_alloc(dmsg_state_t *state,
                state->icmd = state->txcmd & DMSGF_BASECMDMASK;
                state->func = func;
                state->any.any = data;
+
                RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
                state->flags |= DMSG_STATE_INSERTED;
@@ -980,8 +982,14 @@ again:
                xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
                if (xcrc32 != msg->any.head.aux_crc) {
                        ioq->error = DMSG_IOQ_ERROR_ACRC;
-                       fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
-                               xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
+                       fprintf(stderr,
+                               "iocom: ACRC error %08x vs %08x "
+                               "msgid %016jx msgcmd %08x auxsize %d\n",
+                               xcrc32,
+                               msg->any.head.aux_crc,
+                               (intmax_t)msg->any.head.msgid,
+                               msg->any.head.cmd,
+                               msg->any.head.aux_bytes);
                        break;
                }
                break;
@@ -1208,13 +1216,6 @@ skip:
                                goto again;
                        }
 
-                       /*
-                        * msg routed, msg pointer no longer owned by us.
-                        * Go to the top and start reading another.
-                        */
-                       if (error == DMSG_IOQ_ERROR_ROUTED)
-                               goto again;
-
                        /*
                         * Process real error and throw away message.
                         */
@@ -1543,7 +1544,8 @@ dmsg_msg_write(dmsg_msg_t *msg)
         */
        pthread_mutex_lock(&iocom->mtx);
        state = msg->state;
-       if (state != &state->iocom->state0) {
+
+       if ((state->flags & DMSG_STATE_ROOT) == 0) {
                /*
                 * Existing transaction (could be reply).  It is also
                 * possible for this to be the first reply (CREATE is set),
@@ -1603,7 +1605,6 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
        dmsg_msg_t *nmsg;
        uint32_t cmd;
 
-
        /*
         * Reply with a simple error code and terminate the transaction.
         */
@@ -1618,7 +1619,7 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
         * If our direction has already been closed we just return without
         * doing anything.
         */
-       if (state != &state->iocom->state0) {
+       if ((state->flags & DMSG_STATE_ROOT) == 0) {
                if (state->txcmd & DMSGF_DELETE)
                        return;
                if (state->txcmd & DMSGF_REPLY)
@@ -1635,7 +1636,7 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
         * allocate new state.  We have our state already.
         */
        nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
-       if (state != &state->iocom->state0) {
+       if ((state->flags & DMSG_STATE_ROOT) == 0) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
@@ -1672,7 +1673,7 @@ dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
         * If our direction has already been closed we just return without
         * doing anything.
         */
-       if (state != &state->iocom->state0) {
+       if ((state->flags & DMSG_STATE_ROOT) == 0) {
                if (state->txcmd & DMSGF_DELETE)
                        return;
                if (state->txcmd & DMSGF_REPLY)
@@ -1683,7 +1684,7 @@ dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
                        cmd |= DMSGF_REPLY;
        }
        nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
-       if (state != &state->iocom->state0) {
+       if ((state->flags & DMSG_STATE_ROOT) == 0) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
@@ -1716,7 +1717,7 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error)
                cmd |= DMSGF_REPLY;
 
        nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
-       if (state != &state->iocom->state0) {
+       if ((state->flags & DMSG_STATE_ROOT) == 0) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
@@ -1748,7 +1749,7 @@ dmsg_state_result(dmsg_state_t *state, uint32_t error)
                cmd |= DMSGF_REPLY;
 
        nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
-       if (state != &state->iocom->state0) {
+       if ((state->flags & DMSG_STATE_ROOT) == 0) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
@@ -1763,8 +1764,8 @@ dmsg_state_result(dmsg_state_t *state, uint32_t error)
  */
 
 /*
- * Process circuit and state tracking for a message after reception, prior
- * to execution.
+ * Process state tracking for a message after reception, prior to execution.
+ * Possibly route the message (consuming it).
  *
  * Called with msglk held and the msg dequeued.
  *
@@ -1823,10 +1824,14 @@ dmsg_state_result(dmsg_state_t *state, uint32_t error)
  *
  * --
  *
- * One-off messages (no reply expected) are sent with neither CREATE or DELETE
- * set.  One-off messages cannot be aborted and typically aren't processed
- * by these routines.  The REPLY bit can be used to distinguish whether a
- * one-off message is a command or reply.  For example, one-off replies
+ * One-off messages (no reply expected) are sent without an established
+ * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
+ * For one-off messages sent over circuits msgid generally MUST be 0.
+ *
+ * One-off messages cannot be aborted and typically aren't processed
+ * by these routines.  Order is still guaranteed for messages sent over
+ * the same circuit.  The REPLY bit can be used to distinguish whether
+ * a one-off message is a command or reply.  For example, one-off replies
  * will typically just contain status updates.
  */
 static int
@@ -1838,19 +1843,40 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
        dmsg_state_t sdummy;
        int error;
 
-#if 0
-       fprintf(stderr,
-               "MSGREAD  %016jx %08x\n",
-               msg->any.head.msgid, msg->any.head.cmd);
-#endif
-
        pthread_mutex_lock(&iocom->mtx);
 
        /*
-        * XXX handle circuit accounting
+        * Lookup the circuit (pstate).  The circuit will be an open
+        * transaction.  The REVCIRC bit in the message tells us which side
+        * initiated it.
         */
+       if (msg->any.head.circuit) {
+               sdummy.msgid = msg->any.head.circuit;
+
+               if (msg->any.head.cmd & DMSGF_REVCIRC) {
+                       pstate = RB_FIND(dmsg_state_tree,
+                                        &iocom->statewr_tree,
+                                        &sdummy);
+               } else {
+                       pstate = RB_FIND(dmsg_state_tree,
+                                        &iocom->staterd_tree,
+                                        &sdummy);
+               }
+               if (pstate == NULL) {
+                       fprintf(stderr,
+                               "missing parent in stacked trans %s\n",
+                               dmsg_msg_str(msg));
+                       error = DMSG_IOQ_ERROR_TRANS;
+                       pthread_mutex_unlock(&iocom->mtx);
+                       assert(0);
+               }
+       } else {
+               pstate = &iocom->state0;
+       }
 
        /*
+        * Lookup the msgid.
+        *
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         * Otherwise there is no state (retain &iocom->state0)
@@ -1860,34 +1886,42 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
        else
                state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
-       if (state)
-               msg->state = state;     /* found an open transaction */
-       else
-               state = msg->state;     /* retain &iocom->state0 */
 
-       pthread_mutex_unlock(&iocom->mtx);
-
-       /*
-        * Short-cut one-off or mid-stream messages (state may be NULL).
-        */
-       if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
-                                 DMSGF_ABORT)) == 0) {
-               error = 0;
-               goto done;
+       if (state) {
+               /*
+                * Message over an existing transaction (CREATE should not
+                * be set).
+                */
+               msg->state = state;
+               assert(pstate == state->parent);
+       } else {
+               /*
+                * Either a new transaction (if CREATE set) or a one-off.
+                */
+               state = pstate;
        }
 
+       pthread_mutex_unlock(&iocom->mtx);
+
        /*
         * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
         * inside the case statements.
+        *
+        * Construct new state as necessary.
         */
        switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
                                    DMSGF_REPLY)) {
        case DMSGF_CREATE:
        case DMSGF_CREATE | DMSGF_DELETE:
                /*
-                * New persistant command received.
+                * Create new sub-transaction under pstate.
+                * (any DELETE is handled in post-processing of msg).
+                *
+                * (During routing the msgid was made unique for this
+                * direction over the comlink, so our RB trees can be
+                * iocom-based instead of state-based).
                 */
-               if (state != &state->iocom->state0) {
+               if (state != pstate) {
                        fprintf(stderr,
                                "duplicate transaction %s\n",
                                dmsg_msg_str(msg));
@@ -1897,60 +1931,38 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                }
 
                /*
-                * Lookup the circuit.  The circuit is an open transaction.
-                * the REVCIRC bit in the message tells us which side
-                * initiated the transaction representing the circuit.
-                */
-               if (msg->any.head.circuit) {
-                       pthread_mutex_lock(&iocom->mtx);
-                       sdummy.msgid = msg->any.head.circuit;
-
-                       if (msg->any.head.cmd & DMSGF_REVCIRC) {
-                               pstate = RB_FIND(dmsg_state_tree,
-                                                &iocom->statewr_tree,
-                                                &sdummy);
-                       } else {
-                               pstate = RB_FIND(dmsg_state_tree,
-                                                &iocom->staterd_tree,
-                                                &sdummy);
-                       }
-                       if (pstate == NULL) {
-                               fprintf(stderr,
-                                       "missing parent in stacked trans %s\n",
-                                       dmsg_msg_str(msg));
-                               error = DMSG_IOQ_ERROR_TRANS;
-                               pthread_mutex_unlock(&iocom->mtx);
-                               assert(0);
-                               break;
-                       }
-                       pthread_mutex_unlock(&iocom->mtx);
-               } else {
-                       pstate = &iocom->state0;
-               }
-
-               /*
-                * Allocate new state
+                * Allocate the new state.
                 */
                state = malloc(sizeof(*state));
                bzero(state, sizeof(*state));
                TAILQ_INIT(&state->subq);
+               atomic_add_int(&pstate->refs, 1);
                state->parent = pstate;
                state->iocom = iocom;
                state->flags = DMSG_STATE_DYNAMIC |
-                              DMSG_STATE_OPPOSITE |
-                              (pstate->flags & DMSG_STATE_ROUTED);
+                              DMSG_STATE_OPPOSITE;
                state->msgid = msg->any.head.msgid;
                state->txcmd = DMSGF_REPLY;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
                msg->state = state;
-
                pthread_mutex_lock(&iocom->mtx);
                RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
                state->flags |= DMSG_STATE_INSERTED;
+
+               /*
+                * If the parent is a relay set up the state handler to
+                * automatically route the message.  Local processing will
+                * not occur if set.
+                *
+                * (state relays are seeded by SPAN processing)
+                */
+               if (pstate->relay)
+                       state->func = dmsg_state_relay;
                pthread_mutex_unlock(&iocom->mtx);
                error = 0;
+
                if (DMsgDebugOpt) {
                        fprintf(stderr,
                                "create state %p id=%08x on iocom staterd %p\n",
@@ -1961,15 +1973,17 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                /*
                 * Persistent state is expected but might not exist if an
                 * ABORT+DELETE races the close.
+                *
+                * (any DELETE is handled in post-processing of msg).
                 */
-               if (state == &state->iocom->state0) {
+               if (state == pstate) {
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = DMSG_IOQ_ERROR_EALREADY;
                        } else {
                                fprintf(stderr, "missing-state %s\n",
                                        dmsg_msg_str(msg));
                                error = DMSG_IOQ_ERROR_TRANS;
-                       assert(0);
+                               assert(0);
                        }
                        break;
                }
@@ -1985,7 +1999,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                                fprintf(stderr, "reused-state %s\n",
                                        dmsg_msg_str(msg));
                                error = DMSG_IOQ_ERROR_TRANS;
-                       assert(0);
+                               assert(0);
                        }
                        break;
                }
@@ -1997,7 +2011,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                 * allow.
                 */
                if (msg->any.head.cmd & DMSGF_ABORT) {
-                       if (state == &state->iocom->state0 ||
+                       if ((state == pstate) ||
                            (state->rxcmd & DMSGF_CREATE) == 0) {
                                error = DMSG_IOQ_ERROR_EALREADY;
                                break;
@@ -2011,15 +2025,14 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                 * When receiving a reply with CREATE set the original
                 * persistent state message should already exist.
                 */
-               if (state == &state->iocom->state0) {
+               if (state == pstate) {
                        fprintf(stderr, "no-state(r) %s\n",
                                dmsg_msg_str(msg));
                        error = DMSG_IOQ_ERROR_TRANS;
                        assert(0);
                        break;
                }
-               assert(((state->rxcmd ^ msg->any.head.cmd) &
-                       DMSGF_REPLY) == 0);
+               assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                error = 0;
                break;
@@ -2028,14 +2041,14 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                 * Received REPLY+ABORT+DELETE in case where msgid has
                 * already been fully closed, ignore the message.
                 */
-               if (state == &state->iocom->state0) {
+               if (state == pstate) {
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = DMSG_IOQ_ERROR_EALREADY;
                        } else {
                                fprintf(stderr, "no-state(r,d) %s\n",
                                        dmsg_msg_str(msg));
                                error = DMSG_IOQ_ERROR_TRANS;
-                       assert(0);
+                               assert(0);
                        }
                        break;
                }
@@ -2052,7 +2065,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                                fprintf(stderr, "reused-state(r,d) %s\n",
                                        dmsg_msg_str(msg));
                                error = DMSG_IOQ_ERROR_TRANS;
-                       assert(0);
+                               assert(0);
                        }
                        break;
                }
@@ -2063,7 +2076,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                 * Check for mid-stream ABORT reply received to sent command.
                 */
                if (msg->any.head.cmd & DMSGF_ABORT) {
-                       if (state == &state->iocom->state0 ||
+                       if (state == pstate ||
                            (state->rxcmd & DMSGF_CREATE) == 0) {
                                error = DMSG_IOQ_ERROR_EALREADY;
                                break;
@@ -2083,9 +2096,8 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
         * The two can be told apart because outer-transaction commands
         * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
         */
-done:
        if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
-               if (state != &state->iocom->state0) {
+               if ((state->flags & DMSG_STATE_ROOT) == 0) {
                        msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
                                    (msg->any.head.cmd & (DMSGF_CREATE |
                                                          DMSGF_DELETE |
@@ -2096,52 +2108,72 @@ done:
        } else {
                msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
        }
-
-       /*
-        * Possibly route the message if the state inherited the ROUTED
-        * flag.
-        */
-       if (state->flags & DMSG_STATE_ROUTED)
-               error = dmsg_state_routedrx(state, msg);
-
        return (error);
 }
 
 /*
- * Routed messages still do state-tracking
+ * Route the message and handle pair-state processing.
  */
-static int
-dmsg_state_routedrx(dmsg_state_t *state, dmsg_msg_t *msg)
+void
+dmsg_state_relay(dmsg_msg_t *lmsg)
 {
-       /*
-        * If this message is a CREATE or DELETE on the LNK_SPAN transaction
-        * itself we process it normally rather than route it.
-        */
-       if (state->parent == &state->iocom->state0 &&
-           (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE))) {
-               assert(state->icmd == DMSG_LNK_SPAN);
-               return 0;
-       }
-
-       /*
-        * When routing the msgid must be translated to our representation
-        * of the transaction. XXX
-        */
-       fprintf(stderr, "ROUTING MESSAGE\n");
-
-       if (state->parent == &state->iocom->state0 &&
-           state->icmd == DMSG_LNK_SPAN) {
+       dmsg_state_t *lpstate;
+       dmsg_state_t *rpstate;
+       dmsg_state_t *lstate;
+       dmsg_state_t *rstate;
+       dmsg_msg_t *rmsg;
+
+       if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
+           DMSGF_CREATE) {
                /*
-                * Route a non-transactional command through the SPAN.
+                * New sub-transaction, establish new state and relay.
                 */
+               lstate = lmsg->state;
+               lpstate = lstate->parent;
+               rpstate = lpstate->relay;
+               assert(lstate->relay == NULL);
+               assert(rpstate != NULL);
+
+               rmsg = dmsg_msg_alloc(rpstate,
+                                     lmsg->aux_size,
+                                     lmsg->any.head.cmd,
+                                     dmsg_state_relay, NULL);
+               rstate = rmsg->state;
+               rstate->relay = lstate;
+               lstate->relay = rstate;
+               atomic_add_int(&lstate->refs, 1);
+               atomic_add_int(&rstate->refs, 1);
        } else {
                /*
-                * Route a transactional message stacked under the LNK_SPAN.
+                * State & relay already established
                 */
+               lstate = lmsg->state;
+               rstate = lstate->relay;
+               assert(rstate != NULL);
+
+               rmsg = dmsg_msg_alloc(rstate,
+                                     lmsg->aux_size,
+                                     lmsg->any.head.cmd,
+                                     dmsg_state_relay, NULL);
+       }
+       if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
+               bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
+                     lmsg->hdr_size - sizeof(lmsg->any.head));
        }
-       return DMSG_IOQ_ERROR_ROUTED;
+       rmsg->any.head.error = lmsg->any.head.error;
+       rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
+       rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
+       rmsg->aux_data = lmsg->aux_data;
+       lmsg->aux_data = NULL;
+       /*
+       fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
+       */
+       dmsg_msg_write(rmsg);
 }
 
+/*
+ * Cleanup and retire msg after processing
+ */
 void
 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 {
@@ -2150,7 +2182,7 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 
        assert(msg->state->iocom == iocom);
        state = msg->state;
-       if (state == &iocom->state0) {
+       if (state->flags & DMSG_STATE_ROOT) {
                /*
                 * Free a non-transactional message, there is no state
                 * to worry about.
@@ -2178,13 +2210,20 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                        }
                        pstate = state->parent;
                        TAILQ_REMOVE(&pstate->subq, state, entry);
-                       if (pstate != &pstate->iocom->state0 &&
+                       state->flags &= ~DMSG_STATE_INSERTED;
+                       state->parent = NULL;
+                       atomic_add_int(&pstate->refs, -1);
+
+                       if ((pstate->flags & DMSG_STATE_ROOT) == 0 &&
                            TAILQ_EMPTY(&pstate->subq) &&
                            (pstate->flags & DMSG_STATE_INSERTED) == 0) {
                                dmsg_state_free(pstate);
                        }
-                       state->flags &= ~DMSG_STATE_INSERTED;
-                       state->parent = NULL;
+
+                       if (state->relay) {
+                               atomic_add_int(&state->relay->refs, -1);
+                               state->relay = NULL;
+                       }
                        dmsg_msg_free(msg);
                        if (TAILQ_EMPTY(&state->subq))
                                dmsg_state_free(state);
@@ -2209,7 +2248,7 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 
        assert(iocom == msg->state->iocom);
        state = msg->state;
-       if (state == &state->iocom->state0) {
+       if (state->flags & DMSG_STATE_ROOT) {
                dmsg_msg_free(msg);
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
                pthread_mutex_lock(&iocom->mtx);
@@ -2229,13 +2268,20 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                        }
                        pstate = state->parent;
                        TAILQ_REMOVE(&pstate->subq, state, entry);
-                       if (pstate != &pstate->iocom->state0 &&
+                       state->flags &= ~DMSG_STATE_INSERTED;
+                       state->parent = NULL;
+                       atomic_add_int(&pstate->refs, -1);
+
+                       if ((pstate->flags & DMSG_STATE_ROOT) == 0 &&
                            TAILQ_EMPTY(&pstate->subq) &&
                            (pstate->flags & DMSG_STATE_INSERTED) == 0) {
                                dmsg_state_free(pstate);
                        }
-                       state->flags &= ~DMSG_STATE_INSERTED;
-                       state->parent = NULL;
+
+                       if (state->relay) {
+                               atomic_add_int(&state->relay->refs, -1);
+                               state->relay = NULL;
+                       }
                        dmsg_msg_free(msg);
                        if (TAILQ_EMPTY(&state->subq))
                                dmsg_state_free(state);
index 37bac15..86f199b 100644 (file)
@@ -528,8 +528,11 @@ dmsg_lnk_span(dmsg_msg_t *msg)
        char *alloc = NULL;
 
        /*
-        * Ignore reply to LNK_SPAN.  The reply is needed to forge the
-        * return path for the transaction.
+        * Ignore reply to LNK_SPAN.  The reply is expected and will allow
+        * commands to flow in both directions on the open transaction.  This
+        * will also ignore DMSGF_REPLY|DMSGF_DELETE messages.  Since we take
+        * no action if the other end unexpectedly closes their side of the
+        * transaction, we can ignore that too.
         */
        if (msg->any.head.cmd & DMSGF_REPLY) {
                printf("Ignore reply to LNK_SPAN\n");
@@ -585,14 +588,26 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                        node->cls = cls;
                        RB_INIT(&node->tree);
                        RB_INSERT(h2span_node_tree, &cls->tree, node);
-                       if (iocom->node_handler) {
-                               iocom->node_handler(&node->opaque, msg,
-                                                   DMSG_NODEOP_ADD);
-                       }
                }
 
                /*
                 * Create the link
+                *
+                * NOTE: Sub-transactions on the incoming SPAN can be used
+                *       to talk to the originator.  We should not set-up
+                *       state->relay for incoming SPANs since our sub-trans
+                *       is running on the same interface (i.e. no actual
+                *       relaying need be done).
+                *
+                * NOTE: Later on, when we relay the SPAN out, the outgoing
+                *       SPAN state will be set up to relay back to this
+                *       state.
+                *
+                * NOTE: It is possible for SPAN targets to send one-way
+                *       messages to the originator but it is not possible
+                *       for the originator to (currently) broadcast one-way
+                *       messages to all of its SPAN targets.  The protocol
+                *       allows such a feature to be added in the future.
                 */
                assert(state->any.link == NULL);
                slink = dmsg_alloc(sizeof(*slink));
@@ -616,6 +631,12 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 #if 0
                dmsg_relay_scan(NULL, node);
 #endif
+               /*
+                * Ack the open, which will issue a CREATE on our side, and
+                * leave the transaction open.  Necessary to allow the
+                * transaction to be used as a virtual circuit.
+                */
+               dmsg_state_result(state, 0);
                dmsg_iocom_signal(iocom);
        }
 
@@ -650,10 +671,6 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                RB_REMOVE(h2span_link_tree, &node->tree, slink);
                if (RB_EMPTY(&node->tree)) {
                        RB_REMOVE(h2span_node_tree, &cls->tree, node);
-                       if (iocom->node_handler) {
-                               iocom->node_handler(&node->opaque, msg,
-                                                   DMSG_NODEOP_DEL);
-                       }
                        if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
                                RB_REMOVE(h2span_cluster_tree,
                                          &cluster_tree, cls);
@@ -1023,7 +1040,6 @@ dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
                             0, DMSG_LNK_SPAN | DMSGF_CREATE,
                             dmsg_lnk_relay, relay);
        relay->target_rt = msg->state;
-       msg->state->flags |= DMSG_STATE_ROUTED;
 
        msg->any.lnk_span = slink->lnk_span;
        msg->any.lnk_span.dist = slink->lnk_span.dist + 1;
@@ -1032,6 +1048,13 @@ dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
        RB_INSERT(h2span_relay_tree, &conn->tree, relay);
        TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
 
+       /*
+        * Seed the relay so new sub-transactions received on the outgoing
+        * SPAN circuit are relayed back to the originator.
+        */
+       msg->state->relay = relay->source_rt;
+       atomic_add_int(&relay->source_rt->refs, 1);
+
        dmsg_msg_write(msg);
 
        return (relay);
@@ -1090,6 +1113,11 @@ dmsg_relay_delete(h2span_relay_t *relay)
                /* state invalid after reply */
                relay->target_rt = NULL;
        }
+
+       /*
+        * NOTE: relay->source_rt->refs is held by the relay SPAN
+        *       state, not by this relay structure.
+        */
        relay->conn = NULL;
        relay->source_rt = NULL;
        dmsg_free(relay);
index afc5aa3..e1e4ec6 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
+ * Copyright (c) 2011-2014 The DragonFly Project.  All rights reserved.
  *
  * This code is derived from software contributed to The DragonFly Project
  * by Matthew Dillon <dillon@dragonflybsd.org>
@@ -61,7 +61,6 @@ dmsg_master_service(void *data)
                        master_auth_rxmsg,
                        info->usrmsg_callback,
                        info->altmsg_callback);
-       iocom.node_handler = info->node_handler;
        if (info->noclosealt)
                iocom.flags &= ~DMSG_IOCOMF_CLOSEALT;
        if (info->label) {
@@ -168,9 +167,16 @@ master_link_rxmsg(dmsg_msg_t *msg)
        iocom = state->iocom;
        cmd = (state != &iocom->state0) ? state->icmd : msg->any.head.cmd;
 
-       if (state != &iocom->state0 && state->func) {
+       if (state->func) {
+               /*
+                * Call function or router
+                */
+               assert(state != &iocom->state0);
                state->func(msg);
        } else {
+               /*
+                * Top-level message
+                */
                switch(cmd & DMSGF_PROTOS) {
                case DMSG_PROTO_LNK:
                        dmsg_msg_lnk(msg);
index 7fafdb3..2466044 100644 (file)
@@ -90,6 +90,7 @@ dmsg_connect(const char *hostname)
        struct sockaddr_in lsin;
        struct hostent *hen;
        int fd;
+       int opt;
 
        /*
         * Acquire socket and set options
@@ -99,6 +100,8 @@ dmsg_connect(const char *hostname)
                        strerror(errno));
                return -1;
        }
+       opt = 1;
+       setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
 
        /*
         * Connect to the target
index 072c4fc..93d19ba 100644 (file)
@@ -61,7 +61,6 @@ cmd_shell(const char *hostname)
        info->detachme = 0;
        info->usrmsg_callback = shell_msghandler;
        info->altmsg_callback = shell_ttymsg;
-       info->node_handler = NULL;
        info->label = strdup("debug");
        pthread_create(&thread, NULL, dmsg_master_service, info);
        pthread_join(thread, NULL);
index 6251770..4c1d2f5 100644 (file)
@@ -94,7 +94,6 @@ static void disk_reconnect(const char *disk);
 static void disk_disconnect(void *handle);
 static void udev_check_disks(void);
 static void hammer2_usrmsg_handler(dmsg_msg_t *msg, int unmanaged);
-static void hammer2_node_handler(void **opaque, struct dmsg_msg *msg, int op);
 static void *hammer2_volconf_thread(void *info);
 static void hammer2_volconf_signal(dmsg_iocom_t *iocom);
 static void hammer2_volconf_start(hammer2_media_config_t *conf,
@@ -102,9 +101,7 @@ static void hammer2_volconf_start(hammer2_media_config_t *conf,
 static void hammer2_volconf_stop(hammer2_media_config_t *conf);
 
 
-static void xdisk_reconnect(struct service_node_opaque *info);
-static void xdisk_disconnect(void *handle);
-static void *xdisk_attach_tmpthread(void *data);
+static void xdisk_connect(void);
 
 /*
  * Start-up the master listener daemon for the machine.  This daemon runs
@@ -216,6 +213,7 @@ service_thread(void *data)
        int fd;
        int i;
        int count;
+       int opt;
        struct statfs *mntbuf = NULL;
        struct statvfs *mntvbuf = NULL;
 
@@ -226,11 +224,18 @@ service_thread(void *data)
        pthread_detach(pthread_self());
 
        /*
-        * Start up a thread to handle block device monitoring
+        * Start up a thread to handle block device monitoring for
+        * export to the cluster.
         */
        thread = NULL;
        pthread_create(&thread, NULL, udev_thread, NULL);
 
+       /*
+        * Start up a thread to tie /dev/xdisk into the cluster
+        * controller.
+        */
+       xdisk_connect();
+
        /*
         * Start thread to manage /etc/hammer2/autoconn
         */
@@ -259,6 +264,8 @@ service_thread(void *data)
                                continue;
                        break;
                }
+               opt = 1;
+               setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof opt);
                thread = NULL;
                fprintf(stderr, "service_thread: accept fd %d\n", fd);
                info = malloc(sizeof(*info));
@@ -266,7 +273,6 @@ service_thread(void *data)
                info->fd = fd;
                info->detachme = 1;
                info->usrmsg_callback = hammer2_usrmsg_handler;
-               info->node_handler = hammer2_node_handler;
                info->label = strdup("client");
                pthread_create(&thread, NULL, dmsg_master_service, info);
        }
@@ -483,63 +489,6 @@ hammer2_volconf_signal(dmsg_iocom_t *iocom)
        atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
 }
 
-
-/*
- * Node discovery code on received SPANs (or loss of SPANs).  This code
- * is used to track the availability of remote block devices and install
- * or deinstall them using the xdisk driver (/dev/xdisk).
- *
- * An installed xdisk creates /dev/xa%d and /dev/serno/<blah> based on
- * the data handed to it.  When opened, a virtual circuit is forged and
- * maintained to the block device server via DMSG.  Temporary failures
- * stall the device until successfully reconnected or explicitly destroyed.
- */
-static
-void
-hammer2_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
-{
-       struct service_node_opaque *info = *opaquep;
-
-       switch(op) {
-       case DMSG_NODEOP_ADD:
-               if (msg->any.lnk_span.peer_type != DMSG_PEER_BLOCK)
-                       break;
-               if (msg->any.lnk_span.pfs_type != DMSG_PFSTYPE_SERVER)
-                       break;
-               if (info == NULL) {
-                       info = malloc(sizeof(*info));
-                       bzero(info, sizeof(*info));
-                       *opaquep = info;
-               }
-               snprintf(info->cl_label, sizeof(info->cl_label),
-                        "%s", msg->any.lnk_span.cl_label);
-               snprintf(info->fs_label, sizeof(info->fs_label),
-                        "%s", msg->any.lnk_span.fs_label);
-               info->block = msg->any.lnk_span.media.block;
-               fprintf(stderr, "NODE ADD %s serno %s\n",
-                       info->cl_label, info->fs_label);
-               info->attached = 1;
-               xdisk_reconnect(info);
-               break;
-       case DMSG_NODEOP_DEL:
-               if (info) {
-                       fprintf(stderr, "NODE DEL %s serno %s\n",
-                               info->cl_label, info->fs_label);
-                       pthread_mutex_lock(&diskmtx);
-                       *opaquep = NULL;
-                       info->attached = 0;
-                       if (info->servicing == 0)
-                               free(info);
-                       else
-                               shutdown(info->servicefd, SHUT_RDWR);/*XXX*/
-                       pthread_mutex_unlock(&diskmtx);
-               }
-               break;
-       default:
-               break;
-       }
-}
-
 /*
  * Monitor block devices.  Currently polls every ~10 seconds or so.
  */
@@ -881,7 +830,6 @@ master_reconnect(const char *mntpt)
        info->fd = pipefds[1];
        info->detachme = 1;
        info->usrmsg_callback = hammer2_usrmsg_handler;
-       info->node_handler = hammer2_node_handler;
        info->label = strdup("hammer2");
        pthread_create(&thread, NULL, dmsg_master_service, info);
 }
@@ -963,7 +911,6 @@ disk_reconnect(const char *disk)
        info->fd = pipefds[1];
        info->detachme = 1;
        info->usrmsg_callback = hammer2_usrmsg_handler;
-       info->node_handler = hammer2_node_handler;
        info->exit_callback = disk_disconnect;
        info->handle = dc;
        info->label = strdup(dc->disk);
@@ -986,87 +933,70 @@ disk_disconnect(void *handle)
 }
 
 /*
- * [re]connect a remote disk service to the local system via /dev/xdisk.
+ * Connect our cluster controller to /dev/xdisk.  xdisk picks up the
+ * SPAN messages that we route to it, makes remote block devices
+ * available to the host, and can issue dmsg transactions based on
+ * device requests.
+ *
+ * Best-effort: on any failure a diagnostic is printed to stderr and
+ * we simply return; nothing is retried.
  */
 static
 void
-xdisk_reconnect(struct service_node_opaque *xdisk)
+xdisk_connect(void)
 {
-       struct xdisk_attach_ioctl *xaioc;
        dmsg_master_service_info_t *info;
+       struct xdisk_attach_ioctl xaioc;
        pthread_t thread;
        int pipefds[2];
+       int xfd;
+       int error;
+
+       /*
+        * Is /dev/xdisk available?
+        */
+       xfd = open("/dev/xdisk", O_RDWR, 0600);
+       if (xfd < 0) {
+               fprintf(stderr, "xdisk_connect: Unable to open /dev/xdisk\n");
+               return;
+       }
 
        if (pipe(pipefds) < 0) {
-               fprintf(stderr, "reconnect %s: pipe() failed\n",
-                       xdisk->cl_label);
+               fprintf(stderr, "xdisk_connect: pipe() failed\n");
+               close(xfd);     /* don't leak the control device fd */
                return;
        }
 
+       /*
+        * Pipe between the cluster controller (this user process) and
+        * the kernel.  The service thread started below owns pipefds[1];
+        * pipefds[0] is handed to /dev/xdisk.
+        */
        info = malloc(sizeof(*info));
        bzero(info, sizeof(*info));
        info->fd = pipefds[1];
        info->detachme = 1;
        info->usrmsg_callback = hammer2_usrmsg_handler;
-       info->node_handler = hammer2_node_handler;
-       info->exit_callback = xdisk_disconnect;
-       info->handle = xdisk;
-       xdisk->servicing = 1;
-       xdisk->servicefd = info->fd;
-       info->label = strdup(xdisk->cl_label);
+       info->exit_callback = NULL;
        pthread_create(&thread, NULL, dmsg_master_service, info);
 
        /*
-        * We have to run the attach in its own pthread because it will
-        * synchronously interact with the messaging subsystem over the
-        * pipe.  If we do it here we will deadlock.
+        * Hand the other end of the pipe to the xdisk device.  Our copies
+        * of pipefds[0] and the control fd are closed once ioctl returns.
         */
-       xaioc = malloc(sizeof(*xaioc));
-       bzero(xaioc, sizeof(*xaioc));
-       snprintf(xaioc->cl_label, sizeof(xaioc->cl_label),
-                "%s", xdisk->cl_label);
-       snprintf(xaioc->fs_label, sizeof(xaioc->fs_label),
-                "X-%s", xdisk->fs_label);
-       xaioc->bytes = xdisk->block.bytes;
-       xaioc->blksize = xdisk->block.blksize;
-       xaioc->fd = pipefds[0];
-
-       pthread_create(&thread, NULL, xdisk_attach_tmpthread, xaioc);
-}
-
-static
-void *
-xdisk_attach_tmpthread(void *data)
-{
-       struct xdisk_attach_ioctl *xaioc = data;
-       int fd;
-
-       pthread_detach(pthread_self());
+       bzero(&xaioc, sizeof(xaioc));
+       xaioc.fd = pipefds[0];
+       if (ioctl(xfd, XDISKIOCATTACH, &xaioc) < 0)
+               error = errno;  /* save it before close() can clobber it */
+       else
+               error = 0;
+       close(pipefds[0]);
+       close(xfd);
 
-       fd = open("/dev/xdisk", O_RDWR, 0600);
-       if (fd < 0) {
-               fprintf(stderr, "xdisk_reconnect: Unable to open /dev/xdisk\n");
-       }
-       if (ioctl(fd, XDISKIOCATTACH, xaioc) < 0) {
-               fprintf(stderr, "reconnect %s: xdisk attach failed\n",
-                       xaioc->cl_label);
+       if (error != 0) {
+               fprintf(stderr,
+                       "xdisk_connect: cannot attach %s\n",
+                       strerror(error));
+               return;
        }
-       close(xaioc->fd);
-       close(fd);
-       return (NULL);
-}
-
-static
-void
-xdisk_disconnect(void *handle)
-{
-       struct service_node_opaque *info = handle;
-
-       assert(info->servicing == 1);
-
-       pthread_mutex_lock(&diskmtx);
-       info->servicing = 0;
-       if (info->attached == 0)
-               free(info);
-       pthread_mutex_unlock(&diskmtx);
 }
index a04c9b3..ebdef0a 100644 (file)
@@ -53,6 +53,7 @@
 #include <dirent.h>
 
 #include <netinet/in.h>
+#include <netinet/ip.h>
 #include <netinet/tcp.h>
 #include <arpa/inet.h>
 #include <netdb.h>
index b374032..1e171ce 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
+ * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
  *
  * This code is derived from software contributed to The DragonFly Project
  * by Matthew Dillon <dillon@dragonflybsd.org>
  *
  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
  * devices.  These devices look like raw disks to the system.
- *
- * TODO:
- *     Handle circuit disconnects, leave bio's pending
- *     Restart bio's on circuit reconnect.
  */
 #include <sys/param.h>
 #include <sys/systm.h>
@@ -60,6 +56,7 @@
 #include <sys/sysctl.h>
 #include <sys/proc.h>
 #include <sys/queue.h>
+#include <sys/tree.h>
 #include <sys/udev.h>
 #include <sys/uuid.h>
 #include <sys/kern_syscall.h>
 #include <sys/thread2.h>
 
 struct xa_softc;
+struct xa_softc_tree;
+RB_HEAD(xa_softc_tree, xa_softc);
+RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
 
+/*
+ * Track a BIO tag
+ */
 struct xa_tag {
        TAILQ_ENTRY(xa_tag) entry;
-       struct xa_softc *xa;
+       struct xa_softc *sc;
        dmsg_blk_error_t status;
        kdmsg_state_t   *state;
-       kdmsg_circuit_t *circ;
        struct bio      *bio;
-       int             running;        /* transaction running */
-       int             waitseq;        /* streaming reply */
-       int             done;           /* final (transaction closed) */
+       int             waiting;
+       int             async;
+       int             done;
 };
 
 typedef struct xa_tag  xa_tag_t;
 
+/*
+ * Track devices.
+ */
 struct xa_softc {
-       TAILQ_ENTRY(xa_softc) entry;
+       struct kdmsg_state_list spanq;
+       RB_ENTRY(xa_softc) rbnode;
        cdev_t          dev;
-       kdmsg_iocom_t   iocom;
-       struct xdisk_attach_ioctl xaioc;
        struct disk_info info;
        struct disk     disk;
        uuid_t          pfs_fsid;
        int             unit;
-       int             serializing;
-       int             attached;
        int             opencnt;
+       int             spancnt;
        uint64_t        keyid;
-       xa_tag_t        *opentag;
-       TAILQ_HEAD(, bio) bioq;
-       TAILQ_HEAD(, xa_tag) tag_freeq;
-       TAILQ_HEAD(, xa_tag) tag_pendq;
-       TAILQ_HEAD(, kdmsg_circuit) circq;
+       int             serializing;
+       int             last_error;
+       char            cl_label[64];   /* from LNK_SPAN cl_label (host/dev) */
+       char            fs_label[64];   /* from LNK_SPAN fs_label (serno str) */
+       xa_tag_t        *open_tag;
+       TAILQ_HEAD(, bio) bioq;         /* pending BIOs */
+       TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
+       TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
        struct lwkt_token tok;
 };
 
 typedef struct xa_softc        xa_softc_t;
 
+struct xa_iocom {
+       TAILQ_ENTRY(xa_iocom) entry;
+       kdmsg_iocom_t   iocom;
+       xa_softc_t      dummysc;
+};
+
+typedef struct xa_iocom xa_iocom_t;
+
+static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
+RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
+static struct xa_softc_tree xa_device_tree;
+
 #define MAXTAGS                64      /* no real limit */
 
 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
-static void xa_exit(kdmsg_iocom_t *iocom);
-static void xa_terminate_check(struct xa_softc *xa);
-static int xa_rcvdmsg(kdmsg_msg_t *msg);
-static void xa_autodmsg(kdmsg_msg_t *msg);
-
-static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio);
-static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg);
-static uint32_t xa_wait(xa_tag_t *tag, int seq);
+static void xaio_exit(kdmsg_iocom_t *iocom);
+static int xaio_rcvdmsg(kdmsg_msg_t *msg);
+
+static void xa_terminate_check(struct xa_softc *sc);
+
+static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
+static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
 static void xa_done(xa_tag_t *tag, int wasbio);
+static void xa_release(xa_tag_t *tag, int wasbio);
+static uint32_t xa_wait(xa_tag_t *tag);
 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
-static void xa_restart_deferred(xa_softc_t *xa);
+static void xa_restart_deferred(xa_softc_t *sc);
 
 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
 
@@ -165,7 +184,7 @@ static struct dev_ops xa_ops = {
 static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
 static int xdisk_opencount;
 static cdev_t xdisk_dev;
-static TAILQ_HEAD(, xa_softc) xa_queue;
+static TAILQ_HEAD(, xa_iocom) xaiocomq;
 
 /*
  * Module initialization
@@ -175,13 +194,14 @@ xdisk_modevent(module_t mod, int type, void *data)
 {
        switch (type) {
        case MOD_LOAD:
-               TAILQ_INIT(&xa_queue);
+               TAILQ_INIT(&xaiocomq);
+               RB_INIT(&xa_device_tree);
                xdisk_dev = make_dev(&xdisk_ops, 0,
                                     UID_ROOT, GID_WHEEL, 0600, "xdisk");
                break;
        case MOD_UNLOAD:
        case MOD_SHUTDOWN:
-               if (xdisk_opencount || TAILQ_FIRST(&xa_queue))
+               if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
                        return (EBUSY);
                if (xdisk_dev) {
                        destroy_dev(xdisk_dev);
@@ -198,6 +218,19 @@ xdisk_modevent(module_t mod, int type, void *data)
 
 DEV_MODULE(xdisk, xdisk_modevent, 0);
 
+/*
+ * RB-tree comparator for xa_device_tree: orders devices by fs_label.
+ *
+ * RB_INSERT/RB_FIND require a consistent three-way result (<0, 0, >0);
+ * bcmp() only promises zero vs. non-zero, so use strncmp() bounded by
+ * the fixed-size label buffer.
+ */
+static int
+xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
+{
+       return(strncmp(sc1->fs_label, sc2->fs_label, sizeof(sc1->fs_label)));
+}
+
 /*
  * Control device
  */
@@ -245,26 +271,15 @@ xdisk_ioctl(struct dev_ioctl_args *ap)
 static int
 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
 {
-       xa_softc_t *xa;
-       xa_tag_t *tag;
+       xa_iocom_t *xaio;
        struct file *fp;
-       int unit;
-       int n;
-       char devname[64];
-       cdev_t dev;
 
        /*
         * Normalize ioctl params
         */
        fp = holdfp(curproc->p_fd, xaioc->fd, -1);
        if (fp == NULL)
                return EINVAL;
-       if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0)
-               return EINVAL;
-       if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0)
-               return EINVAL;
-       if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE)
-               return EINVAL;
 
        /*
         * See if the serial number is already present.  If we are
@@ -273,164 +290,52 @@ xdisk_attach(struct xdisk_attach_ioctl *xaioc)
         * retry.
         */
        lwkt_gettoken(&xdisk_token);
-again:
-       TAILQ_FOREACH(xa, &xa_queue, entry) {
-               if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
-                          xaioc->fs_label) == 0) {
-                       if (xa->serializing) {
-                               tsleep(xa, 0, "xadelay", hz / 10);
-                               goto again;
-                       }
-                       xa->serializing = 1;
-                       kdmsg_iocom_uninit(&xa->iocom);
-                       break;
-               }
-       }
 
-       /*
-        * Create a new xa if not already present
-        */
-       if (xa == NULL) {
-               unit = 0;
-               for (;;) {
-                       TAILQ_FOREACH(xa, &xa_queue, entry) {
-                               if (xa->unit == unit)
-                                       break;
-                       }
-                       if (xa == NULL)
-                               break;
-                       ++unit;
-               }
-               xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
-               xa->unit = unit;
-               xa->serializing = 1;
-               lwkt_token_init(&xa->tok, "xa");
-               TAILQ_INIT(&xa->circq);
-               TAILQ_INIT(&xa->bioq);
-               TAILQ_INIT(&xa->tag_freeq);
-               TAILQ_INIT(&xa->tag_pendq);
-               for (n = 0; n < MAXTAGS; ++n) {
-                       tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO);
-                       tag->xa = xa;
-                       TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
-               }
-               TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
-       } else {
-               unit = xa->unit;
-       }
+       /*
+        * Each attach gets its own messaging connection (iocom) on the
+        * supplied descriptor.  It is linked onto xaiocomq below and
+        * unlinked by xaio_exit(), which is installed as its exit_func.
+        */
+       xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
+       kdmsg_iocom_init(&xaio->iocom, xaio,
+                        KDMSG_IOCOMF_AUTOCONN,
+                        M_XDISK, xaio_rcvdmsg);
+       xaio->iocom.exit_func = xaio_exit;
 
-       /*
-        * (xa) is now serializing.
-        */
-       xa->xaioc = *xaioc;
-       xa->attached = 1;
-       lwkt_reltoken(&xdisk_token);
-
-       /*
-        * Create device
-        */
-       if (xa->dev == NULL) {
-               dev = disk_create(unit, &xa->disk, &xa_ops);
-               dev->si_drv1 = xa;
-               xa->dev = dev;
-       }
-
-       xa->info.d_media_blksize = xaioc->blksize;
-       xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize;
-       xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
-       xa->info.d_secpertrack = 32;
-       xa->info.d_nheads = 64;
-       xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads;
-       xa->info.d_ncylinders = 0;
-       if (xa->xaioc.fs_label[0])
-               xa->info.d_serialno = xa->xaioc.fs_label;
-
-       /*
-        * Set up messaging connection
-        */
-       ksnprintf(devname, sizeof(devname), "xa%d", unit);
-       kdmsg_iocom_init(&xa->iocom, xa,
-                        KDMSG_IOCOMF_AUTOCONN |
-                        KDMSG_IOCOMF_AUTORXSPAN |
-                        KDMSG_IOCOMF_AUTOTXSPAN |
-                        KDMSG_IOCOMF_AUTORXCIRC |
-                        KDMSG_IOCOMF_AUTOTXCIRC,
-                        M_XDISK, xa_rcvdmsg);
-       xa->iocom.exit_func = xa_exit;
-
-       kdmsg_iocom_reconnect(&xa->iocom, fp, devname);
+       kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");
 
        /*
         * Setup our LNK_CONN advertisement for autoinitiate.
         *
         * Our filter is setup to only accept PEER_BLOCK/SERVER
         * advertisements.
-        */
-       xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
-       xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
-       xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
-       xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
-       xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
-       ksnprintf(xa->iocom.auto_lnk_conn.cl_label,
-                 sizeof(xa->iocom.auto_lnk_conn.cl_label),
-                 "%s", xaioc->cl_label);
-
-       /*
+        *
         * We need a unique pfs_fsid to avoid confusion.
-        * We supply a rendezvous fs_label using the serial number.
         */
-       kern_uuidgen(&xa->pfs_fsid, 1);
-       xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid;
-       ksnprintf(xa->iocom.auto_lnk_conn.fs_label,
-                 sizeof(xa->iocom.auto_lnk_conn.fs_label),
-                 "%s", xaioc->fs_label);
+       xaio->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
+       xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
+       xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
+       xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
+       xaio->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
+       ksnprintf(xaio->iocom.auto_lnk_conn.fs_label,
+                 sizeof(xaio->iocom.auto_lnk_conn.fs_label),
+                 "xdisk");
+       kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1);
 
        /*
         * Setup our LNK_SPAN advertisement for autoinitiate
         */
-       xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT;
-       xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1;
-       xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK;
-       ksnprintf(xa->iocom.auto_lnk_span.cl_label,
-                 sizeof(xa->iocom.auto_lnk_span.cl_label),
-                 "%s", xa->xaioc.cl_label);
-
-       kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg);
-       disk_setdiskinfo_sync(&xa->disk, &xa->info);
-
-       lwkt_gettoken(&xdisk_token);
-       xa->serializing = 0;
-       xa_terminate_check(xa);
+       TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
+       kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
        lwkt_reltoken(&xdisk_token);
 
-       return(0);
+       return 0;
 }
 
 static int
 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
 {
-       struct xa_softc *xa;
-
-       lwkt_gettoken(&xdisk_token);
-       for (;;) {
-               TAILQ_FOREACH(xa, &xa_queue, entry) {
-                       if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
-                                  xaioc->fs_label) == 0) {
-                               break;
-                       }
-               }
-               if (xa == NULL || xa->serializing == 0) {
-                       xa->serializing = 1;
-                       break;
-               }
-               tsleep(xa, 0, "xadet", hz / 10);
-       }
-       if (xa) {
-               kdmsg_iocom_uninit(&xa->iocom);
-               xa->serializing = 0;
-       }
-       lwkt_reltoken(&xdisk_token);
-       return(0);
+       return EINVAL;  /* unsupported: xaioc is ignored, always fails */
 }
 
 /*
@@ -438,170 +339,165 @@ xdisk_detach(struct xdisk_attach_ioctl *xaioc)
  */
 static
 void
-xa_exit(kdmsg_iocom_t *iocom)
+xaio_exit(kdmsg_iocom_t *iocom)
 {
-       struct xa_softc *xa = iocom->handle;
+       xa_iocom_t *xaio = iocom->handle;
 
-       lwkt_gettoken(&xa->tok);
+       kprintf("xdisk_detach -xaio_exit\n");
        lwkt_gettoken(&xdisk_token);
-
-       /*
-        * We must wait for any I/O's to complete to ensure that all
-        * state structure references are cleaned up before returning.
-        */
-       xa->attached = -1;      /* force deferral or failure */
-       while (TAILQ_FIRST(&xa->tag_pendq)) {
-               tsleep(xa, 0, "xabiow", hz / 10);
-       }
-
-       /*
-        * All serializing code checks for de-initialization so only
-        * do it if we aren't already serializing.
-        */
-       if (xa->serializing == 0) {
-               xa->serializing = 1;
-               kdmsg_iocom_uninit(iocom);
-               xa->serializing = 0;
-       }
-
-       /*
-        * If the drive is not in use and no longer attach it can be
-        * destroyed.
-        */
-       xa->attached = 0;
-       xa_terminate_check(xa);
+       TAILQ_REMOVE(&xaiocomq, xaio, entry);
        lwkt_reltoken(&xdisk_token);
-       lwkt_reltoken(&xa->tok);
-}
-
-/*
- * Determine if we can destroy the xa_softc.
- *
- * Called with xdisk_token held.
- */
-static
-void
-xa_terminate_check(struct xa_softc *xa)
-{
-       xa_tag_t *tag;
-       struct bio *bio;
 
-       if (xa->opencnt || xa->attached || xa->serializing)
-               return;
-       xa->serializing = 1;
-       kdmsg_iocom_uninit(&xa->iocom);
-
-       /*
-        * When destroying an xa make sure all pending I/O (typically
-        * from the disk probe) is done.
-        *
-        * XXX what about new I/O initiated prior to disk_destroy().
-        */
-       while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) {
-               TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
-               if ((bio = tag->bio) != NULL) {
-                       tag->bio = NULL;
-                       bio->bio_buf->b_error = ENXIO;
-                       bio->bio_buf->b_flags |= B_ERROR;
-                       biodone(bio);
-               }
-               TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
-       }
-       if (xa->dev) {
-               disk_destroy(&xa->disk);
-               xa->dev->si_drv1 = NULL;
-               xa->dev = NULL;
-       }
-       KKASSERT(xa->opencnt == 0 && xa->attached == 0);
-       while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
-               TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
-               tag->xa = NULL;
-               kfree(tag, M_XDISK);
-       }
-       KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
-       TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
-       kfree(xa, M_XDISK);
+       kfree(xaio, M_XDISK);
 }
 
 /*
- * Shim to catch and record virtual circuit events.
+ * Called from iocom core to handle messages that the iocom core does not
+ * handle itself and for which a state function callback has not yet been
+ * established.
+ *
+ * We primarily care about LNK_SPAN transactions here.
  */
-static void
-xa_autodmsg(kdmsg_msg_t *msg)
+static int
+xaio_rcvdmsg(kdmsg_msg_t *msg)
 {
-       xa_softc_t *xa = msg->iocom->handle;
+       kdmsg_state_t   *state = msg->state;
+       xa_iocom_t      *xaio = state->iocom->handle;
+       xa_softc_t      *sc;
 
-       kdmsg_circuit_t *circ;
-       kdmsg_circuit_t *cscan;
-       uint32_t xcmd;
-
-       /*
-        * Because this is just a shim we don't have a state callback for
-        * the transactions we are sniffing, so make things easier by
-        * calculating the original command along with the current message's
-        * flags.  This is because transactions are made up of numerous
-        * messages and only the first typically specifies the actual command.
-        */
-       if (msg->state) {
-               xcmd = msg->state->icmd |
-                      (msg->any.head.cmd & (DMSGF_CREATE |
-                                            DMSGF_DELETE |
-                                            DMSGF_REPLY));
-       } else {
-               xcmd = msg->any.head.cmd;
-       }
+       kprintf("xdisk_rcvdmsg %08x\n", msg->any.head.cmd);
+       lwkt_gettoken(&xdisk_token);
 
-       /*
-        * Add or remove a circuit, sorted by weight (lower numbers are
-        * better).
-        */
-       switch(xcmd) {
-       case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
+       switch(msg->tcmd) {
+       case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
                /*
-                * Track established circuits
+                * A LNK_SPAN transaction which is opened and closed
+                * degenerately is not useful to us, just ignore it.
                 */
-               circ = msg->state->any.circ;
-               lwkt_gettoken(&xa->tok);
-               if (circ->recorded == 0) {
-                       TAILQ_FOREACH(cscan, &xa->circq, entry) {
-                               if (circ->weight < cscan->weight)
-                                       break;
+               kdmsg_msg_reply(msg, 0);
+               break;
+       case DMSG_LNK_SPAN | DMSGF_CREATE:
+               /*
+                * Manage the tracking node for the remote LNK_SPAN.
+                *
+                * Return a streaming result, leaving the transaction open
+                * in both directions to allow sub-transactions.
+                */
+               bcopy(msg->any.lnk_span.cl_label, xaio->dummysc.cl_label,
+                     sizeof(xaio->dummysc.cl_label));
+               xaio->dummysc.cl_label[sizeof(xaio->dummysc.cl_label) - 1] = 0;
+
+               bcopy(msg->any.lnk_span.fs_label, xaio->dummysc.fs_label,
+                     sizeof(xaio->dummysc.fs_label));
+               xaio->dummysc.fs_label[sizeof(xaio->dummysc.fs_label) - 1] = 0;
+
+               kprintf("xdisk: %s LNK_SPAN create\n",
+                       msg->any.lnk_span.fs_label);
+
+               sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
+               if (sc == NULL) {
+                       xa_softc_t *sctmp;
+                       xa_tag_t *tag;
+                       cdev_t dev;
+                       int unit;
+                       int n;
+
+                       sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
+                       bcopy(msg->any.lnk_span.cl_label, sc->cl_label,
+                             sizeof(sc->cl_label));
+                       sc->cl_label[sizeof(sc->cl_label) - 1] = 0;
+                       bcopy(msg->any.lnk_span.fs_label, sc->fs_label,
+                             sizeof(sc->fs_label));
+                       sc->fs_label[sizeof(sc->fs_label) - 1] = 0;
+
+                       /* XXX FIXME O(N^2) */
+                       unit = -1;
+                       do {
+                               ++unit;
+                               RB_FOREACH(sctmp, xa_softc_tree,
+                                          &xa_device_tree) {
+                                       if (sctmp->unit == unit)
+                                               break;
+                               }
+                       } while (sctmp);
+
+                       sc->unit = unit;
+                       sc->serializing = 1;
+                       sc->spancnt = 1;
+                       lwkt_token_init(&sc->tok, "xa");
+                       TAILQ_INIT(&sc->spanq);
+                       TAILQ_INIT(&sc->bioq);
+                       TAILQ_INIT(&sc->tag_freeq);
+                       TAILQ_INIT(&sc->tag_pendq);
+                       RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
+                       TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
+                       msg->state->any.xa_sc = sc;
+
+                       /*
+                        * Setup block device
+                        */
+                       for (n = 0; n < MAXTAGS; ++n) {
+                               tag = kmalloc(sizeof(*tag),
+                                             M_XDISK, M_WAITOK|M_ZERO);
+                               tag->sc = sc;
+                               TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
+                       }
+
+                       if (sc->dev == NULL) {
+                               dev = disk_create(unit, &sc->disk, &xa_ops);
+                               dev->si_drv1 = sc;
+                               sc->dev = dev;
                        }
-                       if (cscan)
-                               TAILQ_INSERT_BEFORE(cscan, circ, entry);
-                       else
-                               TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
-                       circ->recorded = 1;
-               }
 
+                       sc->info.d_media_blksize =
+                               msg->any.lnk_span.media.block.blksize;
+                       if (sc->info.d_media_blksize <= 0)
+                               sc->info.d_media_blksize = 1;
+                       sc->info.d_media_blocks =
+                               msg->any.lnk_span.media.block.bytes /
+                               sc->info.d_media_blksize;
+                       sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
+                       sc->info.d_secpertrack = 32;
+                       sc->info.d_nheads = 64;
+                       sc->info.d_secpercyl = sc->info.d_secpertrack *
+                                              sc->info.d_nheads;
+                       sc->info.d_ncylinders = 0;
+                       if (sc->fs_label[0])
+                               sc->info.d_serialno = sc->fs_label;
+                       disk_setdiskinfo_sync(&sc->disk, &sc->info);
+                       xa_restart_deferred(sc);        /* eats serializing */
+               } else {
+                       ++sc->spancnt;
+                       TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
+                       msg->state->any.xa_sc = sc;
+                       if (sc->serializing == 0 && sc->open_tag == NULL) {
+                               sc->serializing = 1;
+                               xa_restart_deferred(sc); /* eats serializing */
+                       }
+               }
+               kdmsg_msg_result(msg, 0);
+               break;
+       case DMSG_LNK_SPAN | DMSGF_DELETE:
+       case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
                /*
-                * Restart any deferred I/O.
+                * Manage the tracking node for the remote LNK_SPAN.
+                *
+                * Return a final result, closing our end of the transaction.
                 */
-               xa_restart_deferred(xa);
-               lwkt_reltoken(&xa->tok);
+               sc = msg->state->any.xa_sc;
+               kprintf("xdisk: %s LNK_SPAN terminate\n", sc->fs_label);
+               msg->state->any.xa_sc = NULL;
+               TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
+               --sc->spancnt;
+               xa_terminate_check(sc);
+               kdmsg_msg_reply(msg, 0);
                break;
-       case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
+       case DMSG_LNK_SPAN | DMSGF_REPLY:
                /*
-                * Losing virtual circuit.  Remove the circ from contention.
+                * Ignore unimplemented streaming replies on our LNK_SPAN
+                * transaction.
                 */
-               circ = msg->state->any.circ;
-               lwkt_gettoken(&xa->tok);
-               if (circ->recorded) {
-                       TAILQ_REMOVE(&xa->circq, circ, entry);
-                       circ->recorded = 0;
-               }
-               xa_restart_deferred(xa);
-               lwkt_reltoken(&xa->tok);
-               break;
-       default:
                break;
-       }
-}
-
-static int
-xa_rcvdmsg(kdmsg_msg_t *msg)
-{
-       switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
        case DMSG_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm).
@@ -614,24 +510,26 @@ xa_rcvdmsg(kdmsg_msg_t *msg)
                break;
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                /*
-                * Receive one or more replies to a shell command that we
-                * sent.
+                * Receive one or more replies to a shell command
+                * that we sent.  Just dump it to the console.
                 *
-                * This is a one-way packet but if not (e.g. if part of
-                * a streaming transaction), we will have already closed
-                * our end.
+                * This is a one-way packet but if not (e.g. if
+                * part of a streaming transaction), we will have
+                * already closed our end.
                 */
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
-                       kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
+                       kprintf("xdisk: DEBUGMSG: %s\n",
+                               msg->aux_data);
                }
                break;
        default:
                /*
-                * Unsupported LNK message received.  We only need to
-                * reply if it's a transaction in order to close our end.
-                * Ignore any one-way messages are any further messages
-                * associated with the transaction.
+                * Unsupported one-way message, streaming message, or
+                * transaction.
+                *
+                * Terminate any unsupported transactions with an error
+                * and ignore any unsupported streaming messages.
                 *
                 * NOTE: This case also includes DMSG_LNK_ERROR messages
                 *       which might be one-way, replying to those would
@@ -641,9 +539,51 @@ xa_rcvdmsg(kdmsg_msg_t *msg)
                        kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        }
-       return(0);
+       lwkt_reltoken(&xdisk_token);
+
+       return 0;
 }
 
+/*
+ * Determine if we can destroy the xa_softc.
+ *
+ * Called with xdisk_token held.
+ */
+static
+void
+xa_terminate_check(struct xa_softc *sc)
+{
+       xa_tag_t *tag;
+
+       /*
+        * Determine if we can destroy the softc.
+        */
+       kprintf("xdisk: terminate check xa%d (%d,%d,%d)\n",
+               sc->unit,
+               sc->opencnt, sc->serializing, sc->spancnt);
+
+       if (sc->opencnt || sc->serializing || sc->spancnt)
+               return;
+       sc->serializing = 1;
+       KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
+
+       RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
+
+       if (sc->dev) {
+               disk_destroy(&sc->disk);
+               sc->dev->si_drv1 = NULL;
+               sc->dev = NULL;
+       }
+       KKASSERT(sc->opencnt == 0);
+       KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
+
+       while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
+               TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
+               tag->sc = NULL;
+               kfree(tag, M_XDISK);
+       }
+       kfree(sc, M_XDISK);
+}
 
 /************************************************************************
  *                        XA DEVICE INTERFACE                          *
@@ -653,9 +593,7 @@ static int
 xa_open(struct dev_open_args *ap)
 {
        cdev_t dev = ap->a_head.a_dev;
-       xa_softc_t *xa;
-       xa_tag_t *tag;
-       kdmsg_msg_t *msg;
+       xa_softc_t *sc;
        int error;
 
        dev->si_bsize_phys = 512;
@@ -667,60 +605,52 @@ xa_open(struct dev_open_args *ap)
         */
        lwkt_gettoken(&xdisk_token);
 again:
-       xa = dev->si_drv1;
-       if (xa == NULL) {
+       sc = dev->si_drv1;
+       if (sc == NULL) {
                lwkt_reltoken(&xdisk_token);
                return ENXIO;   /* raced destruction */
        }
-       if (xa->serializing) {
-               tsleep(xa, 0, "xarace", hz / 10);
+       if (sc->serializing) {
+               tsleep(sc, 0, "xarace", hz / 10);
                goto again;
        }
-       if (xa->attached == 0) {
-               lwkt_reltoken(&xdisk_token);
-               return ENXIO;   /* raced destruction */
-       }
+       sc->serializing = 1;
 
        /*
         * Serialize initial open
         */
-       if (xa->opencnt++ > 0) {
+       if (sc->opencnt++ > 0) {
                lwkt_reltoken(&xdisk_token);
                return(0);
        }
-       xa->serializing = 1;
        lwkt_reltoken(&xdisk_token);
 
-       tag = xa_setup_cmd(xa, NULL);
-       if (tag == NULL) {
-               lwkt_gettoken(&xdisk_token);
-               KKASSERT(xa->opencnt > 0);
-               --xa->opencnt;
-               xa->serializing = 0;
-               xa_terminate_check(xa);
-               lwkt_reltoken(&xdisk_token);
-               return(ENXIO);
-       }
-       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
-                             DMSG_BLK_OPEN | DMSGF_CREATE,
-                             xa_sync_completion, tag);
-       msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
-       xa_start(tag, msg);
-       if (xa_wait(tag, 0) == 0) {
-               xa->keyid = tag->status.keyid;
-               xa->opentag = tag;      /* leave tag open */
-               xa->serializing = 0;
-               error = 0;
+       /*
+        * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
+        */
+       if (sc->open_tag == NULL) {
+               xa_restart_deferred(sc); /* eats serializing */
        } else {
-               xa_done(tag, 0);
-               lwkt_gettoken(&xdisk_token);
-               KKASSERT(xa->opencnt > 0);
-               --xa->opencnt;
-               xa->serializing = 0;
-               xa_terminate_check(xa);
-               lwkt_reltoken(&xdisk_token);
-               error = ENXIO;
+               sc->serializing = 0;
+               wakeup(sc);
        }
+
+       /*
+        * Wait for completion of the BLK_OPEN
+        */
+       lwkt_gettoken(&xdisk_token);
+       while (sc->serializing)
+               tsleep(sc, 0, "xaopen", hz);
+
+       error = sc->last_error;
+       if (error) {
+               KKASSERT(sc->opencnt > 0);
+               --sc->opencnt;
+               xa_terminate_check(sc);
+               sc = NULL;      /* sc may be invalid now */
+       }
+       lwkt_reltoken(&xdisk_token);
+
        return (error);
 }
 
@@ -728,27 +658,29 @@ static int
 xa_close(struct dev_close_args *ap)
 {
        cdev_t dev = ap->a_head.a_dev;
-       xa_softc_t *xa;
+       xa_softc_t *sc;
        xa_tag_t *tag;
 
-       xa = dev->si_drv1;
-       if (xa == NULL)
+       sc = dev->si_drv1;
+       if (sc == NULL)
                return ENXIO;   /* raced destruction */
+       lwkt_gettoken(&xdisk_token);
+       lwkt_gettoken(&sc->tok);
 
-       lwkt_gettoken(&xa->tok);
-       if ((tag = xa->opentag) != NULL) {
-               xa->opentag = NULL;
-               kdmsg_state_reply(tag->state, 0);
-               while (tag->done == 0)
-                       xa_wait(tag, tag->waitseq);
-               xa_done(tag, 0);
+       /*
+        * NOTE: Clearing open_tag allows a concurrent open to re-open
+        *       the device and prevents autonomous completion of the tag.
+        */
+       if (sc->opencnt == 1 && sc->open_tag) {
+               tag = sc->open_tag;
+               sc->open_tag = NULL;
+               kdmsg_state_reply(tag->state, 0);       /* close our side */
+               xa_wait(tag);                           /* wait on remote */
        }
-       lwkt_reltoken(&xa->tok);
-
-       lwkt_gettoken(&xdisk_token);
-       KKASSERT(xa->opencnt > 0);
-       --xa->opencnt;
-       xa_terminate_check(xa);
+       lwkt_reltoken(&sc->tok);
+       KKASSERT(sc->opencnt > 0);
+       --sc->opencnt;
+       xa_terminate_check(sc);
        lwkt_reltoken(&xdisk_token);
 
        return(0);
@@ -757,7 +689,7 @@ xa_close(struct dev_close_args *ap)
 static int
 xa_strategy(struct dev_strategy_args *ap)
 {
-       xa_softc_t *xa = ap->a_head.a_dev->si_drv1;
+       xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
        xa_tag_t *tag;
        struct bio *bio = ap->a_bio;
 
@@ -766,16 +698,16 @@ xa_strategy(struct dev_strategy_args *ap)
         * only if the device is not open.  That is, we allow the disk
         * probe code prior to mount to fail.
         */
-       if (xa->attached == 0 && xa->opencnt == 0) {
+       if (sc->opencnt == 0) {
                bio->bio_buf->b_error = ENXIO;
                bio->bio_buf->b_flags |= B_ERROR;
                biodone(bio);
                return(0);
        }
 
-       tag = xa_setup_cmd(xa, bio);
+       tag = xa_setup_cmd(sc, bio);
        if (tag)
-               xa_start(tag, NULL);
+               xa_start(tag, NULL, 1);
        return(0);
 }
 
@@ -788,11 +720,11 @@ xa_ioctl(struct dev_ioctl_args *ap)
 static int
 xa_size(struct dev_psize_args *ap)
 {
-       struct xa_softc *xa;
+       struct xa_softc *sc;
 
-       if ((xa = ap->a_head.a_dev->si_drv1) == NULL)
+       if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
                return (ENXIO);
-       ap->a_result = xa->info.d_media_blocks;
+       ap->a_result = sc->info.d_media_blocks;
        return (0);
 }
 
@@ -803,44 +735,38 @@ xa_size(struct dev_psize_args *ap)
  * Implement tag/msg setup and related functions.
  */
 static xa_tag_t *
-xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
+xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
 {
-       kdmsg_circuit_t *circ;
        xa_tag_t *tag;
 
        /*
         * Only get a tag if we have a valid virtual circuit to the server.
         */
-       lwkt_gettoken(&xa->tok);
-       TAILQ_FOREACH(circ, &xa->circq, entry) {
-               if (circ->lost == 0)
-                       break;
-       }
-       if (circ == NULL || xa->attached <= 0) {
-               tag = NULL;
-       } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
-               TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
+       lwkt_gettoken(&sc->tok);
+       if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
+               TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
                tag->bio = bio;
-               tag->circ = circ;
-               kdmsg_circ_hold(circ);
-               TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
+               TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
        }
 
        /*
         * If we can't dispatch now and this is a bio, queue it for later.
         */
        if (tag == NULL && bio) {
-               TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act);
+               TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
        }
-       lwkt_reltoken(&xa->tok);
+       lwkt_reltoken(&sc->tok);
 
        return (tag);
 }
 
 static void
-xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
+xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
 {
-       xa_softc_t *xa = tag->xa;
+       xa_softc_t *sc = tag->sc;
+
+       tag->done = 0;
+       tag->async = async;
 
        if (msg == NULL) {
                struct bio *bio;
@@ -852,40 +778,40 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
 
                switch(bp->b_cmd) {
                case BUF_CMD_READ:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
+                       msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_READ |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
-                       msg->any.blk_read.keyid = xa->keyid;
+                       msg->any.blk_read.keyid = sc->keyid;
                        msg->any.blk_read.offset = bio->bio_offset;
                        msg->any.blk_read.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_WRITE:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
+                       msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_WRITE |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
-                       msg->any.blk_write.keyid = xa->keyid;
+                       msg->any.blk_write.keyid = sc->keyid;
                        msg->any.blk_write.offset = bio->bio_offset;
                        msg->any.blk_write.bytes = bp->b_bcount;
                        msg->aux_data = bp->b_data;
                        msg->aux_size = bp->b_bcount;
                        break;
                case BUF_CMD_FLUSH:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
+                       msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_FLUSH |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
-                       msg->any.blk_flush.keyid = xa->keyid;
+                       msg->any.blk_flush.keyid = sc->keyid;
                        msg->any.blk_flush.offset = bio->bio_offset;
                        msg->any.blk_flush.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_FREEBLKS:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
+                       msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_FREEBLKS |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
-                       msg->any.blk_freeblks.keyid = xa->keyid;
+                       msg->any.blk_freeblks.keyid = sc->keyid;
                        msg->any.blk_freeblks.offset = bio->bio_offset;
                        msg->any.blk_freeblks.bytes = bp->b_bcount;
                        break;
@@ -898,87 +824,145 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
                }
        }
 
-       tag->done = 0;
-       tag->waitseq = 0;
        if (msg) {
                tag->state = msg->state;
                kdmsg_msg_write(msg);
        } else {
+               tag->status.head.error = DMSG_ERR_IO;
                xa_done(tag, 1);
        }
 }
 
 static uint32_t
-xa_wait(xa_tag_t *tag, int seq)
+xa_wait(xa_tag_t *tag)
 {
-       xa_softc_t *xa = tag->xa;
+       xa_softc_t *sc = tag->sc;
+       uint32_t error;
+
+       kprintf("xdisk: xa_wait  %p\n", tag);
 
-       lwkt_gettoken(&xa->tok);
-       while (tag->waitseq == seq)
+       lwkt_gettoken(&sc->tok);
+       tag->waiting = 1;
+       while (tag->done == 0)
                tsleep(tag, 0, "xawait", 0);
-       lwkt_reltoken(&xa->tok);
-       return (tag->status.head.error);
+       lwkt_reltoken(&sc->tok);
+       error = tag->status.head.error;
+       tag->waiting = 0;
+       xa_release(tag, 0);
+
+       return error;
 }
 
 static void
 xa_done(xa_tag_t *tag, int wasbio)
 {
-       xa_softc_t *xa = tag->xa;
-       struct bio *bio;
-
        KKASSERT(tag->bio == NULL);
-       tag->done = 1;
+
        tag->state = NULL;
+       tag->done = 1;
+       if (tag->waiting)
+               wakeup(tag);
+       if (tag->async)
+               xa_release(tag, wasbio);
+}
 
-       lwkt_gettoken(&xa->tok);
-       if (wasbio && (bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
-               TAILQ_REMOVE(&xa->bioq, bio, bio_act);
+static
+void
+xa_release(xa_tag_t *tag, int wasbio)
+{
+       xa_softc_t *sc = tag->sc;
+       struct bio *bio;
+
+       lwkt_gettoken(&sc->tok);
+       if (wasbio && (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
+               TAILQ_REMOVE(&sc->bioq, bio, bio_act);
                tag->bio = bio;
-               lwkt_reltoken(&xa->tok);
-               xa_start(tag, NULL);
+               lwkt_reltoken(&sc->tok);
+               xa_start(tag, NULL, 1);
        } else {
-               if (tag->circ) {
-                       kdmsg_circ_drop(tag->circ);
-                       tag->circ = NULL;
-               }
-               TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
-               TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
-               lwkt_reltoken(&xa->tok);
+               TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
+               TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
+               lwkt_reltoken(&sc->tok);
        }
 }
 
+/*
+ * Handle messages under the BLKOPEN transaction.
+ */
 static int
 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
 {
        xa_tag_t *tag = state->any.any;
-       xa_softc_t *xa = tag->xa;
+       xa_softc_t *sc = tag->sc;
+       struct bio *bio;
 
-       switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
-       case DMSG_LNK_ERROR | DMSGF_REPLY:
-               bzero(&tag->status, sizeof(tag->status));
-               tag->status.head = msg->any.head;
-               break;
-       case DMSG_BLK_ERROR | DMSGF_REPLY:
-               tag->status = msg->any.blk_error;
-               break;
+       /*
+        * If the tag has been cleaned out we already closed our side
+        * of the transaction and we are waiting for the other side to
+        * close.
+        */
+       if (tag == NULL) {
+               if (msg->any.head.cmd & DMSGF_CREATE)
+                       kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
+               return 0;
+       }
+
+       /*
+        * Validate the tag
+        */
+       lwkt_gettoken(&sc->tok);
+
+       /*
+        * Handle initial response to our open and restart any deferred
+        * BIOs on success.
+        *
+        * NOTE: DELETE may also be set.
+        */
+       if (msg->any.head.cmd & DMSGF_CREATE) {
+               switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
+               case DMSG_LNK_ERROR | DMSGF_REPLY:
+                       bzero(&tag->status, sizeof(tag->status));
+                       tag->status.head = msg->any.head;
+                       break;
+               case DMSG_BLK_ERROR | DMSGF_REPLY:
+                       tag->status = msg->any.blk_error;
+                       break;
+               }
+               sc->last_error = tag->status.head.error;
+               kprintf("xdisk: blk_open completion status %d\n",
+                       sc->last_error);
+               if (sc->last_error == 0) {
+                       while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
+                               tag = xa_setup_cmd(sc, NULL);
+                               if (tag == NULL)
+                                       break;
+                               TAILQ_REMOVE(&sc->bioq, bio, bio_act);
+                               tag->bio = bio;
+                               xa_start(tag, NULL, 1);
+                       }
+               }
+               sc->serializing = 0;
+               wakeup(sc);
        }
-       lwkt_gettoken(&xa->tok);
-       if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
-               if (xa->opentag == tag) {
-                       xa->opentag = NULL;     /* XXX */
-                       kdmsg_state_reply(tag->state, 0);
+
+       /*
+        * Handle unexpected termination (or lost comm channel) from other
+        * side.  Autonomous completion only if open_tag matches,
+        * otherwise another thread is probably waiting on the tag.
+        *
+        * (see xa_close() for other interactions)
+        */
+       if (msg->any.head.cmd & DMSGF_DELETE) {
+               kdmsg_state_reply(tag->state, 0);
+               if (sc->open_tag == tag) {
+                       sc->open_tag = NULL;
                        xa_done(tag, 0);
-                       lwkt_reltoken(&xa->tok);
-                       return(0);
                } else {
-                       tag->done = 1;
+                       tag->async = 0;
+                       xa_done(tag, 0);
                }
        }
-       ++tag->waitseq;
-       lwkt_reltoken(&xa->tok);
-
-       wakeup(tag);
-
+       lwkt_reltoken(&sc->tok);
        return (0);
 }
 
@@ -986,7 +970,7 @@ static int
 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
 {
        xa_tag_t *tag = state->any.any;
-       xa_softc_t *xa = tag->xa;
+       xa_softc_t *sc = tag->sc;
        struct bio *bio;
        struct buf *bp;
 
@@ -1021,7 +1005,7 @@ xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
         * circuit.
         */
        if (tag->status.head.error &&
-           (msg->any.head.cmd & DMSGF_DELETE) && xa->opencnt) {
+           (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
                if (tag->status.head.error == DMSG_ERR_LOSTLINK ||
                    tag->status.head.error == DMSG_ERR_CANTCIRC) {
                        goto handle_repend;
@@ -1085,50 +1069,89 @@ handle_done:
        /*
         * Handle the case where the transaction failed due to a
         * connectivity issue.  The tag is put away with wasbio=0
-        * and we restart the bio.
-        *
-        * Setting circ->lost causes xa_setup_cmd() to skip the circuit.
-        * Other circuits might still be live.  Once a circuit gets messed
-        * up it will (eventually) be deleted so we can simply leave (lost)
-        * set forever after.
+        * and we put the BIO back onto the bioq for a later restart.
         */
 handle_repend:
-       lwkt_gettoken(&xa->tok);
+       lwkt_gettoken(&sc->tok);
        kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
-       tag->circ->lost = 1;
        tag->bio = NULL;
        xa_done(tag, 0);
        if ((state->txcmd & DMSGF_DELETE) == 0)
                kdmsg_msg_reply(msg, 0);
 
        /*
-        * Restart or requeue the bio
+        * Requeue the bio
         */
-       tag = xa_setup_cmd(xa, bio);
-       if (tag)
-               xa_start(tag, NULL);
-       lwkt_reltoken(&xa->tok);
+       TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
+
+       lwkt_reltoken(&sc->tok);
        return (0);
 }
 
 /*
- * Restart as much deferred I/O as we can.
+ * Restart as much deferred I/O as we can.  The serializer is set and we
+ * eat it (clear it) when done.
  *
- * Called with xa->tok held
+ * Called with sc->tok held
  */
 static
 void
-xa_restart_deferred(xa_softc_t *xa)
+xa_restart_deferred(xa_softc_t *sc)
 {
-       struct bio *bio;
+       kdmsg_state_t *span;
+       kdmsg_msg_t *msg;
        xa_tag_t *tag;
+       int error;
 
-       while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
-               tag = xa_setup_cmd(xa, NULL);
-               if (tag == NULL)
-                       break;
-               TAILQ_REMOVE(&xa->bioq, bio, bio_act);
-               tag->bio = bio;
-               xa_start(tag, NULL);
+       KKASSERT(sc->serializing);
+
+       /*
+        * Determine if a restart is needed.
+        */
+       if (sc->opencnt == 0) {
+               /*
+                * Device is not open, nothing to do, eat serializing.
+                */
+               sc->serializing = 0;
+               wakeup(sc);
+       } else if (sc->open_tag == NULL) {
+               /*
+                * BLK_OPEN required before we can restart any BIOs.
+                * Select the best LNK_SPAN to issue the BLK_OPEN under.
+                *
+                * serializing interlocks waiting open()s.
+                */
+               error = 0;
+               TAILQ_FOREACH(span, &sc->spanq, user_entry) {
+                       if ((span->rxcmd & DMSGF_DELETE) == 0)
+                               break;
+               }
+               if (span == NULL)
+                       error = ENXIO;
+
+               if (error == 0) {
+                       tag = xa_setup_cmd(sc, NULL);
+                       if (tag == NULL)
+                               error = ENXIO;
+               }
+               if (error == 0) {
+                       kprintf("xdisk: BLK_OPEN\n");
+                       sc->open_tag = tag;
+                       msg = kdmsg_msg_alloc(span,
+                                             DMSG_BLK_OPEN |
+                                             DMSGF_CREATE,
+                                             xa_sync_completion, tag);
+                       msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
+                       xa_start(tag, msg, 0);
+               }
+               if (error) {
+                       sc->serializing = 0;
+                       wakeup(sc);
+               }
+               /* else leave serializing set until BLK_OPEN response */
+       } else {
+               /* nothing to do */
+               sc->serializing = 0;
+               wakeup(sc);
        }
 }
index 978c981..9faa95c 100644 (file)
@@ -517,14 +517,13 @@ cleanuprd:
         * Simulate received MSGF_DELETE's for any remaining states.
         * (For local masters).
         */
-cleanupwr:
        kdmsg_drain_msgq(iocom);
        RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
                if ((state->rxcmd & DMSGF_DELETE) == 0) {
                        lockmgr(&iocom->msglk, LK_RELEASE);
                        kdmsg_state_abort(state);
                        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-                       goto cleanupwr;
+                       goto cleanuprd;
                }
        }
 
@@ -934,7 +933,7 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
         * the outer-transaction command for any transaction-create or
         * transaction-delete, and the inner message command for any
         * non-transaction or inside-transaction command.  tcmd will be
-        * set to 0 for any messaging error condition.
+        * set to 0 if the message state is illegal.
         *
         * The two can be told apart because outer-transaction commands
         * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
index 4fc1f12..f8fc59b 100644 (file)
@@ -209,7 +209,6 @@ disk_rcvdmsg(kdmsg_msg_t *msg)
 
        switch(msg->state->rxcmd & DMSGF_CMDSWMASK) {
        case DMSG_BLK_OPEN:
-       case DMSG_BLK_CLOSE:
                disk_blk_open(dp, msg);
                break;
        case DMSG_BLK_READ:
@@ -266,6 +265,7 @@ disk_blk_open(struct disk *dp, kdmsg_msg_t *msg)
                                ++openst->openwr;
                }
        }
+#if 0
        if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_CLOSE &&
            openst) {
                fflags = 0;
@@ -287,6 +287,7 @@ disk_blk_open(struct disk *dp, kdmsg_msg_t *msg)
                                --openst->openwr;
                }
        }
+#endif
        if (msg->any.head.cmd & DMSGF_DELETE) {
                if (openst) {
                        while (openst->openrd && openst->openwr) {
index 5fc24bb..7f8e279 100644 (file)
@@ -278,6 +278,8 @@ typedef struct dmsg_hdr dmsg_hdr_t;
                                         DMSGF_CREATE | \
                                         DMSGF_DELETE)
 
+#define DMSGF_BASEFLAGS                (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY)
+
 #define DMSG_PROTO_LNK         0x00000000U
 #define DMSG_PROTO_DBG         0x00100000U
 #define DMSG_PROTO_HM2         0x00200000U
@@ -361,6 +363,8 @@ typedef struct dmsg_hdr dmsg_hdr_t;
  */
 #define DMSG_LNK_CMD_HAMMER2_VOLCONF   0x20
 
+#define DMSG_LABEL_SIZE                128     /* fixed at 128, do not change */
+
 /*
  * LNK_AUTH - Authentication (often omitted)
  */
@@ -406,8 +410,8 @@ struct dmsg_lnk_conn {
        uint8_t         reserved02[8];
        uint32_t        reserved03[12];
        uint64_t        pfs_mask;       /* PFS mask for SPAN filtering */
-       char            cl_label[128];  /* cluster label (for PEER_BLOCK) */
-       char            fs_label[128];  /* PFS label (for PEER_HAMMER2) */
+       char            cl_label[DMSG_LABEL_SIZE]; /* cluster label */
+       char            fs_label[DMSG_LABEL_SIZE]; /* PFS label */
 };
 
 typedef struct dmsg_lnk_conn dmsg_lnk_conn_t;
@@ -506,8 +510,8 @@ struct dmsg_lnk_span {
         *       for PEER_BLOCK cl_label is typically host/device and
         *       fs_label is typically the serial number string.
         */
-       char            cl_label[128];  /* cluster label */
-       char            fs_label[128];  /* PFS label */
+       char            cl_label[DMSG_LABEL_SIZE]; /* cluster label */
+       char            fs_label[DMSG_LABEL_SIZE]; /* PFS label */
 };
 
 typedef struct dmsg_lnk_span dmsg_lnk_span_t;
@@ -561,7 +565,7 @@ typedef struct dmsg_dbg_shell dmsg_dbg_shell_t;
  *               within a BLK_OPEN transaction.  It may NOT initiate a
  *               transaction.  Note that a termination of the transaction
  *               (e.g. with LNK_ERROR or BLK_ERROR) closes all active OPENs
- *               for that transaction.
+ *               for that transaction.  XXX not well defined atm.
  *
  * BLK_READ    - Strategy read.  Not typically streaming.
  *
@@ -684,6 +688,7 @@ typedef union dmsg_any dmsg_any_t;
 #if defined(_KERNEL) || defined(_KERNEL_STRUCTURES)
 
 struct hammer2_mount;
+struct xa_softc;
 struct kdmsg_iocom;
 struct kdmsg_state;
 struct kdmsg_msg;
@@ -701,10 +706,13 @@ struct kdmsg_msg;
  * transaction might represent a cache state (and thus have a chain
  * association), or a VOP op, LNK_SPAN, or other things.
  */
+TAILQ_HEAD(kdmsg_state_list, kdmsg_state);
+
 struct kdmsg_state {
        RB_ENTRY(kdmsg_state) rbnode;           /* indexed by msgid */
-       TAILQ_HEAD(, kdmsg_state) subq;         /* active stacked states */
+       struct kdmsg_state_list subq;           /* active stacked states */
        TAILQ_ENTRY(kdmsg_state) entry;         /* on parent subq */
+       TAILQ_ENTRY(kdmsg_state) user_entry;    /* available to devices */
        struct kdmsg_iocom *iocom;
        struct kdmsg_state *parent;
        uint32_t        icmd;                   /* record cmd creating state */
@@ -718,6 +726,7 @@ struct kdmsg_state {
        union {
                void *any;
                struct hammer2_mount *hmp;
+               struct xa_softc *xa_sc;
        } any;
 };
 
index 678ce2a..4282da4 100644 (file)
  */
 struct xdisk_attach_ioctl {
        int     fd;
-       int     unit;
-       int64_t bytes;          /* size of disk in bytes */
-       int     blksize;
        int     unused01;
        int32_t reserved02[8];
-       char    cl_label[64];   /* LNK_SPAN cl_label match (typ host/dev) */
-       char    fs_label[64];   /* LNK_SPAN fs_label match (typ serialno) */
 };
 
 #define XDISKIOCATTACH _IOWR('X', 0, struct xdisk_attach_ioctl)