static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg);
static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg);
static int hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg);
+static void hammer2_drain_msgq(hammer2_pfsmount_t *pmp);
/*
* HAMMER2 vfs operations.
void
hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp)
{
+ /*
+ * Destroy the current connection
+ */
atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
while (pmp->msgrd_td || pmp->msgwr_td) {
wakeup(&pmp->msg_ctl);
tsleep(pmp, 0, "clstrkl", hz);
}
- atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+
+ /*
+ * Drop communications descriptor
+ */
+ if (pmp->msg_fp) {
+ fdrop(pmp->msg_fp);
+ pmp->msg_fp = NULL;
+ }
+ kprintf("RESTART CONNECTION\n");
+
+ /*
+ * Setup new communications descriptor
+ */
+ pmp->msg_ctl = 0;
pmp->msg_fp = fp;
+ pmp->msg_seq = 0;
lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td,
NULL, 0, -1, "hammer2-msgrd");
lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td,
}
/*
- * XXX simulate MSGF_DELETEs
+ * Shutdown the socket before waiting for the transmit side.
+ *
+ * If we are dying due to e.g. a socket disconnect verses being
+ * killed explicity we have to set KILL in order to kick the tx
+ * side when it might not have any other work to do. KILL might
+ * already be set if we are in an unmount or reconnect.
+ */
+ fp_shutdown(pmp->msg_fp, SHUT_RDWR);
+
+ atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
+ wakeup(&pmp->msg_ctl);
+
+ /*
+ * Wait for the transmit side to drain remaining messages
+ * before cleaning up the rx state. The transmit side will
+ * set KILLTX and wait for the rx side to completely finish
+ * (set msgrd_td to NULL) before cleaning up any remaining
+ * tx states.
*/
- while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) {
- kprintf("y");
- if (state->func &&
- (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
- (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
- lockmgr(&pmp->msglk, LK_RELEASE);
- msg = hammer2_msg_alloc(&pmp->router,
- HAMMER2_LNK_ERROR,
- NULL, NULL);
- if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
- msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
- msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
- msg->state = state;
- msg->state->func(state, msg);
- hammer2_state_cleanuprx(msg);
- lockmgr(&pmp->msglk, LK_EXCLUSIVE);
- } else {
- RB_REMOVE(hammer2_state_tree,
- &pmp->staterd_tree, state);
- hammer2_state_free(state);
- }
- }
lockmgr(&pmp->msglk, LK_RELEASE);
+ atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLRX);
+ wakeup(&pmp->msg_ctl);
+ while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILLTX) == 0) {
+ wakeup(&pmp->msg_ctl);
+ tsleep(pmp, 0, "clstrkw", hz);
+ }
- fp_shutdown(pmp->msg_fp, SHUT_RDWR);
pmp->msgrd_td = NULL;
/* pmp can be ripped out from under us at this point */
wakeup(pmp);
ssize_t res;
size_t name_len;
int error = 0;
+ int retries = 20;
/*
* Open a LNK_CONN transaction indicating that we want to take part
continue;
}
if (error) {
+ hammer2_msg_free(msg);
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
break;
}
hammer2_msg_free(msg);
}
- while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
- TAILQ_REMOVE(&pmp->msgq, msg, qentry);
- if (msg->state && msg->state->msg == msg)
- msg->state->msg = NULL;
- hammer2_msg_free(msg);
- }
+ /*
+ * Shutdown the socket. This will cause the rx thread to get an
+ * EOF and ensure that both threads get to a termination state.
+ */
+ fp_shutdown(pmp->msg_fp, SHUT_RDWR);
- if ((state = pmp->freewr_state) != NULL) {
- pmp->freewr_state = NULL;
- hammer2_state_free(state);
+ /*
+ * Set KILLTX (which the rx side waits for), then wait for the RX
+ * side to completely finish before we clean out any remaining
+ * command states.
+ */
+ lockmgr(&pmp->msglk, LK_RELEASE);
+ atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLTX);
+ wakeup(&pmp->msg_ctl);
+ while (pmp->msgrd_td) {
+ wakeup(&pmp->msg_ctl);
+ tsleep(pmp, 0, "clstrkw", hz);
}
+ lockmgr(&pmp->msglk, LK_EXCLUSIVE);
/*
- * XXX simulate MSGF_DELETEs
+ * Simulate received MSGF_DELETE's for any remaining states.
*/
- while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) {
- kprintf("x");
+cleanuprd:
+ RB_FOREACH(state, hammer2_state_tree, &pmp->staterd_tree) {
if (state->func &&
- (state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
(state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
lockmgr(&pmp->msglk, LK_RELEASE);
msg = hammer2_msg_alloc(&pmp->router,
msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
msg->any.head.cmd |= HAMMER2_MSGF_DELETE;
msg->state = state;
+ state->rxcmd = msg->any.head.cmd &
+ ~HAMMER2_MSGF_DELETE;
msg->state->func(state, msg);
hammer2_state_cleanuprx(msg);
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
- } else {
+ goto cleanuprd;
+ }
+ if (state->func == NULL) {
+ state->flags &= ~HAMMER2_STATE_INSERTED;
+ RB_REMOVE(hammer2_state_tree,
+ &pmp->staterd_tree, state);
+ hammer2_state_free(state);
+ goto cleanuprd;
+ }
+ }
+
+ /*
+ * NOTE: We have to drain the msgq to handle situations
+ * where received states have built up output
+ * messages, to avoid creating messages with
+ * duplicate CREATE/DELETE flags.
+ */
+cleanupwr:
+ hammer2_drain_msgq(pmp);
+ RB_FOREACH(state, hammer2_state_tree, &pmp->statewr_tree) {
+ if (state->func &&
+ (state->rxcmd & HAMMER2_MSGF_DELETE) == 0) {
+ lockmgr(&pmp->msglk, LK_RELEASE);
+ msg = hammer2_msg_alloc(&pmp->router,
+ HAMMER2_LNK_ERROR,
+ NULL, NULL);
+ if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+ msg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+ msg->any.head.cmd |= HAMMER2_MSGF_DELETE |
+ HAMMER2_MSGF_REPLY;
+ msg->state = state;
+ state->rxcmd = msg->any.head.cmd &
+ ~HAMMER2_MSGF_DELETE;
+ msg->state->func(state, msg);
+ hammer2_state_cleanuprx(msg);
+ lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+ goto cleanupwr;
+ }
+ if (state->func == NULL) {
+ state->flags &= ~HAMMER2_STATE_INSERTED;
RB_REMOVE(hammer2_state_tree,
&pmp->statewr_tree, state);
hammer2_state_free(state);
+ goto cleanupwr;
}
}
+
+ hammer2_drain_msgq(pmp);
+ if (--retries == 0)
+ panic("hammer2: comm thread shutdown couldn't drain");
+ if (RB_ROOT(&pmp->statewr_tree))
+ goto cleanupwr;
+
+ if ((state = pmp->freewr_state) != NULL) {
+ pmp->freewr_state = NULL;
+ hammer2_state_free(state);
+ }
+
lockmgr(&pmp->msglk, LK_RELEASE);
/*
- * Cleanup descriptor, be sure the read size is shutdown so the
- * (probably blocked) read operations returns an error.
- *
+ * The state trees had better be empty now
+ */
+ KKASSERT(RB_EMPTY(&pmp->staterd_tree));
+ KKASSERT(RB_EMPTY(&pmp->statewr_tree));
+ KKASSERT(pmp->conn_state == NULL);
+
+ /*
* pmp can be ripped out from under us once msgwr_td is set to NULL.
*/
- fp_shutdown(pmp->msg_fp, SHUT_RDWR);
pmp->msgwr_td = NULL;
wakeup(pmp);
lwkt_exit();
}
/*
+ * This cleans out the pending transmit message queue, adjusting any
+ * persistent states properly in the process.
+ *
+ * Caller must hold pmp->msglk
+ */
+static
+void
+hammer2_drain_msgq(hammer2_pfsmount_t *pmp)
+{
+ hammer2_msg_t *msg;
+
+ /*
+ * Clean out our pending transmit queue, executing the
+ * appropriate state adjustments. If this tries to open
+ * any new outgoing transactions we have to loop up and
+ * clean them out.
+ */
+ while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
+ TAILQ_REMOVE(&pmp->msgq, msg, qentry);
+ lockmgr(&pmp->msglk, LK_RELEASE);
+ if (msg->state && msg->state->msg == msg)
+ msg->state->msg = NULL;
+ if (hammer2_state_msgtx(msg))
+ hammer2_msg_free(msg);
+ else
+ hammer2_state_cleanuptx(msg);
+ lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+ }
+}
+
+/*
* Called with msglk held after queueing a new message, wakes up the
* transmit thread. We use an interlock thread to avoid unnecessary
* wakeups.
{
hammer2_pfsmount_t *pmp = state->any.pmp;
hammer2_mount_t *hmp = pmp->hmp;
+ hammer2_msg_t *rmsg;
size_t name_len;
int copyid;
+ kprintf("LNK_CONN REPLY RECEIVED CMD %08x\n", msg->any.head.cmd);
+
if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
- msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
- HAMMER2_MSGF_CREATE,
+ rmsg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
+ HAMMER2_MSGF_CREATE,
hammer2_msg_span_reply, pmp);
- msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
- msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
- msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
- msg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
+ rmsg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
+ rmsg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
+ rmsg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
+ rmsg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
name_len = pmp->iroot->ip_data.name_len;
- if (name_len >= sizeof(msg->any.lnk_span.label))
- name_len = sizeof(msg->any.lnk_span.label) - 1;
+ if (name_len >= sizeof(rmsg->any.lnk_span.label))
+ name_len = sizeof(rmsg->any.lnk_span.label) - 1;
bcopy(pmp->iroot->ip_data.filename,
- msg->any.lnk_span.label,
+ rmsg->any.lnk_span.label,
name_len);
- msg->any.lnk_span.label[name_len] = 0;
- hammer2_msg_write(msg);
+ rmsg->any.lnk_span.label[name_len] = 0;
+ hammer2_msg_write(rmsg);
/*
* Dump the configuration stored in the volume header
}
hammer2_voldata_unlock(hmp);
}
- if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+ if ((state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+ (msg->any.head.cmd & HAMMER2_MSGF_DELETE)) {
kprintf("LNK_CONN transaction terminated by remote\n");
pmp->conn_state = NULL;
hammer2_msg_reply(msg, 0);
return(0);
}
+/*
+ * Remote terminated our span transaction. We have to terminate our side.
+ */
static int
hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg)
{
hammer2_pfsmount_t *pmp = state->any.pmp;
- kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp);
+ kprintf("SPAN REPLY - Our sent span was terminated by the remote %08x state %p\n", msg->any.head.cmd, state);
+ if ((state->txcmd & HAMMER2_MSGF_DELETE) == 0 &&
+ (msg->any.head.cmd & HAMMER2_MSGF_DELETE)) {
+ hammer2_msg_reply(msg, 0);
+ }
return(0);
}