hammer2: More hammer2 msg cleanup, get reconnect ioctl working
author Matthew Dillon <dillon@apollo.backplane.com>
Sat, 11 Aug 2012 21:56:27 +0000 (14:56 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Sat, 11 Aug 2012 21:56:27 +0000 (14:56 -0700)
* Fix state machine sequencing issues when shutting down a connection.  We
  have to properly simulate DELETE messages for any open transactional
  states and give the function callbacks time to do their stuff.

* Properly respond to DELETE messages for LNK_CONN and LNK_SPAN transactions.

* Stabilize the RECLUSTER ioctl.

sys/vfs/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_msg.c
sys/vfs/hammer2/hammer2_vfsops.c

index 3000763..595200d 100644 (file)
@@ -335,7 +335,7 @@ struct hammer2_pfsmount {
        thread_t                msgwr_td;       /* cluster thread */
        int                     msg_ctl;        /* wakeup flags */
        int                     msg_seq;        /* cluster msg sequence id */
-       uint32_t                msgid_iterator;
+       uint32_t                reserved01;
        struct lock             msglk;          /* lockmgr lock */
        TAILQ_HEAD(, hammer2_msg) msgq;         /* transmit queue */
        struct hammer2_state    *conn_state;    /* active LNK_CONN state */
@@ -352,7 +352,9 @@ typedef struct hammer2_pfsmount hammer2_pfsmount_t;
  * msg_ctl flags (atomic)
  */
 #define HAMMER2_CLUSTERCTL_KILL                0x00000001
-#define HAMMER2_CLUSTERCTL_SLEEPING    0x00000002 /* interlocked w/msglk */
+#define HAMMER2_CLUSTERCTL_KILLRX      0x00000002 /* staged helper exit */
+#define HAMMER2_CLUSTERCTL_KILLTX      0x00000004 /* staged helper exit */
+#define HAMMER2_CLUSTERCTL_SLEEPING    0x00000008 /* interlocked w/msglk */
 
 /*
  * Transactional state structure, representing an open transaction.  The
@@ -379,6 +381,7 @@ struct hammer2_state {
 
 #define HAMMER2_STATE_INSERTED 0x0001
 #define HAMMER2_STATE_DYNAMIC  0x0002
+#define HAMMER2_STATE_DELPEND  0x0004          /* transmit delete pending */
 
 struct hammer2_msg {
        TAILQ_ENTRY(hammer2_msg) qentry;        /* serialized queue */
index 272fa5a..a7619e3 100644 (file)
@@ -577,6 +577,7 @@ hammer2_state_free(hammer2_state_t *state)
        hammer2_pfsmount_t *pmp = state->pmp;
        hammer2_msg_t *msg;
 
+       KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
        msg = state->msg;
        state->msg = NULL;
        kfree(state, pmp->mmsg);
@@ -626,6 +627,7 @@ hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
                        panic("duplicate msgid allocated");
+               state->flags |= HAMMER2_STATE_INSERTED;
                msg->any.head.msgid = state->msgid;
                lockmgr(&pmp->msglk, LK_RELEASE);
        }
@@ -753,10 +755,8 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
         * doing anything.
         */
        if (state) {
-               if (state->txcmd & HAMMER2_MSGF_DELETE) {
-                       hammer2_msg_free(msg);
+               if (state->txcmd & HAMMER2_MSGF_DELETE)
                        return;
-               }
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        cmd |= HAMMER2_MSGF_CREATE;
                if (state->txcmd & HAMMER2_MSGF_REPLY)
@@ -766,6 +766,7 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
                if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
+       kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd);
 
        /* XXX messy mask cmd to avoid allocating state */
        nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
@@ -804,10 +805,8 @@ hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
         * doing anything.
         */
        if (state) {
-               if (state->txcmd & HAMMER2_MSGF_DELETE) {
-                       hammer2_msg_free(msg);
+               if (state->txcmd & HAMMER2_MSGF_DELETE)
                        return;
-               }
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        cmd |= HAMMER2_MSGF_CREATE;
                if (state->txcmd & HAMMER2_MSGF_REPLY)
index 7ebe6a5..21c8652 100644 (file)
@@ -142,6 +142,7 @@ static void hammer2_cluster_thread_wr(void *arg);
 static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg);
 static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg);
 static int hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg);
+static void hammer2_drain_msgq(hammer2_pfsmount_t *pmp);
 
 /*
  * HAMMER2 vfs operations.
@@ -1013,13 +1014,30 @@ hammer2_install_volume_header(hammer2_mount_t *hmp)
 void
 hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp)
 {
+       /*
+        * Destroy the current connection
+        */
        atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
        while (pmp->msgrd_td || pmp->msgwr_td) {
               wakeup(&pmp->msg_ctl);
               tsleep(pmp, 0, "clstrkl", hz);
        }
-       atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+
+       /*
+        * Drop communications descriptor
+        */
+       if (pmp->msg_fp) {
+               fdrop(pmp->msg_fp);
+               pmp->msg_fp = NULL;
+       }
+       kprintf("RESTART CONNECTION\n");
+
+       /*
+        * Setup new communications descriptor
+        */
+       pmp->msg_ctl = 0;
        pmp->msg_fp = fp;
+       pmp->msg_seq = 0;
        lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td,
                    NULL, 0, -1, "hammer2-msgrd");
        lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td,
@@ -1160,33 +1178,33 @@ hammer2_cluster_thread_rd(void *arg)
        }
 
        /*
-        * XXX simulate MSGF_DELETEs
+        * Shutdown the socket before waiting for the transmit side.
+        *
+        * If we are dying due to e.g. a socket disconnect versus being
+        * killed explicitly we have to set KILL in order to kick the tx
+        * side when it might not have any other work to do.  KILL might
+        * already be set if we are in an unmount or reconnect.
+        */
+       fp_shutdown(pmp->msg_fp, SHUT_RDWR);
+
+       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+       wakeup(&pmp->msg_ctl);
+
+       /*
+        * Wait for the transmit side to drain remaining messages
+        * before cleaning up the rx state.  The transmit side will
+        * set KILLTX and wait for the rx side to completely finish
+        * (set msgrd_td to NULL) before cleaning up any remaining
+        * tx states.
         */
-       while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) {
-               kprintf("y");
-               if (state->func &&
-                   (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
-                   (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
-                       lockmgr(&pmp->msglk, LK_RELEASE);
-                       msg = hammer2_msg_alloc(&pmp->router,
-                                               HAMMER2_LNK_ERROR,
-                                               NULL, NULL);
-                       if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
-                               msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
-                       msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
-                       msg->state = state;
-                       msg->state->func(state, msg);
-                       hammer2_state_cleanuprx(msg);
-                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-               } else {
-                       RB_REMOVE(hammer2_state_tree,
-                                 &pmp->staterd_tree, state);
-                       hammer2_state_free(state);
-               }
-       }
        lockmgr(&pmp->msglk, LK_RELEASE);
+       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLRX);
+       wakeup(&pmp->msg_ctl);
+       while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILLTX) == 0) {
+               wakeup(&pmp->msg_ctl);
+               tsleep(pmp, 0, "clstrkw", hz);
+       }
 
-       fp_shutdown(pmp->msg_fp, SHUT_RDWR);
        pmp->msgrd_td = NULL;
        /* pmp can be ripped out from under us at this point */
        wakeup(pmp);
@@ -1203,6 +1221,7 @@ hammer2_cluster_thread_wr(void *arg)
        ssize_t res;
        size_t name_len;
        int error = 0;
+       int retries = 20;
 
        /*
         * Open a LNK_CONN transaction indicating that we want to take part
@@ -1265,6 +1284,7 @@ hammer2_cluster_thread_wr(void *arg)
                                continue;
                        }
                        if (error) {
+                               hammer2_msg_free(msg);
                                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                                break;
                        }
@@ -1308,25 +1328,32 @@ hammer2_cluster_thread_wr(void *arg)
                hammer2_msg_free(msg);
        }
 
-       while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
-               TAILQ_REMOVE(&pmp->msgq, msg, qentry);
-               if (msg->state && msg->state->msg == msg)
-                       msg->state->msg = NULL;
-               hammer2_msg_free(msg);
-       }
+       /*
+        * Shutdown the socket.  This will cause the rx thread to get an
+        * EOF and ensure that both threads get to a termination state.
+        */
+       fp_shutdown(pmp->msg_fp, SHUT_RDWR);
 
-       if ((state = pmp->freewr_state) != NULL) {
-               pmp->freewr_state = NULL;
-               hammer2_state_free(state);
+       /*
+        * Set KILLTX (which the rx side waits for), then wait for the RX
+        * side to completely finish before we clean out any remaining
+        * command states.
+        */
+       lockmgr(&pmp->msglk, LK_RELEASE);
+       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLTX);
+       wakeup(&pmp->msg_ctl);
+       while (pmp->msgrd_td) {
+               wakeup(&pmp->msg_ctl);
+               tsleep(pmp, 0, "clstrkw", hz);
        }
+       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
 
        /*
-        * XXX simulate MSGF_DELETEs
+        * Simulate received MSGF_DELETE's for any remaining states.
         */
-       while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) {
-               kprintf("x");
+cleanuprd:
+       RB_FOREACH(state, hammer2_state_tree, &pmp->staterd_tree) {
                if (state->func &&
-                   (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
                    (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
                        lockmgr(&pmp->msglk, LK_RELEASE);
                        msg = hammer2_msg_alloc(&pmp->router,
@@ -1336,30 +1363,118 @@ hammer2_cluster_thread_wr(void *arg)
                                msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
                        msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
                        msg->state = state;
+                       state->rxcmd = msg->any.head.cmd &
+                                      ~HAMMER2_MSGF_DELETE;
                        msg->state->func(state, msg);
                        hammer2_state_cleanuprx(msg);
                        lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-               } else {
+                       goto cleanuprd;
+               }
+               if (state->func == NULL) {
+                       state->flags &= ~HAMMER2_STATE_INSERTED;
+                       RB_REMOVE(hammer2_state_tree,
+                                 &pmp->staterd_tree, state);
+                       hammer2_state_free(state);
+                       goto cleanuprd;
+               }
+       }
+
+       /*
+        * NOTE: We have to drain the msgq to handle situations
+        *       where received states have built up output
+        *       messages, to avoid creating messages with
+        *       duplicate CREATE/DELETE flags.
+        */
+cleanupwr:
+       hammer2_drain_msgq(pmp);
+       RB_FOREACH(state, hammer2_state_tree, &pmp->statewr_tree) {
+               if (state->func &&
+                   (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       msg = hammer2_msg_alloc(&pmp->router,
+                                               HAMMER2_LNK_ERROR,
+                                               NULL, NULL);
+                       if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+                               msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+                       msg->any.head.cmd |= HAMMER2_MSGF_DELETE |
+                                            HAMMER2_MSGF_REPLY;
+                       msg->state = state;
+                       state->rxcmd = msg->any.head.cmd &
+                                      ~HAMMER2_MSGF_DELETE;
+                       msg->state->func(state, msg);
+                       hammer2_state_cleanuprx(msg);
+                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+                       goto cleanupwr;
+               }
+               if (state->func == NULL) {
+                       state->flags &= ~HAMMER2_STATE_INSERTED;
                        RB_REMOVE(hammer2_state_tree,
                                  &pmp->statewr_tree, state);
                        hammer2_state_free(state);
+                       goto cleanupwr;
                }
        }
+
+       hammer2_drain_msgq(pmp);
+       if (--retries == 0)
+               panic("hammer2: comm thread shutdown couldn't drain");
+       if (RB_ROOT(&pmp->statewr_tree))
+               goto cleanupwr;
+
+       if ((state = pmp->freewr_state) != NULL) {
+               pmp->freewr_state = NULL;
+               hammer2_state_free(state);
+       }
+
        lockmgr(&pmp->msglk, LK_RELEASE);
 
        /*
-        * Cleanup descriptor, be sure the read size is shutdown so the
-        * (probably blocked) read operations returns an error.
-        *
+        * The state trees had better be empty now
+        */
+       KKASSERT(RB_EMPTY(&pmp->staterd_tree));
+       KKASSERT(RB_EMPTY(&pmp->statewr_tree));
+       KKASSERT(pmp->conn_state == NULL);
+
+       /*
         * pmp can be ripped out from under us once msgwr_td is set to NULL.
         */
-       fp_shutdown(pmp->msg_fp, SHUT_RDWR);
        pmp->msgwr_td = NULL;
        wakeup(pmp);
        lwkt_exit();
 }
 
 /*
+ * This cleans out the pending transmit message queue, adjusting any
+ * persistent states properly in the process.
+ *
+ * Caller must hold pmp->msglk
+ */
+static
+void
+hammer2_drain_msgq(hammer2_pfsmount_t *pmp)
+{
+       hammer2_msg_t *msg;
+
+       /*
+        * Clean out our pending transmit queue, executing the
+        * appropriate state adjustments.  If this tries to open
+        * any new outgoing transactions we have to loop up and
+        * clean them out.
+        */
+       while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
+               TAILQ_REMOVE(&pmp->msgq, msg, qentry);
+               lockmgr(&pmp->msglk, LK_RELEASE);
+               if (msg->state && msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               if (hammer2_state_msgtx(msg))
+                       hammer2_msg_free(msg);
+               else
+                       hammer2_state_cleanuptx(msg);
+               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+       }
+}
+
+/*
  * Called with msglk held after queueing a new message, wakes up the
  * transmit thread.  We use an interlock thread to avoid unnecessary
  * wakeups.
@@ -1408,26 +1523,29 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
 {
        hammer2_pfsmount_t *pmp = state->any.pmp;
        hammer2_mount_t *hmp = pmp->hmp;
+       hammer2_msg_t *rmsg;
        size_t name_len;
        int copyid;
 
+       kprintf("LNK_CONN REPLY RECEIVED CMD %08x\n", msg->any.head.cmd);
+
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
-               msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
-                                                     HAMMER2_MSGF_CREATE,
+               rmsg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
+                                                      HAMMER2_MSGF_CREATE,
                                        hammer2_msg_span_reply, pmp);
-               msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
-               msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
-               msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
-               msg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
+               rmsg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
+               rmsg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
+               rmsg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
+               rmsg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
                name_len = pmp->iroot->ip_data.name_len;
-               if (name_len >= sizeof(msg->any.lnk_span.label))
-                       name_len = sizeof(msg->any.lnk_span.label) - 1;
+               if (name_len >= sizeof(rmsg->any.lnk_span.label))
+                       name_len = sizeof(rmsg->any.lnk_span.label) - 1;
                bcopy(pmp->iroot->ip_data.filename,
-                     msg->any.lnk_span.label,
+                     rmsg->any.lnk_span.label,
                      name_len);
-               msg->any.lnk_span.label[name_len] = 0;
-               hammer2_msg_write(msg);
+               rmsg->any.lnk_span.label[name_len] = 0;
+               hammer2_msg_write(rmsg);
 
                /*
                 * Dump the configuration stored in the volume header
@@ -1440,7 +1558,8 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
                }
                hammer2_voldata_unlock(hmp);
        }
-       if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+       if ((state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+           (msg->any.head.cmd & HAMMER2_MSGF_DELETE)) {
                kprintf("LNK_CONN transaction terminated by remote\n");
                pmp->conn_state = NULL;
                hammer2_msg_reply(msg, 0);
@@ -1448,12 +1567,19 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
        return(0);
 }
 
+/*
+ * Remote terminated our span transaction.  We have to terminate our side.
+ */
 static int
 hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg)
 {
        hammer2_pfsmount_t *pmp = state->any.pmp;
 
-       kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp);
+       kprintf("SPAN REPLY - Our sent span was terminated by the remote %08x state %p\n", msg->any.head.cmd, state);
+       if ((state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+           (msg->any.head.cmd & HAMMER2_MSGF_DELETE)) {
+               hammer2_msg_reply(msg, 0);
+       }
        return(0);
 }