From: Matthew Dillon Date: Fri, 30 Nov 2012 23:35:44 +0000 (-0800) Subject: kernel - Preliminary xdisk remote block driver for cluster X-Git-Tag: v3.4.0rc~769 X-Git-Url: https://gitweb.dragonflybsd.org/~hofmann/dragonfly.git/commitdiff_plain/0f50fb465b70dde1d5a607feac25b4aad05a8321 kernel - Preliminary xdisk remote block driver for cluster * Basic infrastructure, ties into kern_dmsg.c cluster messaging system. (still a skeleton). * Allows a userland program (aka hammer2 service) to add networked block devices to the local machine. --- diff --git a/sys/dev/disk/xdisk/xdisk.c b/sys/dev/disk/xdisk/xdisk.c index dd5812024f..8aff1647a6 100644 --- a/sys/dev/disk/xdisk/xdisk.c +++ b/sys/dev/disk/xdisk/xdisk.c @@ -66,13 +66,60 @@ #include #include +struct xa_softc; + +struct xa_tag { + TAILQ_ENTRY(xa_tag) entry; + struct xa_softc *xa; + dmsg_blk_error_t status; + kdmsg_state_t *state; + struct bio *bio; + uint64_t circuit; + int running; /* transaction running */ + int waitseq; /* streaming reply */ + int done; /* final (transaction closed) */ +}; + +typedef struct xa_tag xa_tag_t; + +struct xa_softc { + TAILQ_ENTRY(xa_softc) entry; + cdev_t dev; + kdmsg_iocom_t iocom; + struct xdisk_attach_ioctl xaioc; + struct disk_info info; + struct disk disk; + uuid_t pfs_fsid; + int unit; + int serializing; + int attached; + int opencnt; + uint64_t keyid; + xa_tag_t *opentag; + TAILQ_HEAD(, bio) bioq; + TAILQ_HEAD(, xa_tag) tag_freeq; + TAILQ_HEAD(, xa_tag) tag_pendq; + TAILQ_HEAD(, kdmsg_circuit) circq; + struct lwkt_token tok; +}; + +typedef struct xa_softc xa_softc_t; + +#define MAXTAGS 64 /* no real limit */ + static int xdisk_attach(struct xdisk_attach_ioctl *xaioc); +static int xdisk_detach(struct xdisk_attach_ioctl *xaioc); static void xa_exit(kdmsg_iocom_t *iocom); -static int xa_msg_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg); -static int xa_msg_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg); -static int xa_lnk_rcvmsg(kdmsg_msg_t *msg); -static int xa_lnk_dbgmsg(kdmsg_msg_t *msg); -static int xa_adhoc_input(kdmsg_msg_t *msg); +static void xa_terminate_check(struct xa_softc *xa); +static int xa_rcvdmsg(kdmsg_msg_t *msg); +static void xa_autodmsg(kdmsg_msg_t *msg); + +static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio); +static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg); +static uint32_t xa_wait(xa_tag_t *tag, int seq); +static void xa_done(xa_tag_t *tag, int wasbio); +static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg); +static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg); MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks"); @@ -84,7 +131,7 @@ static d_close_t xdisk_close; static d_ioctl_t xdisk_ioctl; static struct dev_ops xdisk_ops = { - { "xdisk", 0, D_MPSAFE }, + { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE }, .d_open = xdisk_open, .d_close = xdisk_close, .d_ioctl = xdisk_ioctl @@ -100,7 +147,7 @@ static d_strategy_t xa_strategy; static d_psize_t xa_size; static struct dev_ops xa_ops = { - { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE }, + { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE }, .d_open = xa_open, .d_close = xa_close, .d_ioctl = xa_ioctl, @@ -110,19 +157,6 @@ static struct dev_ops xa_ops = { .d_psize = xa_size }; -struct xa_softc { - TAILQ_ENTRY(xa_softc) entry; - cdev_t dev; - kdmsg_iocom_t iocom; - struct xdisk_attach_ioctl xaioc; - struct disk_info info; - struct disk disk; - uuid_t pfs_fsid; - int unit; - int inprog; - int connected; -}; - static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token); static int xdisk_opencount; static cdev_t xdisk_dev; @@ -165,6 +199,7 @@ DEV_MODULE(xdisk, xdisk_modevent, 0); static int xdisk_open(struct dev_open_args *ap) { + kprintf("XDISK_OPEN\n"); lwkt_gettoken(&xdisk_token); ++xdisk_opencount; lwkt_reltoken(&xdisk_token); @@ -174,6 +209,7 @@ xdisk_open(struct dev_open_args *ap) static int xdisk_close(struct dev_close_args *ap) { + kprintf("XDISK_CLOSE\n"); lwkt_gettoken(&xdisk_token); --xdisk_opencount; lwkt_reltoken(&xdisk_token); @@ -189,6 +225,9 @@ xdisk_ioctl(struct dev_ioctl_args *ap) case XDISKIOCATTACH: error = xdisk_attach((void *)ap->a_data); break; + case XDISKIOCDETACH: + error = xdisk_detach((void *)ap->a_data); + break; default: error = ENOTTY; break; @@ -203,135 +242,191 @@ xdisk_ioctl(struct dev_ioctl_args *ap) static int xdisk_attach(struct xdisk_attach_ioctl *xaioc) { - struct xa_softc *scan; - struct xa_softc *xa; + xa_softc_t *xa; + xa_tag_t *tag; struct file *fp; - kdmsg_msg_t *msg; int unit; + int n; char devname[64]; cdev_t dev; + kprintf("xdisk attach %d %jd/%d %s %s\n", + xaioc->fd, (intmax_t)xaioc->bytes, xaioc->blksize, + xaioc->cl_label, xaioc->fs_label); + + /* + * Normalize ioctl params + */ fp = holdfp(curproc->p_fd, xaioc->fd, -1); if (fp == NULL) return EINVAL; - - xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO); + if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0) + return EINVAL; + if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0) + return EINVAL; + if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE) + return EINVAL; /* - * Find unit + * See if the serial number is already present. If we are + * racing a termination the disk subsystem may still have + * duplicate entries not yet removed so we wait a bit and + * retry. */ lwkt_gettoken(&xdisk_token); - unit = 0; - do { - TAILQ_FOREACH(scan, &xa_queue, entry) { - if (scan->unit == unit) +again: + TAILQ_FOREACH(xa, &xa_queue, entry) { + if (strcmp(xa->iocom.auto_lnk_conn.fs_label, + xaioc->fs_label) == 0) { + if (xa->serializing) { + tsleep(xa, 0, "xadelay", hz / 10); + goto again; + } + xa->serializing = 1; + kdmsg_iocom_uninit(&xa->iocom); + break; + } + } + + /* + * Create a new xa if not already present + */ + if (xa == NULL) { + unit = 0; + for (;;) { + TAILQ_FOREACH(xa, &xa_queue, entry) { + if (xa->unit == unit) + break; + } + if (xa == NULL) break; + ++unit; } - } while (scan != NULL); - xa->unit = unit; + xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO); + kprintf("ALLOCATE XA %p\n", xa); + xa->unit = unit; + xa->serializing = 1; + lwkt_token_init(&xa->tok, "xa"); + TAILQ_INIT(&xa->circq); + TAILQ_INIT(&xa->bioq); + TAILQ_INIT(&xa->tag_freeq); + TAILQ_INIT(&xa->tag_pendq); + for (n = 0; n < MAXTAGS; ++n) { + tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO); + tag->xa = xa; + TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry); + } + TAILQ_INSERT_TAIL(&xa_queue, xa, entry); + } xa->xaioc = *xaioc; - TAILQ_INSERT_TAIL(&xa_queue, xa, entry); + xa->attached = 1; lwkt_reltoken(&xdisk_token); /* * Create device */ - dev = disk_create(unit, &xa->disk, &xa_ops); - dev->si_drv1 = xa; - xa->dev = dev; + if (xa->dev == NULL) { + dev = disk_create(unit, &xa->disk, &xa_ops); + dev->si_drv1 = xa; + xa->dev = dev; + } - xa->info.d_media_blksize = 512; - xa->info.d_media_blocks = xaioc->size / 512; + xa->info.d_media_blksize = xaioc->blksize; + xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize; xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE; xa->info.d_secpertrack = 32; xa->info.d_nheads = 64; xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads; xa->info.d_ncylinders = 0; - disk_setdiskinfo_sync(&xa->disk, &xa->info); + if (xa->xaioc.fs_label[0]) + xa->info.d_serialno = xa->xaioc.fs_label; /* * Set up messaging connection */ ksnprintf(devname, sizeof(devname), "xa%d", unit); - kdmsg_iocom_init(&xa->iocom, xa, M_XDISK, - xa_lnk_rcvmsg, - xa_lnk_dbgmsg, - xa_adhoc_input); + kdmsg_iocom_init(&xa->iocom, xa, + KDMSG_IOCOMF_AUTOCONN | + KDMSG_IOCOMF_AUTOSPAN | + KDMSG_IOCOMF_AUTOCIRC | + KDMSG_IOCOMF_AUTOFORGE, + M_XDISK, xa_rcvdmsg); xa->iocom.exit_func = xa_exit; - xa->inprog = 1; - kern_uuidgen(&xa->pfs_fsid, 1); + kdmsg_iocom_reconnect(&xa->iocom, fp, devname); /* - * Issue DMSG_LNK_CONN for device. This sets up filters so hopefully - * the only SPANs we receive are from servers providing the label - * being configured. Hopefully that's just a single server(!)(!). - * (HAMMER peers might have multiple servers but block device peers - * currently only allow one). There could still be multiple spans - * due to there being multiple paths available, however. + * Setup our LNK_CONN advertisement for autoinitiate. + * + * Our filter is setup to only accept PEER_BLOCK/SERVER + * advertisements. */ - - msg = kdmsg_msg_alloc(&xa->iocom.router, DMSG_LNK_CONN | DMSGF_CREATE, - xa_msg_conn_reply, xa); - msg->any.lnk_conn.pfs_type = 0; - msg->any.lnk_conn.proto_version = DMSG_SPAN_PROTO_1; - msg->any.lnk_conn.peer_type = DMSG_PEER_BLOCK; - msg->any.lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK; - ksnprintf(msg->any.lnk_conn.cl_label, - sizeof(msg->any.lnk_conn.cl_label), + xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT; + xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1; + xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK; + xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK; + xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER; + ksnprintf(xa->iocom.auto_lnk_conn.cl_label, + sizeof(xa->iocom.auto_lnk_conn.cl_label), "%s", xaioc->cl_label); - msg->any.lnk_conn.pfs_fsid = xa->pfs_fsid; - xa->iocom.conn_state = msg->state; - kdmsg_msg_write(msg); - xa->inprog = 0; /* unstall msg thread exit (if racing) */ + /* + * We need a unique pfs_fsid to avoid confusion. + * We supply a rendezvous fs_label using the serial number. + */ + kern_uuidgen(&xa->pfs_fsid, 1); + xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid; + ksnprintf(xa->iocom.auto_lnk_conn.fs_label, + sizeof(xa->iocom.auto_lnk_conn.fs_label), + "%s", xaioc->fs_label); - return(0); -} + /* + * Setup our LNK_SPAN advertisement for autoinitiate + */ + xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT; + xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1; + xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK; + ksnprintf(xa->iocom.auto_lnk_span.cl_label, + sizeof(xa->iocom.auto_lnk_span.cl_label), + "%s", xa->xaioc.cl_label); + + kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg); + disk_setdiskinfo_sync(&xa->disk, &xa->info); + + lwkt_gettoken(&xdisk_token); + xa->serializing = 0; + xa_terminate_check(xa); + lwkt_reltoken(&xdisk_token); -/* - * Handle reply to our LNK_CONN transaction (transaction remains open) - */ -static -int -xa_msg_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg) -{ - struct xa_softc *xa = state->any.any; - kdmsg_msg_t *rmsg; - - if (msg->any.head.cmd & DMSGF_CREATE) { - kprintf("XA LNK_CONN received reply\n"); - rmsg = kdmsg_msg_alloc(&xa->iocom.router, - DMSG_LNK_SPAN | DMSGF_CREATE, - xa_msg_span_reply, xa); - rmsg->any.lnk_span.pfs_type = 0; - rmsg->any.lnk_span.proto_version = DMSG_SPAN_PROTO_1; - rmsg->any.lnk_span.peer_type = DMSG_PEER_BLOCK; - - ksnprintf(rmsg->any.lnk_span.cl_label, - sizeof(rmsg->any.lnk_span.cl_label), - "%s", xa->xaioc.cl_label); - kdmsg_msg_write(rmsg); - } - if ((state->txcmd & DMSGF_DELETE) == 0 && - (msg->any.head.cmd & DMSGF_DELETE)) { - kprintf("DISK LNK_CONN terminated by remote\n"); - xa->iocom.conn_state = NULL; - kdmsg_msg_reply(msg, 0); - } return(0); } static int -xa_msg_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg) +xdisk_detach(struct xdisk_attach_ioctl *xaioc) { - if ((state->txcmd & DMSGF_DELETE) == 0 && - (msg->any.head.cmd & DMSGF_DELETE)) { - kprintf("SPAN REPLY - Our sent span was terminated by the " - "remote %08x state %p\n", msg->any.head.cmd, state); - kdmsg_msg_reply(msg, 0); + struct xa_softc *xa; + + lwkt_gettoken(&xdisk_token); + for (;;) { + TAILQ_FOREACH(xa, &xa_queue, entry) { + if (strcmp(xa->iocom.auto_lnk_conn.fs_label, + xaioc->fs_label) == 0) { + break; + } + } + if (xa == NULL || xa->serializing == 0) { + xa->serializing = 1; + break; + } + tsleep(xa, 0, "xadet", hz / 10); } - return (0); + if (xa) { + kprintf("DETACHING XA\n"); + kdmsg_iocom_uninit(&xa->iocom); + xa->serializing = 0; + } + lwkt_reltoken(&xdisk_token); + return(0); } /* @@ -345,85 +440,122 @@ xa_exit(kdmsg_iocom_t *iocom) kprintf("XA_EXIT UNIT %d\n", xa->unit); - kdmsg_iocom_uninit(iocom); - - while (xa->inprog) { - tsleep(xa, 0, "xarace", hz); - } + if (xa->serializing == 0) + kdmsg_iocom_uninit(iocom); /* - * XXX allow reconnection, wait for users to terminate? + * If the drive is not in use and no longer attach it can be + * destroyed. */ - - disk_destroy(&xa->disk); - lwkt_gettoken(&xdisk_token); - TAILQ_REMOVE(&xa_queue, xa, entry); + xa->attached = 0; + xa_terminate_check(xa); lwkt_reltoken(&xdisk_token); +} +/* + * Determine if we can destroy the xa_softc. + * + * Called with xdisk_token held. + */ +static +void +xa_terminate_check(struct xa_softc *xa) +{ + xa_tag_t *tag; + + if (xa->opencnt || xa->attached || xa->serializing) + return; + xa->serializing = 1; + kprintf("TERMINATE XA %p %d\n", xa, xa->unit); + kdmsg_iocom_uninit(&xa->iocom); + if (xa->dev) { + disk_destroy(&xa->disk); + xa->dev->si_drv1 = NULL; + xa->dev = NULL; + } + kprintf("REMOVEQ XA %p %d\n", xa, xa->unit); + KKASSERT(xa->opencnt == 0 && xa->attached == 0); + kprintf("IOCOMUN XA %p %d\n", xa, xa->unit); + while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) { + TAILQ_REMOVE(&xa->tag_freeq, tag, entry); + tag->xa = NULL; + kfree(tag, M_XDISK); + } + KKASSERT(TAILQ_EMPTY(&xa->tag_pendq)); + TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */ kfree(xa, M_XDISK); + kprintf("xa_close: destroy unreferenced disk\n"); } -static int -xa_lnk_rcvmsg(kdmsg_msg_t *msg) +/* + * Shim to catch and record virtual circuit events. + */ +static void +xa_autodmsg(kdmsg_msg_t *msg) { - switch(msg->any.head.cmd & DMSGF_TRANSMASK) { - case DMSG_LNK_CONN | DMSGF_CREATE: - /* - * connection request from peer, send a streaming - * result of 0 (leave the transaction open). Transaction - * is left open for the duration of the connection, we - * let the kern_dmsg module clean it up on disconnect. - */ - kdmsg_msg_result(msg, 0); - break; - case DMSG_LNK_SPAN | DMSGF_CREATE: - /* - * Incoming SPAN - transaction create - * - * We do not have to respond right now. Instead we will - * respond later on when the peer deletes their side. - */ + struct xa_softc *xa = msg->iocom->handle; + kdmsg_circuit_t *circ; + kdmsg_circuit_t *cscan; + uint32_t xcmd; + + /* + * Because this is just a shim we don't have a state callback for + * the transactions we are sniffing, so make things easier by + * calculating the original command along with the current message's + * flags. This is because transactions are made up of numerous + * messages and only the first typically specifies the actual command. + */ + if (msg->state) { + xcmd = msg->state->icmd | + (msg->any.head.cmd & (DMSGF_CREATE | + DMSGF_DELETE | + DMSGF_REPLY)); + } else { + xcmd = msg->any.head.cmd; + } + + /* + * Add or remove a circuit, sorted by weight (lower numbers are + * better). + */ + switch(xcmd) { + case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY: + kprintf("XA: Received autodmsg: CREATE+REPLY\n"); + circ = msg->state->any.circ; + lwkt_gettoken(&xa->tok); + if (circ->recorded == 0) { + TAILQ_FOREACH(cscan, &xa->circq, entry) { + if (circ->weight < cscan->weight) + break; + } + if (cscan) + TAILQ_INSERT_BEFORE(cscan, circ, entry); + else + TAILQ_INSERT_TAIL(&xa->circq, circ, entry); + circ->recorded = 1; + } + lwkt_reltoken(&xa->tok); break; - case DMSG_LNK_SPAN | DMSGF_DELETE: - /* - * Incoming SPAN - transaction delete. - * - * We must terminate our side so both ends can free up - * their recorded state. - */ - /* fall through */ - case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE: - /* - * Incoming SPAN - transaction delete (degenerate span). - * - * We must terminate our side so both ends can free up - * their recorded state. - */ - kdmsg_msg_reply(msg, 0); + case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY: + kprintf("XA: Received autodmsg: DELETE+REPLY\n"); + circ = msg->state->any.circ; + lwkt_gettoken(&xa->tok); + if (circ->recorded) { + TAILQ_REMOVE(&xa->circq, circ, entry); + circ->recorded = 0; + } + lwkt_reltoken(&xa->tok); break; default: - /* - * Unsupported LNK message received. We only need to - * reply if it's a transaction in order to close our end. - * Ignore any one-way messages are any further messages - * associated with the transaction. - * - * NOTE: This case also includes DMSG_LNK_ERROR messages - * which might be one-way, replying to those would - * cause an infinite ping-pong. - */ - if (msg->any.head.cmd & DMSGF_CREATE) - kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP); break; } - return(0); } static int -xa_lnk_dbgmsg(kdmsg_msg_t *msg) +xa_rcvdmsg(kdmsg_msg_t *msg) { - switch(msg->any.head.cmd & DMSGF_CMDSWMASK) { + switch(msg->any.head.cmd & DMSGF_TRANSMASK) { case DMSG_DBG_SHELL: /* * Execute shell command (not supported atm). @@ -450,22 +582,22 @@ xa_lnk_dbgmsg(kdmsg_msg_t *msg) break; default: /* - * We don't understand what is going on, issue a reply. - * This will take care of all left-over cases whether it - * is a transaction or one-way. + * Unsupported LNK message received. We only need to + * reply if it's a transaction in order to close our end. + * Ignore any one-way messages are any further messages + * associated with the transaction. + * + * NOTE: This case also includes DMSG_LNK_ERROR messages + * which might be one-way, replying to those would + * cause an infinite ping-pong. */ - kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP); + if (msg->any.head.cmd & DMSGF_CREATE) + kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP); break; } return(0); } -static int -xa_adhoc_input(kdmsg_msg_t *msg) -{ - kprintf("XA ADHOC INPUT MSG %08x\n", msg->any.head.cmd); - return(0); -} /************************************************************************ * XA DEVICE INTERFACE * @@ -475,37 +607,129 @@ static int xa_open(struct dev_open_args *ap) { cdev_t dev = ap->a_head.a_dev; - struct xa_softc *xa; - - xa = dev->si_drv1; + xa_softc_t *xa; + xa_tag_t *tag; + kdmsg_msg_t *msg; + int error; dev->si_bsize_phys = 512; dev->si_bsize_best = 32768; /* - * Issue streaming open and wait for reply. + * Interlock open with opencnt, wait for attachment operations + * to finish. */ + lwkt_gettoken(&xdisk_token); +again: + xa = dev->si_drv1; + if (xa == NULL) { + lwkt_reltoken(&xdisk_token); + return ENXIO; /* raced destruction */ + } + if (xa->serializing) { + tsleep(xa, 0, "xarace", hz / 10); + goto again; + } - /* XXX check ap->a_oflags & FWRITE, EACCES if read-only */ + /* + * Serialize initial open + */ + if (xa->opencnt++ > 0) { + lwkt_reltoken(&xdisk_token); + return(0); + } + xa->serializing = 1; + lwkt_reltoken(&xdisk_token); - return(0); + kprintf("XA OPEN COMMAND\n"); + + tag = xa_setup_cmd(xa, NULL); + if (tag == NULL) { + lwkt_gettoken(&xdisk_token); + KKASSERT(xa->opencnt > 0); + --xa->opencnt; + xa->serializing = 0; + xa_terminate_check(xa); + lwkt_reltoken(&xdisk_token); + return(ENXIO); + } + msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit, + DMSG_BLK_OPEN | DMSGF_CREATE, + xa_sync_completion, tag); + msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR; + xa_start(tag, msg); + if (xa_wait(tag, 0) == 0) { + kprintf("XA OPEN GOOD\n"); + xa->keyid = tag->status.keyid; + xa->opentag = tag; /* leave tag open */ + xa->serializing = 0; + error = 0; + } else { + kprintf("XA OPEN BAD\n"); + xa_done(tag, 0); + lwkt_gettoken(&xdisk_token); + KKASSERT(xa->opencnt > 0); + --xa->opencnt; + xa->serializing = 0; + xa_terminate_check(xa); + lwkt_reltoken(&xdisk_token); + error = ENXIO; + } + return (error); } static int xa_close(struct dev_close_args *ap) { cdev_t dev = ap->a_head.a_dev; + xa_softc_t *xa; + xa_tag_t *tag; + + xa = dev->si_drv1; + if (xa == NULL) + return ENXIO; /* raced destruction */ + + lwkt_gettoken(&xa->tok); + if ((tag = xa->opentag) != NULL) { + xa->opentag = NULL; + kdmsg_state_reply(tag->state, DMSG_ERR_NOSUPP); + while (tag->done == 0) + xa_wait(tag, tag->waitseq); + xa_done(tag, 0); + } + lwkt_reltoken(&xa->tok); + + lwkt_gettoken(&xdisk_token); + KKASSERT(xa->opencnt > 0); + --xa->opencnt; + xa_terminate_check(xa); + lwkt_reltoken(&xdisk_token); + + return(0); } static int xa_strategy(struct dev_strategy_args *ap) { + xa_softc_t *xa = ap->a_head.a_dev->si_drv1; + xa_tag_t *tag; + struct bio *bio = ap->a_bio; + + bio->bio_buf->b_error = ENXIO; + bio->bio_buf->b_flags |= B_ERROR; + biodone(bio); + return(0); + + tag = xa_setup_cmd(xa, bio); + if (tag) + xa_start(tag, NULL); + return(0); } static int xa_ioctl(struct dev_ioctl_args *ap) { - return (ENOTTY); + return(ENOTTY); } static int @@ -515,8 +739,259 @@ xa_size(struct dev_psize_args *ap) if ((xa = ap->a_head.a_dev->si_drv1) == NULL) return (ENXIO); - if (xa->inprog) - return (ENXIO); ap->a_result = xa->info.d_media_blocks; return (0); } + +/************************************************************************ + * XA BLOCK PROTOCOL STATE MACHINE * + ************************************************************************ + * + * Implement tag/msg setup and related functions. + */ +static xa_tag_t * +xa_setup_cmd(xa_softc_t *xa, struct bio *bio) +{ + kdmsg_circuit_t *circ; + xa_tag_t *tag; + + /* + * Only get a tag if we have a valid virtual circuit to the server. + */ + lwkt_gettoken(&xa->tok); + if ((circ = TAILQ_FIRST(&xa->circq)) == NULL) { + tag = NULL; + } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) { + TAILQ_REMOVE(&xa->tag_freeq, tag, entry); + TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry); + tag->bio = bio; + tag->circuit = circ->circ_state->msgid; + } + + /* + * If we can't dispatch now and this is a bio, queue it for later. + */ + if (tag == NULL && bio) { + TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act); + } + lwkt_reltoken(&xa->tok); + + return (tag); +} + +static void +xa_start(xa_tag_t *tag, kdmsg_msg_t *msg) +{ + xa_softc_t *xa = tag->xa; + + if (msg == NULL) { + struct bio *bio; + struct buf *bp; + + KKASSERT(tag->bio); + bio = tag->bio; + bp = bio->bio_buf; + + switch(bp->b_cmd) { + case BUF_CMD_READ: + msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit, + DMSG_BLK_READ | + DMSGF_CREATE | DMSGF_DELETE, + xa_bio_completion, tag); + msg->any.blk_read.keyid = xa->keyid; + msg->any.blk_read.offset = bio->bio_offset; + msg->any.blk_read.bytes = bp->b_bcount; + break; + case BUF_CMD_WRITE: + msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit, + DMSG_BLK_WRITE | + DMSGF_CREATE | DMSGF_DELETE, + xa_bio_completion, tag); + msg->any.blk_write.keyid = xa->keyid; + msg->any.blk_write.offset = bio->bio_offset; + msg->any.blk_write.bytes = bp->b_bcount; + msg->aux_data = bp->b_data; + msg->aux_size = bp->b_bcount; + break; + case BUF_CMD_FLUSH: + msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit, + DMSG_BLK_FLUSH | + DMSGF_CREATE | DMSGF_DELETE, + xa_bio_completion, tag); + msg->any.blk_flush.keyid = xa->keyid; + msg->any.blk_flush.offset = bio->bio_offset; + msg->any.blk_flush.bytes = bp->b_bcount; + break; + case BUF_CMD_FREEBLKS: + msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit, + DMSG_BLK_FREEBLKS | + DMSGF_CREATE | DMSGF_DELETE, + xa_bio_completion, tag); + msg->any.blk_freeblks.keyid = xa->keyid; + msg->any.blk_freeblks.offset = bio->bio_offset; + msg->any.blk_freeblks.bytes = bp->b_bcount; + break; + default: + bp->b_flags |= B_ERROR; + bp->b_error = EIO; + biodone(bio); + tag->bio = NULL; + break; + } + } + + tag->done = 0; + tag->waitseq = 0; + if (msg) { + tag->state = msg->state; + kdmsg_msg_write(msg); + } else { + xa_done(tag, 1); + } +} + +static uint32_t +xa_wait(xa_tag_t *tag, int seq) +{ + xa_softc_t *xa = tag->xa; + + lwkt_gettoken(&xa->tok); + while (tag->waitseq == seq) + tsleep(tag, 0, "xawait", 0); + lwkt_reltoken(&xa->tok); + return (tag->status.head.error); +} + +static void +xa_done(xa_tag_t *tag, int wasbio) +{ + xa_softc_t *xa = tag->xa; + struct bio *bio; + + KKASSERT(tag->msg == NULL); + KKASSERT(tag->bio == NULL); + tag->done = 1; + + lwkt_gettoken(&xa->tok); + if ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) { + TAILQ_REMOVE(&xa->bioq, bio, bio_act); + tag->bio = bio; + lwkt_reltoken(&xa->tok); + xa_start(tag, NULL); + } else { + TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry); + lwkt_reltoken(&xa->tok); + } +} + +static int +xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg) +{ + xa_tag_t *tag = state->any.any; + xa_softc_t *xa = tag->xa; + + switch(msg->any.head.cmd & DMSGF_CMDSWMASK) { + case DMSG_LNK_ERROR | DMSGF_REPLY: + bzero(&tag->status, sizeof(tag->status)); + tag->status.head = msg->any.head; + break; + case DMSG_BLK_ERROR | DMSGF_REPLY: + tag->status = msg->any.blk_error; + break; + } + kprintf("XA_SYNC_COMPLETION ERROR %u RESID %u\n", + tag->status.head.error, tag->status.resid); + if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */ + kdmsg_msg_reply(msg, 0); /* terminate our side */ + tag->done = 1; + } + lwkt_gettoken(&xa->tok); + ++tag->waitseq; + lwkt_reltoken(&xa->tok); + + wakeup(tag); + + return (0); +} + +static int +xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg) +{ + xa_tag_t *tag = state->any.any; + /*xa_softc_t *xa = tag->xa;*/ + struct bio *bio; + struct buf *bp; + + /* + * Get the bio from the tag. If no bio is present we just do + * 'done' handling. + */ + if ((bio = tag->bio) == NULL) + goto handle_done; + bp = bio->bio_buf; + + /* + * Process return status + */ + switch(msg->any.head.cmd & DMSGF_CMDSWMASK) { + case DMSG_LNK_ERROR | DMSGF_REPLY: + bzero(&tag->status, sizeof(tag->status)); + tag->status.head = msg->any.head; + if (tag->status.head.error) + tag->status.resid = bp->b_bcount; + else + tag->status.resid = 0; + break; + case DMSG_BLK_ERROR | DMSGF_REPLY: + tag->status = msg->any.blk_error; + break; + } + kprintf("XA_BIO_COMPLETION ERROR %u RESID %u\n", + tag->status.head.error, tag->status.resid); + + /* + * Process bio completion + * + * For reads any returned data is zero-extended if necessary, so + * the server can short-cut any all-zeros reads if it desires. + */ + switch(bp->b_cmd) { + case BUF_CMD_READ: + if (msg->aux_data && msg->aux_size) { + if (msg->aux_size < bp->b_bcount) { + bcopy(msg->aux_data, bp->b_data, msg->aux_size); + bzero(bp->b_data + msg->aux_size, + bp->b_bcount - msg->aux_size); + } else { + bcopy(msg->aux_data, bp->b_data, bp->b_bcount); + } + } else { + bzero(bp->b_data, bp->b_bcount); + } + /* fall through */ + case BUF_CMD_WRITE: + case BUF_CMD_FLUSH: + case BUF_CMD_FREEBLKS: + default: + if (tag->status.resid > bp->b_bcount) + tag->status.resid = bp->b_bcount; + bp->b_resid = tag->status.resid; + if ((bp->b_error = tag->status.head.error) != 0) { + bp->b_flags |= B_ERROR; + } else { + bp->b_resid = 0; + } + biodone(bio); + tag->bio = NULL; + break; + } + + /* + * Handle completion of the transaction. If the bioq is not empty + * we can initiate another bio on the same tag. + */ +handle_done: + if (msg->any.head.cmd & DMSGF_DELETE) + xa_done(tag, 1); + return (0); +} diff --git a/sys/sys/xdiskioctl.h b/sys/sys/xdiskioctl.h index d4951750ed..678ce2acba 100644 --- a/sys/sys/xdiskioctl.h +++ b/sys/sys/xdiskioctl.h @@ -48,11 +48,15 @@ struct xdisk_attach_ioctl { int fd; int unit; - int64_t size; /* size of disk in bytes */ - char cl_label[64]; /* LNK_SPAN cl_label match */ - char serno[64]; /* serial number */ + int64_t bytes; /* size of disk in bytes */ + int blksize; + int unused01; + int32_t reserved02[8]; + char cl_label[64]; /* LNK_SPAN cl_label match (typ host/dev) */ + char fs_label[64]; /* LNK_SPAN fs_label match (typ serialno) */ }; #define XDISKIOCATTACH _IOWR('X', 0, struct xdisk_attach_ioctl) +#define XDISKIOCDETACH _IOWR('X', 1, struct xdisk_attach_ioctl) #endif