From: Matthew Dillon Date: Sat, 11 Aug 2012 21:56:27 +0000 (-0700) Subject: hammer2: More hammer2 msg cleanup, get reconnect ioctl working X-Git-Tag: v3.4.0rc~1018 X-Git-Url: http://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/70c3c3b738fd7fbc0be7ee5f70bb483a659cc34b hammer2: More hammer2 msg cleanup, get reconnect ioctl working * Fix state machine sequencing issues when shutting down a connection. We have to properly simulate DELETE messages for any open transactional states and give the function callbacks time to do their stuff. * Properly respond to DELETE messages for LNK_CONN and LNK_SPAN transactions. * Stabilize the RECLUSTER ioctl. --- diff --git a/sys/vfs/hammer2/hammer2.h b/sys/vfs/hammer2/hammer2.h index 3000763..595200d 100644 --- a/sys/vfs/hammer2/hammer2.h +++ b/sys/vfs/hammer2/hammer2.h @@ -335,7 +335,7 @@ struct hammer2_pfsmount { thread_t msgwr_td; /* cluster thread */ int msg_ctl; /* wakeup flags */ int msg_seq; /* cluster msg sequence id */ - uint32_t msgid_iterator; + uint32_t reserved01; struct lock msglk; /* lockmgr lock */ TAILQ_HEAD(, hammer2_msg) msgq; /* transmit queue */ struct hammer2_state *conn_state; /* active LNK_CONN state */ @@ -352,7 +352,9 @@ typedef struct hammer2_pfsmount hammer2_pfsmount_t; * msg_ctl flags (atomic) */ #define HAMMER2_CLUSTERCTL_KILL 0x00000001 -#define HAMMER2_CLUSTERCTL_SLEEPING 0x00000002 /* interlocked w/msglk */ +#define HAMMER2_CLUSTERCTL_KILLRX 0x00000002 /* staged helper exit */ +#define HAMMER2_CLUSTERCTL_KILLTX 0x00000004 /* staged helper exit */ +#define HAMMER2_CLUSTERCTL_SLEEPING 0x00000008 /* interlocked w/msglk */ /* * Transactional state structure, representing an open transaction. 
The @@ -379,6 +381,7 @@ struct hammer2_state { #define HAMMER2_STATE_INSERTED 0x0001 #define HAMMER2_STATE_DYNAMIC 0x0002 +#define HAMMER2_STATE_DELPEND 0x0004 /* transmit delete pending */ struct hammer2_msg { TAILQ_ENTRY(hammer2_msg) qentry; /* serialized queue */ diff --git a/sys/vfs/hammer2/hammer2_msg.c b/sys/vfs/hammer2/hammer2_msg.c index 272fa5a..a7619e3 100644 --- a/sys/vfs/hammer2/hammer2_msg.c +++ b/sys/vfs/hammer2/hammer2_msg.c @@ -577,6 +577,7 @@ hammer2_state_free(hammer2_state_t *state) hammer2_pfsmount_t *pmp = state->pmp; hammer2_msg_t *msg; + KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0); msg = state->msg; state->msg = NULL; kfree(state, pmp->mmsg); @@ -626,6 +627,7 @@ hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd, lockmgr(&pmp->msglk, LK_EXCLUSIVE); if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state)) panic("duplicate msgid allocated"); + state->flags |= HAMMER2_STATE_INSERTED; msg->any.head.msgid = state->msgid; lockmgr(&pmp->msglk, LK_RELEASE); } @@ -753,10 +755,8 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error) * doing anything. */ if (state) { - if (state->txcmd & HAMMER2_MSGF_DELETE) { - hammer2_msg_free(msg); + if (state->txcmd & HAMMER2_MSGF_DELETE) return; - } if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) cmd |= HAMMER2_MSGF_CREATE; if (state->txcmd & HAMMER2_MSGF_REPLY) @@ -766,6 +766,7 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error) if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0) cmd |= HAMMER2_MSGF_REPLY; } + kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd); /* XXX messy mask cmd to avoid allocating state */ nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK, @@ -804,10 +805,8 @@ hammer2_msg_result(hammer2_msg_t *msg, uint32_t error) * doing anything. 
*/ if (state) { - if (state->txcmd & HAMMER2_MSGF_DELETE) { - hammer2_msg_free(msg); + if (state->txcmd & HAMMER2_MSGF_DELETE) return; - } if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) cmd |= HAMMER2_MSGF_CREATE; if (state->txcmd & HAMMER2_MSGF_REPLY) diff --git a/sys/vfs/hammer2/hammer2_vfsops.c b/sys/vfs/hammer2/hammer2_vfsops.c index 7ebe6a5..21c8652 100644 --- a/sys/vfs/hammer2/hammer2_vfsops.c +++ b/sys/vfs/hammer2/hammer2_vfsops.c @@ -142,6 +142,7 @@ static void hammer2_cluster_thread_wr(void *arg); static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg); static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg); static int hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg); +static void hammer2_drain_msgq(hammer2_pfsmount_t *pmp); /* * HAMMER2 vfs operations. @@ -1013,13 +1014,30 @@ hammer2_install_volume_header(hammer2_mount_t *hmp) void hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp) { + /* + * Destroy the current connection + */ atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL); while (pmp->msgrd_td || pmp->msgwr_td) { wakeup(&pmp->msg_ctl); tsleep(pmp, 0, "clstrkl", hz); } - atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL); + + /* + * Drop communications descriptor + */ + if (pmp->msg_fp) { + fdrop(pmp->msg_fp); + pmp->msg_fp = NULL; + } + kprintf("RESTART CONNECTION\n"); + + /* + * Setup new communications descriptor + */ + pmp->msg_ctl = 0; pmp->msg_fp = fp; + pmp->msg_seq = 0; lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td, NULL, 0, -1, "hammer2-msgrd"); lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td, @@ -1160,33 +1178,33 @@ hammer2_cluster_thread_rd(void *arg) } /* - * XXX simulate MSGF_DELETEs + * Shutdown the socket before waiting for the transmit side. + * + * If we are dying due to e.g. a socket disconnect verses being + * killed explicity we have to set KILL in order to kick the tx + * side when it might not have any other work to do. 
KILL might + * already be set if we are in an unmount or reconnect. + */ + fp_shutdown(pmp->msg_fp, SHUT_RDWR); + + atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL); + wakeup(&pmp->msg_ctl); + + /* + * Wait for the transmit side to drain remaining messages + * before cleaning up the rx state. The transmit side will + * set KILLTX and wait for the rx side to completely finish + * (set msgrd_td to NULL) before cleaning up any remaining + * tx states. */ - while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) { - kprintf("y"); - if (state->func && - (state->txcmd & HAMMER2_MSGF_DELETE) == 0 && - (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) { - lockmgr(&pmp->msglk, LK_RELEASE); - msg = hammer2_msg_alloc(&pmp->router, - HAMMER2_LNK_ERROR, - NULL, NULL); - if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) - msg->any.head.cmd |= HAMMER2_MSGF_CREATE; - msg->any.head.cmd |= HAMMER2_MSGF_DELETE; - msg->state = state; - msg->state->func(state, msg); - hammer2_state_cleanuprx(msg); - lockmgr(&pmp->msglk, LK_EXCLUSIVE); - } else { - RB_REMOVE(hammer2_state_tree, - &pmp->staterd_tree, state); - hammer2_state_free(state); - } - } lockmgr(&pmp->msglk, LK_RELEASE); + atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLRX); + wakeup(&pmp->msg_ctl); + while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILLTX) == 0) { + wakeup(&pmp->msg_ctl); + tsleep(pmp, 0, "clstrkw", hz); + } - fp_shutdown(pmp->msg_fp, SHUT_RDWR); pmp->msgrd_td = NULL; /* pmp can be ripped out from under us at this point */ wakeup(pmp); @@ -1203,6 +1221,7 @@ hammer2_cluster_thread_wr(void *arg) ssize_t res; size_t name_len; int error = 0; + int retries = 20; /* * Open a LNK_CONN transaction indicating that we want to take part @@ -1265,6 +1284,7 @@ hammer2_cluster_thread_wr(void *arg) continue; } if (error) { + hammer2_msg_free(msg); lockmgr(&pmp->msglk, LK_EXCLUSIVE); break; } @@ -1308,25 +1328,32 @@ hammer2_cluster_thread_wr(void *arg) hammer2_msg_free(msg); } - while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) { - 
TAILQ_REMOVE(&pmp->msgq, msg, qentry); - if (msg->state && msg->state->msg == msg) - msg->state->msg = NULL; - hammer2_msg_free(msg); - } + /* + * Shutdown the socket. This will cause the rx thread to get an + * EOF and ensure that both threads get to a termination state. + */ + fp_shutdown(pmp->msg_fp, SHUT_RDWR); - if ((state = pmp->freewr_state) != NULL) { - pmp->freewr_state = NULL; - hammer2_state_free(state); + /* + * Set KILLTX (which the rx side waits for), then wait for the RX + * side to completely finish before we clean out any remaining + * command states. + */ + lockmgr(&pmp->msglk, LK_RELEASE); + atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLTX); + wakeup(&pmp->msg_ctl); + while (pmp->msgrd_td) { + wakeup(&pmp->msg_ctl); + tsleep(pmp, 0, "clstrkw", hz); } + lockmgr(&pmp->msglk, LK_EXCLUSIVE); /* - * XXX simulate MSGF_DELETEs + * Simulate received MSGF_DELETE's for any remaining states. */ - while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) { - kprintf("x"); +cleanuprd: + RB_FOREACH(state, hammer2_state_tree, &pmp->staterd_tree) { if (state->func && - (state->txcmd & HAMMER2_MSGF_DELETE) == 0 && (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) { lockmgr(&pmp->msglk, LK_RELEASE); msg = hammer2_msg_alloc(&pmp->router, @@ -1336,30 +1363,118 @@ hammer2_cluster_thread_wr(void *arg) msg->any.head.cmd |= HAMMER2_MSGF_CREATE; msg->any.head.cmd |= HAMMER2_MSGF_DELETE; msg->state = state; + state->rxcmd = msg->any.head.cmd & + ~HAMMER2_MSGF_DELETE; msg->state->func(state, msg); hammer2_state_cleanuprx(msg); lockmgr(&pmp->msglk, LK_EXCLUSIVE); - } else { + goto cleanuprd; + } + if (state->func == NULL) { + state->flags &= ~HAMMER2_STATE_INSERTED; + RB_REMOVE(hammer2_state_tree, + &pmp->staterd_tree, state); + hammer2_state_free(state); + goto cleanuprd; + } + } + + /* + * NOTE: We have to drain the msgq to handle situations + * where received states have built up output + * messages, to avoid creating messages with + * duplicate CREATE/DELETE flags. 
+ */ +cleanupwr: + hammer2_drain_msgq(pmp); + RB_FOREACH(state, hammer2_state_tree, &pmp->statewr_tree) { + if (state->func && + (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) { + lockmgr(&pmp->msglk, LK_RELEASE); + msg = hammer2_msg_alloc(&pmp->router, + HAMMER2_LNK_ERROR, + NULL, NULL); + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) + msg->any.head.cmd |= HAMMER2_MSGF_CREATE; + msg->any.head.cmd |= HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_REPLY; + msg->state = state; + state->rxcmd = msg->any.head.cmd & + ~HAMMER2_MSGF_DELETE; + msg->state->func(state, msg); + hammer2_state_cleanuprx(msg); + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + goto cleanupwr; + } + if (state->func == NULL) { + state->flags &= ~HAMMER2_STATE_INSERTED; RB_REMOVE(hammer2_state_tree, &pmp->statewr_tree, state); hammer2_state_free(state); + goto cleanupwr; } } + + hammer2_drain_msgq(pmp); + if (--retries == 0) + panic("hammer2: comm thread shutdown couldn't drain"); + if (RB_ROOT(&pmp->statewr_tree)) + goto cleanupwr; + + if ((state = pmp->freewr_state) != NULL) { + pmp->freewr_state = NULL; + hammer2_state_free(state); + } + lockmgr(&pmp->msglk, LK_RELEASE); /* - * Cleanup descriptor, be sure the read size is shutdown so the - * (probably blocked) read operations returns an error. - * + * The state trees had better be empty now + */ + KKASSERT(RB_EMPTY(&pmp->staterd_tree)); + KKASSERT(RB_EMPTY(&pmp->statewr_tree)); + KKASSERT(pmp->conn_state == NULL); + + /* * pmp can be ripped out from under us once msgwr_td is set to NULL. */ - fp_shutdown(pmp->msg_fp, SHUT_RDWR); pmp->msgwr_td = NULL; wakeup(pmp); lwkt_exit(); } /* + * This cleans out the pending transmit message queue, adjusting any + * persistent states properly in the process. + * + * Caller must hold pmp->msglk + */ +static +void +hammer2_drain_msgq(hammer2_pfsmount_t *pmp) +{ + hammer2_msg_t *msg; + + /* + * Clean out our pending transmit queue, executing the + * appropriate state adjustments. 
If this tries to open + * any new outgoing transactions we have to loop up and + * clean them out. + */ + while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) { + TAILQ_REMOVE(&pmp->msgq, msg, qentry); + lockmgr(&pmp->msglk, LK_RELEASE); + if (msg->state && msg->state->msg == msg) + msg->state->msg = NULL; + if (hammer2_state_msgtx(msg)) + hammer2_msg_free(msg); + else + hammer2_state_cleanuptx(msg); + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + } +} + +/* * Called with msglk held after queueing a new message, wakes up the * transmit thread. We use an interlock thread to avoid unnecessary * wakeups. @@ -1408,26 +1523,29 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg) { hammer2_pfsmount_t *pmp = state->any.pmp; hammer2_mount_t *hmp = pmp->hmp; + hammer2_msg_t *rmsg; size_t name_len; int copyid; + kprintf("LNK_CONN REPLY RECEIVED CMD %08x\n", msg->any.head.cmd); + if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) { kprintf("LNK_CONN transaction replied to, initiate SPAN\n"); - msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN | - HAMMER2_MSGF_CREATE, + rmsg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN | + HAMMER2_MSGF_CREATE, hammer2_msg_span_reply, pmp); - msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid; - msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid; - msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type; - msg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1; + rmsg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid; + rmsg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid; + rmsg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type; + rmsg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1; name_len = pmp->iroot->ip_data.name_len; - if (name_len >= sizeof(msg->any.lnk_span.label)) - name_len = sizeof(msg->any.lnk_span.label) - 1; + if (name_len >= sizeof(rmsg->any.lnk_span.label)) + name_len = sizeof(rmsg->any.lnk_span.label) - 1; bcopy(pmp->iroot->ip_data.filename, - msg->any.lnk_span.label, + 
rmsg->any.lnk_span.label, name_len); - msg->any.lnk_span.label[name_len] = 0; - hammer2_msg_write(msg); + rmsg->any.lnk_span.label[name_len] = 0; + hammer2_msg_write(rmsg); /* * Dump the configuration stored in the volume header @@ -1440,7 +1558,8 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg) } hammer2_voldata_unlock(hmp); } - if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + if ((state->txcmd & HAMMER2_MSGF_DELETE) == 0 && + (msg->any.head.cmd & HAMMER2_MSGF_DELETE)) { kprintf("LNK_CONN transaction terminated by remote\n"); pmp->conn_state = NULL; hammer2_msg_reply(msg, 0); @@ -1448,12 +1567,19 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg) return(0); } +/* + * Remote terminated our span transaction. We have to terminate our side. + */ static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg) { hammer2_pfsmount_t *pmp = state->any.pmp; - kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp); + kprintf("SPAN REPLY - Our sent span was terminated by the remote %08x state %p\n", msg->any.head.cmd, state); + if ((state->txcmd & HAMMER2_MSGF_DELETE) == 0 && + (msg->any.head.cmd & HAMMER2_MSGF_DELETE)) { + hammer2_msg_reply(msg, 0); + } return(0); }