cluster - remote block device work
author Matthew Dillon <dillon@apollo.backplane.com>
Sat, 1 Dec 2012 10:24:49 +0000 (02:24 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Sat, 1 Dec 2012 10:24:49 +0000 (02:24 -0800)
* xdisk(client) <-> diskiocom(server) now successfully does a full
  message transaction over a VC for xa_open().  Still lots to do.

* Fix I/O request deferrals since the open can occur before the VC is
  fully established.  This fixes a deadlock.

* Fix state->msg tracking in kern_dmsg.c.  Fixes a double-free.

sys/dev/disk/xdisk/xdisk.c
sys/kern/kern_dmsg.c

index 8aff164..95132b0 100644 (file)
  *
  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
  * devices.  These devices look like raw disks to the system.
+ *
+ * TODO:
+ *     Handle circuit disconnects, leave bio's pending
+ *     Restart bio's on circuit reconnect.
  */
 #include <sys/param.h>
 #include <sys/systm.h>
@@ -120,6 +124,7 @@ static uint32_t xa_wait(xa_tag_t *tag, int seq);
 static void xa_done(xa_tag_t *tag, int wasbio);
 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
+static void xa_restart_deferred(xa_softc_t *xa);
 
 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
 
@@ -521,6 +526,9 @@ xa_autodmsg(kdmsg_msg_t *msg)
         */
        switch(xcmd) {
        case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
+               /*
+                * Track established circuits
+                */
                kprintf("XA: Received autodmsg: CREATE+REPLY\n");
                circ = msg->state->any.circ;
                lwkt_gettoken(&xa->tok);
@@ -535,6 +543,11 @@ xa_autodmsg(kdmsg_msg_t *msg)
                                TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
                        circ->recorded = 1;
                }
+
+               /*
+                * Restart any deferred I/O.
+                */
+               xa_restart_deferred(xa);
                lwkt_reltoken(&xa->tok);
                break;
        case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
@@ -715,10 +728,12 @@ xa_strategy(struct dev_strategy_args *ap)
        xa_tag_t *tag;
        struct bio *bio = ap->a_bio;
 
+#if 0
        bio->bio_buf->b_error = ENXIO;
        bio->bio_buf->b_flags |= B_ERROR;
        biodone(bio);
        return(0);
+#endif
 
        tag = xa_setup_cmd(xa, bio);
        if (tag)
@@ -763,7 +778,6 @@ xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
                tag = NULL;
        } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
-               TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
                tag->bio = bio;
                tag->circuit = circ->circ_state->msgid;
        }
@@ -843,7 +857,14 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
        tag->done = 0;
        tag->waitseq = 0;
        if (msg) {
+#if 0
+               lwkt_gettoken(&xa->tok);
+               TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
+#endif
                tag->state = msg->state;
+#if 0
+               lwkt_reltoken(&xa->tok);
+#endif
                kdmsg_msg_write(msg);
        } else {
                xa_done(tag, 1);
@@ -868,7 +889,6 @@ xa_done(xa_tag_t *tag, int wasbio)
        xa_softc_t *xa = tag->xa;
        struct bio *bio;
 
-       KKASSERT(tag->msg == NULL);
        KKASSERT(tag->bio == NULL);
        tag->done = 1;
 
@@ -995,3 +1015,27 @@ handle_done:
                xa_done(tag, 1);
        return (0);
 }
+
+/*
+ * Restart as much deferred I/O as we can: dequeue bios from xa->bioq while
+ * xa_setup_cmd() yields a tag.  Called with xa->tok held.  NOTE(review):
+ * xa_start() is given a NULL msg here; confirm its no-msg path is intended.
+ */
+static
+void
+xa_restart_deferred(xa_softc_t *xa)
+{
+       struct bio *bio;
+       xa_tag_t *tag;
+
+       while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
+               tag = xa_setup_cmd(xa, NULL);
+               if (tag == NULL)
+                       break;          /* no tag; bio stays on xa->bioq */
+               kprintf("xa: Restart BIO %p on %s\n",
+                       bio, xa->iocom.auto_lnk_conn.fs_label);
+               TAILQ_REMOVE(&xa->bioq, bio, bio_act);
+               tag->bio = bio;
+               xa_start(tag, NULL);
+       }
+}
index e8ea583..999b071 100644 (file)
 
 RB_GENERATE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);
 
-static struct lwkt_token kdmsg_token = LWKT_TOKEN_INITIALIZER(kdmsg_token);
 static void kdmsg_circ_free_check(kdmsg_circuit_t *circ);
-
 static void kdmsg_iocom_thread_rd(void *arg);
 static void kdmsg_iocom_thread_wr(void *arg);
 static int kdmsg_autorxmsg(kdmsg_msg_t *msg);
 static void kdmsg_autocirc(kdmsg_msg_t *msg);
 static int kdmsg_autocirc_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
 
+static struct lwkt_token kdmsg_token = LWKT_TOKEN_INITIALIZER(kdmsg_token);
+
 /*
  * Initialize the roll-up communications structure for a network
  * messaging session.  This function does not install the socket.
@@ -691,11 +691,6 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
        int error;
 
        /*
-        * XXX resolve msg->any.head.source and msg->any.head.target
-        *     into LNK_SPAN references.
-        */
-
-       /*
         * Make sure a state structure is ready to go in case we need a new
         * one.  This is the only routine which uses freerd_state so no
         * races are possible.
@@ -1100,8 +1095,6 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                state->rxcmd |= DMSGF_DELETE;
                if (state->txcmd & DMSGF_DELETE) {
-                       if (state->msg == msg)
-                               state->msg = NULL;
                        KKASSERT(state->flags & KDMSG_STATE_INSERTED);
                        if (state->rxcmd & DMSGF_REPLY) {
                                KKASSERT(msg->any.head.cmd &
@@ -1118,9 +1111,10 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
                        lockmgr(&iocom->msglk, LK_RELEASE);
                        kdmsg_state_free(state);
                } else {
+                       if (state->msg != msg)
+                               kdmsg_msg_free(msg);
                        lockmgr(&iocom->msglk, LK_RELEASE);
                }
-               kdmsg_msg_free(msg);
        } else if (state->msg != msg) {
                kdmsg_msg_free(msg);
        }
@@ -1332,8 +1326,6 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                state->txcmd |= DMSGF_DELETE;
                if (state->rxcmd & DMSGF_DELETE) {
-                       if (state->msg == msg)
-                               state->msg = NULL;
                        KKASSERT(state->flags & KDMSG_STATE_INSERTED);
                        if (state->txcmd & DMSGF_REPLY) {
                                KKASSERT(msg->any.head.cmd &
@@ -1347,12 +1339,14 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                                          &iocom->statewr_tree, state);
                        }
                        state->flags &= ~KDMSG_STATE_INSERTED;
+                       msg->state = NULL;
                        lockmgr(&iocom->msglk, LK_RELEASE);
                        kdmsg_state_free(state);
                } else {
+                       if (state->msg != msg)
+                               kdmsg_msg_free(msg);
                        lockmgr(&iocom->msglk, LK_RELEASE);
                }
-               kdmsg_msg_free(msg);
        } else if (state->msg != msg) {
                kdmsg_msg_free(msg);
        }