From 0c98b96630fb437813171614524285d86ab8c924 Mon Sep 17 00:00:00 2001 From: Matthew Dillon Date: Sat, 1 Dec 2012 02:24:49 -0800 Subject: [PATCH] cluster - remote block device work * xdisk(client) <-> diskiocom(server) now successfully does a full message transaction over a VC for xa_open(). Still lots to do. * Fix I/O request deferrals since the open can occur before the VC is fully established. This fixes a deadlock. * Fix state->msg tracking in kern_dmsg.c. Fixes a double-free. --- sys/dev/disk/xdisk/xdisk.c | 48 ++++++++++++++++++++++++++++++++++++-- sys/kern/kern_dmsg.c | 20 ++++++---------- 2 files changed, 53 insertions(+), 15 deletions(-) diff --git a/sys/dev/disk/xdisk/xdisk.c b/sys/dev/disk/xdisk/xdisk.c index 8aff1647a6..95132b01c4 100644 --- a/sys/dev/disk/xdisk/xdisk.c +++ b/sys/dev/disk/xdisk/xdisk.c @@ -43,6 +43,10 @@ * * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d * devices. These devices look like raw disks to the system. + * + * TODO: + * Handle circuit disconnects, leave bio's pending + * Restart bio's on circuit reconnect. */ #include #include @@ -120,6 +124,7 @@ static uint32_t xa_wait(xa_tag_t *tag, int seq); static void xa_done(xa_tag_t *tag, int wasbio); static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg); static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg); +static void xa_restart_deferred(xa_softc_t *xa); MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks"); @@ -521,6 +526,9 @@ xa_autodmsg(kdmsg_msg_t *msg) */ switch(xcmd) { case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY: + /* + * Track established circuits + */ kprintf("XA: Received autodmsg: CREATE+REPLY\n"); circ = msg->state->any.circ; lwkt_gettoken(&xa->tok); @@ -535,6 +543,11 @@ xa_autodmsg(kdmsg_msg_t *msg) TAILQ_INSERT_TAIL(&xa->circq, circ, entry); circ->recorded = 1; } + + /* + * Restart any deferred I/O. + */ + xa_restart_deferred(xa); lwkt_reltoken(&xa->tok); break; case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY: @@ -715,10 +728,12 @@ xa_strategy(struct dev_strategy_args *ap) xa_tag_t *tag; struct bio *bio = ap->a_bio; +#if 0 bio->bio_buf->b_error = ENXIO; bio->bio_buf->b_flags |= B_ERROR; biodone(bio); return(0); +#endif tag = xa_setup_cmd(xa, bio); if (tag) @@ -763,7 +778,6 @@ xa_setup_cmd(xa_softc_t *xa, struct bio *bio) tag = NULL; } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) { TAILQ_REMOVE(&xa->tag_freeq, tag, entry); - TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry); tag->bio = bio; tag->circuit = circ->circ_state->msgid; } @@ -843,7 +857,14 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg) tag->done = 0; tag->waitseq = 0; if (msg) { +#if 0 + lwkt_gettoken(&xa->tok); + TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry); +#endif tag->state = msg->state; +#if 0 + lwkt_reltoken(&xa->tok); +#endif kdmsg_msg_write(msg); } else { xa_done(tag, 1); @@ -868,7 +889,6 @@ xa_done(xa_tag_t *tag, int wasbio) xa_softc_t *xa = tag->xa; struct bio *bio; - KKASSERT(tag->msg == NULL); KKASSERT(tag->bio == NULL); tag->done = 1; @@ -995,3 +1015,27 @@ handle_done: xa_done(tag, 1); return (0); } + +/* + * Restart as much deferred I/O as we can. + * + * Called with xa->tok held + */ +static +void +xa_restart_deferred(xa_softc_t *xa) +{ + struct bio *bio; + xa_tag_t *tag; + + while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) { + tag = xa_setup_cmd(xa, NULL); + if (tag == NULL) + break; + kprintf("xa: Restart BIO %p on %s\n", + bio, xa->iocom.auto_lnk_conn.fs_label); + TAILQ_REMOVE(&xa->bioq, bio, bio_act); + tag->bio = bio; + xa_start(tag, NULL); + } +} diff --git a/sys/kern/kern_dmsg.c b/sys/kern/kern_dmsg.c index e8ea583c0e..999b0711ac 100644 --- a/sys/kern/kern_dmsg.c +++ b/sys/kern/kern_dmsg.c @@ -53,15 +53,15 @@ RB_GENERATE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp); -static struct lwkt_token kdmsg_token = LWKT_TOKEN_INITIALIZER(kdmsg_token); static void kdmsg_circ_free_check(kdmsg_circuit_t *circ); - static void kdmsg_iocom_thread_rd(void *arg); static void kdmsg_iocom_thread_wr(void *arg); static int kdmsg_autorxmsg(kdmsg_msg_t *msg); static void kdmsg_autocirc(kdmsg_msg_t *msg); static int kdmsg_autocirc_reply(kdmsg_state_t *state, kdmsg_msg_t *msg); +static struct lwkt_token kdmsg_token = LWKT_TOKEN_INITIALIZER(kdmsg_token); + /* * Initialize the roll-up communications structure for a network * messaging session. This function does not install the socket. @@ -690,11 +690,6 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg) kdmsg_state_t *state; int error; - /* - * XXX resolve msg->any.head.source and msg->any.head.target - * into LNK_SPAN references. - */ - /* * Make sure a state structure is ready to go in case we need a new * one. This is the only routine which uses freerd_state so no @@ -1100,8 +1095,6 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg) lockmgr(&iocom->msglk, LK_EXCLUSIVE); state->rxcmd |= DMSGF_DELETE; if (state->txcmd & DMSGF_DELETE) { - if (state->msg == msg) - state->msg = NULL; KKASSERT(state->flags & KDMSG_STATE_INSERTED); if (state->rxcmd & DMSGF_REPLY) { KKASSERT(msg->any.head.cmd & @@ -1118,9 +1111,10 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg) lockmgr(&iocom->msglk, LK_RELEASE); kdmsg_state_free(state); } else { + if (state->msg != msg) + kdmsg_msg_free(msg); lockmgr(&iocom->msglk, LK_RELEASE); } - kdmsg_msg_free(msg); } else if (state->msg != msg) { kdmsg_msg_free(msg); } @@ -1332,8 +1326,6 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg) lockmgr(&iocom->msglk, LK_EXCLUSIVE); state->txcmd |= DMSGF_DELETE; if (state->rxcmd & DMSGF_DELETE) { - if (state->msg == msg) - state->msg = NULL; KKASSERT(state->flags & KDMSG_STATE_INSERTED); if (state->txcmd & DMSGF_REPLY) { KKASSERT(msg->any.head.cmd & @@ -1347,12 +1339,14 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg) &iocom->statewr_tree, state); } state->flags &= ~KDMSG_STATE_INSERTED; + msg->state = NULL; lockmgr(&iocom->msglk, LK_RELEASE); kdmsg_state_free(state); } else { + if (state->msg != msg) + kdmsg_msg_free(msg); lockmgr(&iocom->msglk, LK_RELEASE); } - kdmsg_msg_free(msg); } else if (state->msg != msg) { kdmsg_msg_free(msg); } -- 2.41.0