2 * Copyright (c) 2012-2014 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * This module allows disk devices to be created and associated with a
36 * communications pipe or socket. You open the device and issue an
37 * ioctl() to install a new disk along with its communications descriptor.
39 * All further communication occurs via the descriptor using the DMSG
40 * LNK_CONN, LNK_SPAN, and BLOCK protocols. The descriptor can be a
41 * direct connection to a remote machine's disk (in-kernel), to a remote
42 * cluster controller, to the local cluster controller, etc.
44 * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45 * devices. These devices look like raw disks to the system.
47 #include <sys/param.h>
48 #include <sys/systm.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
58 #include <sys/queue.h>
62 #include <sys/kern_syscall.h>
65 #include <sys/xdiskioctl.h>
68 #include <sys/thread2.h>
/*
 * NOTE(review): block is garbled — each line carries a stray leading
 * number (original line index) and interior lines are missing; the
 * "static int xa_active;" that SYSCTL_INT references is not visible
 * here. Code kept verbatim; recover from upstream before compiling.
 */
/* Red-black tree of xa softcs, keyed by fs_label (see xa_softc_cmp). */
72 RB_HEAD(xa_softc_tree, xa_softc);
73 RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
/* Debug sysctls: in-flight I/O count and offset of the last I/O issued. */
76 SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
77 "Number of active xdisk IOs");
78 static uint64_t xa_last;
79 SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
80 "Offset of last xdisk IO");
/*
 * Per-I/O tag (struct xa_tag) — fragment only. The enclosing
 * "struct xa_tag {" line and several members are missing from this
 * view (stray leading numbers, line gaps).
 * NOTE(review): recover the full definition from upstream.
 */
86 TAILQ_ENTRY(xa_tag) entry;	/* tag_freeq / tag_pendq linkage */
88 dmsg_blk_error_t status;	/* completion status from the BLK protocol */
96 typedef struct xa_tag xa_tag_t;
/*
 * Per-device softc (struct xa_softc) — fragment only; the opening
 * "struct xa_softc {" and several members (unit, dev, disk, lk,
 * opencnt, spancnt, serializing, open_tag, keyid, last_error, ...)
 * referenced elsewhere in this file are missing from this view.
 * NOTE(review): recover the full definition from upstream.
 */
102 struct kdmsg_state_list spanq;
103 RB_ENTRY(xa_softc) rbnode;
105 struct devstat stats;
106 struct disk_info info;
115 char cl_label[64]; /* from LNK_SPAN cl_label (host/dev) */
116 char fs_label[64]; /* from LNK_SPAN fs_label (serno str) */
118 TAILQ_HEAD(, bio) bioq; /* pending BIOs */
119 TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
120 TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
124 typedef struct xa_softc xa_softc_t;
/*
 * Per-connection state (struct xa_iocom) — fragment only; the opening
 * "struct xa_iocom {" and members (iocom, dummysc) used elsewhere in
 * this file are missing from this view.
 * NOTE(review): recover the full definition from upstream.
 */
127 TAILQ_ENTRY(xa_iocom) entry;
132 typedef struct xa_iocom xa_iocom_t;
/*
 * Forward declarations, softc tree instantiation, tag pool size, and
 * the malloc type for this driver.
 * NOTE(review): block is garbled — stray leading numbers and missing
 * lines; kept verbatim.
 */
134 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
135 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
136 static struct xa_softc_tree xa_device_tree;
/* Number of pre-allocated I/O tags per device (see xaio_rcvdmsg). */
138 #define MAXTAGS 64 /* no real limit */
140 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
141 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
142 static void xaio_exit(kdmsg_iocom_t *iocom);
143 static int xaio_rcvdmsg(kdmsg_msg_t *msg);
145 static void xa_terminate_check(struct xa_softc *sc);
147 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
148 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
149 static void xa_done(xa_tag_t *tag, int wasbio);
150 static void xa_release(xa_tag_t *tag, int wasbio);
151 static uint32_t xa_wait(xa_tag_t *tag);
152 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
153 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
154 static void xa_restart_deferred(xa_softc_t *sc);
156 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
/*
 * dev_ops tables and file-scope globals: xdisk_ops is the control
 * device (/dev/xdisk), xa_ops is the per-disk device (/dev/xa%d).
 * NOTE(review): block is garbled — stray leading numbers; the closing
 * "};" of both dev_ops initializers and several .d_* entries of xa_ops
 * are missing from this view. Kept verbatim.
 */
159 * Control device, issue ioctls to create xa devices.
161 static d_open_t xdisk_open;
162 static d_close_t xdisk_close;
163 static d_ioctl_t xdisk_ioctl;
165 static struct dev_ops xdisk_ops = {
166 { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
167 .d_open = xdisk_open,
168 .d_close = xdisk_close,
169 .d_ioctl = xdisk_ioctl
175 static d_open_t xa_open;
176 static d_close_t xa_close;
177 static d_ioctl_t xa_ioctl;
178 static d_strategy_t xa_strategy;
179 static d_psize_t xa_size;
181 static struct dev_ops xa_ops = {
182 { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
187 .d_write = physwrite,
188 .d_strategy = xa_strategy,
/* Global state: control-device open count, the control cdev itself,
 * the big xdisk lock, and the list of active iocom connections. */
192 static int xdisk_opencount;
193 static cdev_t xdisk_dev;
194 struct lock xdisk_lk;
195 static TAILQ_HEAD(, xa_iocom) xaiocomq;
/*
 * xdisk_modevent — module load/unload handler: creates /dev/xdisk on
 * load; on unload refuses while the control device is open or iocoms
 * remain, then tears the devices down.
 * NOTE(review): block is garbled — the function's switch/case
 * structure, braces, and return statements are missing from this view.
 * Kept verbatim.
 */
198 * Module initialization
201 xdisk_modevent(module_t mod, int type, void *data)
205 TAILQ_INIT(&xaiocomq);
206 RB_INIT(&xa_device_tree);
207 lockinit(&xdisk_lk, "xdisk", 0, 0);
208 xdisk_dev = make_dev(&xdisk_ops, 0,
209 UID_ROOT, GID_WHEEL, 0600, "xdisk");
/* refuse unload while busy */
213 if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
216 destroy_dev(xdisk_dev);
219 dev_ops_remove_all(&xdisk_ops);
220 dev_ops_remove_all(&xa_ops);
228 DEV_MODULE(xdisk, xdisk_modevent, 0);
/*
 * xa_softc_cmp — RB-tree comparator; orders softcs by serial number
 * string (fs_label).
 * NOTE(review): return-type line and braces missing from this view.
 */
231 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
233 return(strcmp(sc1->fs_label, sc2->fs_label));
/*
 * xdisk_open — control-device open; body (presumably the opencount
 * bump) is missing from this view, only the lock/unlock pair remains.
 * NOTE(review): garbled block, kept verbatim.
 */
240 xdisk_open(struct dev_open_args *ap)
242 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
244 lockmgr(&xdisk_lk, LK_RELEASE);
/*
 * xdisk_close — control-device close; body (presumably the opencount
 * drop) is missing from this view, only the lock/unlock pair remains.
 * NOTE(review): garbled block, kept verbatim.
 */
249 xdisk_close(struct dev_close_args *ap)
251 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
253 lockmgr(&xdisk_lk, LK_RELEASE);
/*
 * xdisk_ioctl — control-device ioctl dispatch to attach/detach; the
 * switch header, case labels, and default/return are missing from
 * this view. NOTE(review): garbled block, kept verbatim.
 */
258 xdisk_ioctl(struct dev_ioctl_args *ap)
264 error = xdisk_attach((void *)ap->a_data);
267 error = xdisk_detach((void *)ap->a_data);
/*
 * xdisk_attach — take ownership of the passed file descriptor (the
 * communications pipe/socket), allocate an xa_iocom, and set up the
 * kdmsg iocom with an auto-initiated LNK_CONN advertisement filtered
 * to PEER_BLOCK/SERVER peers.
 * NOTE(review): block is garbled — stray leading numbers, missing
 * braces, fp NULL-check, ksnprintf format argument, and return.
 * Kept verbatim; recover from upstream.
 */
276 /************************************************************************
278 ************************************************************************/
281 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
287 * Normalize ioctl params
289 fp = holdfp(curproc->p_fd, xaioc->fd, -1);
292 kprintf("xdisk_attach fp=%p\n", fp);
295 * See if the serial number is already present. If we are
296 * racing a termination the disk subsystem may still have
297 * duplicate entries not yet removed so we wait a bit and
300 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
302 xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
303 kdmsg_iocom_init(&xaio->iocom, xaio,
304 KDMSG_IOCOMF_AUTOCONN,
305 M_XDISK, xaio_rcvdmsg);
306 xaio->iocom.exit_func = xaio_exit;
308 kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");
311 * Setup our LNK_CONN advertisement for autoinitiate.
313 * Our filter is setup to only accept PEER_BLOCK/SERVER
316 * We need a unique pfs_fsid to avoid confusion.
318 xaio->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
319 xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
320 xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
321 xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
322 xaio->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
323 ksnprintf(xaio->iocom.auto_lnk_conn.fs_label,
324 sizeof(xaio->iocom.auto_lnk_conn.fs_label),
326 kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1);
329 * Setup our LNK_SPAN advertisement for autoinitiate
331 TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
332 kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
334 lockmgr(&xdisk_lk, LK_RELEASE);
/*
 * xdisk_detach — only the signature survives in this view; the entire
 * body is missing. NOTE(review): recover from upstream.
 */
340 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
/*
 * xaio_exit — iocom disconnect callback: unlink the xa_iocom from the
 * global list under xdisk_lk, uninit the iocom, and free it.
 * NOTE(review): garbled block (braces and some lines missing), kept
 * verbatim.
 */
346 * Called from iocom core transmit thread upon disconnect.
350 xaio_exit(kdmsg_iocom_t *iocom)
352 xa_iocom_t *xaio = iocom->handle;
354 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
355 kprintf("xdisk_detach [xaio_exit()]\n");
356 TAILQ_REMOVE(&xaiocomq, xaio, entry);
357 lockmgr(&xdisk_lk, LK_RELEASE);
359 kdmsg_iocom_uninit(&xaio->iocom);
361 kfree(xaio, M_XDISK);
/*
 * xaio_rcvdmsg — iocom receive callback for messages the core does not
 * handle itself. Tracks remote LNK_SPANs: on CREATE it finds or
 * allocates the xa_softc keyed by fs_label, pre-allocates MAXTAGS I/O
 * tags, creates the disk device and devstat entry, and publishes disk
 * geometry from the span's media info; on DELETE it unlinks the span
 * and may tear the softc down. Also answers DBG_SHELL and rejects
 * unsupported transactions.
 * NOTE(review): block is heavily garbled — the switch header, many
 * braces, break statements, local declarations, and whole statements
 * are missing (embedded numbering jumps). Code kept byte-identical;
 * do not attempt to compile this fragment.
 */
365 * Called from iocom core to handle messages that the iocom core does not
366 * handle itself and for which a state function callback has not yet been
369 * We primarily care about LNK_SPAN transactions here.
372 xaio_rcvdmsg(kdmsg_msg_t *msg)
374 kdmsg_state_t *state = msg->state;
375 xa_iocom_t *xaio = state->iocom->handle;
379 kprintf("xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
380 state, state->rxcmd, state->txcmd,
383 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
386 case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
388 * A LNK_SPAN transaction which is opened and closed
389 * degenerately is not useful to us, just ignore it.
391 kdmsg_msg_reply(msg, 0);
393 case DMSG_LNK_SPAN | DMSGF_CREATE:
395 * Manage the tracking node for the remote LNK_SPAN.
397 * Return a streaming result, leaving the transaction open
398 * in both directions to allow sub-transactions.
/* stage the span's labels into dummysc for the RB_FIND lookup */
400 bcopy(msg->any.lnk_span.cl_label, xaio->dummysc.cl_label,
401 sizeof(xaio->dummysc.cl_label));
402 xaio->dummysc.cl_label[sizeof(xaio->dummysc.cl_label) - 1] = 0;
404 bcopy(msg->any.lnk_span.fs_label, xaio->dummysc.fs_label,
405 sizeof(xaio->dummysc.fs_label));
406 xaio->dummysc.fs_label[sizeof(xaio->dummysc.fs_label) - 1] = 0;
408 kprintf("xdisk: LINK_SPAN state %p create for %s\n",
409 msg->state, msg->any.lnk_span.fs_label);
411 sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
/* no existing softc for this serial number: allocate a new one */
419 sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
420 bcopy(msg->any.lnk_span.cl_label, sc->cl_label,
421 sizeof(sc->cl_label));
422 sc->cl_label[sizeof(sc->cl_label) - 1] = 0;
423 bcopy(msg->any.lnk_span.fs_label, sc->fs_label,
424 sizeof(sc->fs_label));
425 sc->fs_label[sizeof(sc->fs_label) - 1] = 0;
/* scan existing softcs to pick an unused unit number */
427 /* XXX FIXME O(N^2) */
431 RB_FOREACH(sctmp, xa_softc_tree,
433 if (sctmp->unit == unit)
441 lockinit(&sc->lk, "xalk", 0, 0);
442 TAILQ_INIT(&sc->spanq);
443 TAILQ_INIT(&sc->bioq);
444 TAILQ_INIT(&sc->tag_freeq);
445 TAILQ_INIT(&sc->tag_pendq);
447 lockmgr(&sc->lk, LK_EXCLUSIVE);
448 RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
449 TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
450 msg->state->any.xa_sc = sc;
/* pre-allocate the fixed pool of I/O tags */
455 for (n = 0; n < MAXTAGS; ++n) {
456 tag = kmalloc(sizeof(*tag),
457 M_XDISK, M_WAITOK|M_ZERO);
459 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
462 if (sc->dev == NULL) {
463 dev = disk_create(unit, &sc->disk, &xa_ops);
466 devstat_add_entry(&sc->stats, "xa", unit,
468 DEVSTAT_NO_ORDERED_TAGS,
469 DEVSTAT_TYPE_DIRECT |
470 DEVSTAT_TYPE_IF_OTHER,
471 DEVSTAT_PRIORITY_OTHER);
/* derive disk geometry from the span's advertised media info */
474 sc->info.d_media_blksize =
475 msg->any.lnk_span.media.block.blksize;
476 if (sc->info.d_media_blksize <= 0)
477 sc->info.d_media_blksize = 1;
478 sc->info.d_media_blocks =
479 msg->any.lnk_span.media.block.bytes /
480 sc->info.d_media_blksize;
481 sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
482 sc->info.d_secpertrack = 32;
483 sc->info.d_nheads = 64;
484 sc->info.d_secpercyl = sc->info.d_secpertrack *
486 sc->info.d_ncylinders = 0;
488 sc->info.d_serialno = sc->fs_label;
490 * WARNING! disk_setdiskinfo() must be asynchronous
491 * because we are in the rxmsg thread. If
492 * it is synchronous and issues more disk
493 * I/Os, we will deadlock.
495 kprintf("xdisk: A1\n");
496 disk_setdiskinfo(&sc->disk, &sc->info);
497 xa_restart_deferred(sc); /* eats serializing */
498 lockmgr(&sc->lk, LK_RELEASE);
/* existing softc: attach this span and restart deferred I/O */
500 lockmgr(&sc->lk, LK_EXCLUSIVE);
502 kprintf("xdisk: A2 (%d) ser=%d otag=%p\n", sc->spancnt, sc->serializing, sc->open_tag);
503 TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
504 msg->state->any.xa_sc = sc;
505 if (sc->serializing == 0 && sc->open_tag == NULL) {
507 xa_restart_deferred(sc); /* eats serializing */
509 lockmgr(&sc->lk, LK_RELEASE);
510 if (sc->dev && sc->dev->si_disk) {
511 kprintf("reprobe\n");
512 disk_msg_send(DISK_DISK_REPROBE,
517 kprintf("xdisk: sc %p spancnt %d\n", sc, sc->spancnt);
518 kdmsg_msg_result(msg, 0);
520 case DMSG_LNK_SPAN | DMSGF_DELETE:
522 * Manage the tracking node for the remote LNK_SPAN.
524 * Return a final result, closing our end of the transaction.
526 sc = msg->state->any.xa_sc;
527 kprintf("xdisk: LINK_SPAN state %p delete for %s (sc=%p)\n",
528 msg->state, (sc ? sc->fs_label : "(null)"), sc);
529 lockmgr(&sc->lk, LK_EXCLUSIVE);
530 msg->state->any.xa_sc = NULL;
531 TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
534 kprintf("xdisk: sc %p spancnt %d\n", sc, sc->spancnt);
537 * Spans can come and go as the graph stabilizes, so if
538 * we lose a span along with sc->open_tag we may be able
539 * to restart the I/Os on a different span.
542 sc->serializing == 0 && sc->open_tag == NULL) {
544 xa_restart_deferred(sc);
546 lockmgr(&sc->lk, LK_RELEASE);
547 kdmsg_msg_reply(msg, 0);
553 if (sc->spancnt == 0)
554 xa_terminate_check(sc);
557 case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
559 * Ignore unimplemented streaming replies on our LNK_SPAN
562 kprintf("xdisk: LINK_SPAN state %p delete+reply\n",
565 case DMSG_LNK_SPAN | DMSGF_REPLY:
567 * Ignore unimplemented streaming replies on our LNK_SPAN
570 kprintf("xdisk: LINK_SPAN state %p reply\n",
575 * Execute shell command (not supported atm).
577 * This is a one-way packet but if not (e.g. if part of
578 * a streaming transaction), we will have already closed
581 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
583 case DMSG_DBG_SHELL | DMSGF_REPLY:
585 * Receive one or more replies to a shell command
586 * that we sent. Just dump it to the console.
588 * This is a one-way packet but if not (e.g. if
589 * part of a streaming transaction), we will have
590 * already closed our end.
593 msg->aux_data[msg->aux_size - 1] = 0;
594 kprintf("xdisk: DEBUGMSG: %s\n",
600 * Unsupported one-way message, streaming message, or
603 * Terminate any unsupported transactions with an error
604 * and ignore any unsupported streaming messages.
606 * NOTE: This case also includes DMSG_LNK_ERROR messages
607 * which might be one-way, replying to those would
608 * cause an infinite ping-pong.
610 if (msg->any.head.cmd & DMSGF_CREATE)
611 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
614 lockmgr(&xdisk_lk, LK_RELEASE);
/*
 * xa_terminate_check — destroy the softc once it is fully idle
 * (no opens, no serialization in progress, no spans): remove it from
 * the device tree, destroy the disk device and devstat entry, and
 * free the pre-allocated tag pool.
 * NOTE(review): garbled block — braces, returns, and the kfree of
 * each tag are missing from this view. Kept verbatim.
 */
620 * Determine if we can destroy the xa_softc.
622 * Called with xdisk_lk held.
626 xa_terminate_check(struct xa_softc *sc)
631 * Determine if we can destroy the softc.
633 kprintf("xdisk: terminate check xa%d (%d,%d,%d) sc=%p ",
635 sc->opencnt, sc->serializing, sc->spancnt,
638 if (sc->opencnt || sc->serializing || sc->spancnt) {
639 kprintf("(leave intact)\n");
644 * Remove from device tree, a race with a new incoming span
645 * will create a new softc and disk.
647 RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
650 * Device has to go first to prevent device ops races.
653 disk_destroy(&sc->disk);
654 devstat_remove_entry(&sc->stats);
655 sc->dev->si_drv1 = NULL;
659 kprintf("(remove from tree)\n");
661 KKASSERT(sc->opencnt == 0);
662 KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
664 while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
665 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
/*
 * xa_open — open of an /dev/xa%d device. Interlocks against
 * destruction via xdisk_lk/opencnt, retries while a racing open is
 * serializing, issues BLK_OPEN on first open and sleeps until the
 * BLK_OPEN completes, then reports sc->last_error.
 * NOTE(review): garbled block — braces, gotos/returns, and the
 * si_drv1 lookup are missing from this view. Kept verbatim.
 */
673 /************************************************************************
674 * XA DEVICE INTERFACE *
675 ************************************************************************/
678 xa_open(struct dev_open_args *ap)
680 cdev_t dev = ap->a_head.a_dev;
684 dev->si_bsize_phys = 512;
685 dev->si_bsize_best = 32768;
688 * Interlock open with opencnt, wait for attachment operations
691 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
695 lockmgr(&xdisk_lk, LK_RELEASE);
696 return ENXIO; /* raced destruction */
698 if (sc->serializing) {
699 tsleep(sc, 0, "xarace", hz / 10);
705 * Serialize initial open
707 if (sc->opencnt++ > 0) {
710 lockmgr(&xdisk_lk, LK_RELEASE);
715 * Issue BLK_OPEN if necessary. ENXIO is returned if we have trouble.
717 if (sc->open_tag == NULL) {
718 xa_restart_deferred(sc); /* eats serializing */
723 lockmgr(&xdisk_lk, LK_RELEASE);
726 * Wait for completion of the BLK_OPEN
728 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
729 while (sc->serializing)
730 lksleep(sc, &xdisk_lk, 0, "xaopen", hz);
732 error = sc->last_error;
734 KKASSERT(sc->opencnt > 0);
736 xa_terminate_check(sc);
737 sc = NULL; /* sc may be invalid now */
739 lockmgr(&xdisk_lk, LK_RELEASE);
/*
 * xa_close — close of an /dev/xa%d device. On last close with an
 * active open_tag, closes our side of the BLK_OPEN transaction and
 * waits for the remote, then drops opencnt and runs the terminate
 * check. NOTE(review): garbled block — the si_drv1 lookup, tag
 * assignment, opencnt decrement, and return are missing from this
 * view. Kept verbatim.
 */
745 xa_close(struct dev_close_args *ap)
747 cdev_t dev = ap->a_head.a_dev;
753 return ENXIO; /* raced destruction */
754 lockmgr(&xdisk_lk, LK_EXCLUSIVE);
755 lockmgr(&sc->lk, LK_EXCLUSIVE);
758 * NOTE: Clearing open_tag allows a concurrent open to re-open
759 * the device and prevents autonomous completion of the tag.
761 if (sc->opencnt == 1 && sc->open_tag) {
764 lockmgr(&sc->lk, LK_RELEASE);
765 kdmsg_state_reply(tag->state, 0); /* close our side */
766 xa_wait(tag); /* wait on remote */
768 lockmgr(&sc->lk, LK_RELEASE);
770 KKASSERT(sc->opencnt > 0);
772 xa_terminate_check(sc);
773 lockmgr(&xdisk_lk, LK_RELEASE);
/*
 * xa_strategy — disk strategy entry point: account the transaction,
 * grab a tag (queuing the bio in sc->bioq if none are free — see
 * xa_setup_cmd) and start the I/O.
 * NOTE(review): garbled block — braces, the NULL-tag path, and the
 * return are missing from this view. Kept verbatim.
 */
779 xa_strategy(struct dev_strategy_args *ap)
781 xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
783 struct bio *bio = ap->a_bio;
785 devstat_start_transaction(&sc->stats);
786 atomic_add_int(&xa_active, 1);
787 xa_last = bio->bio_offset;
789 lockmgr(&sc->lk, LK_EXCLUSIVE);
790 tag = xa_setup_cmd(sc, bio);
792 xa_start(tag, NULL, 1);
793 lockmgr(&sc->lk, LK_RELEASE);
/*
 * xa_ioctl — only the signature survives in this view; the body is
 * missing. NOTE(review): recover from upstream.
 */
799 xa_ioctl(struct dev_ioctl_args *ap)
/*
 * xa_size — report the device size in media blocks via ap->a_result.
 * NOTE(review): garbled block — braces and returns missing from this
 * view. Kept verbatim.
 */
805 xa_size(struct dev_psize_args *ap)
809 if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
811 ap->a_result = sc->info.d_media_blocks;
/*
 * xa_setup_cmd — pop a free tag and move it to the pending queue; if
 * none is available and a bio was supplied, defer the bio onto
 * sc->bioq for later restart.
 * NOTE(review): garbled block — braces, tag field assignments, and
 * the return are missing from this view. Kept verbatim.
 */
815 /************************************************************************
816 * XA BLOCK PROTOCOL STATE MACHINE *
817 ************************************************************************
819 * Implement tag/msg setup and related functions.
820 * Called with sc->lk held.
823 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
828 * Only get a tag if we have a valid virtual circuit to the server.
830 if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
831 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
833 TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
837 * If we can't dispatch now and this is a bio, queue it for later.
839 if (tag == NULL && bio) {
840 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
/*
 * xa_start — build and transmit the DMSG BLK sub-transaction for the
 * tag's bio: READ/WRITE/FLUSH/FREEBLKS keyed by the buf command,
 * issued under the open_tag's state (or a live span when no open_tag
 * exists). On failure marks the buf B_ERROR and completes it.
 * NOTE(review): garbled block — the switch header, several case
 * labels, break statements, and brace structure are missing from
 * this view. Kept verbatim.
 */
847 * Called with sc->lk held
850 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
852 xa_softc_t *sc = tag->sc;
860 kdmsg_state_t *trans;
/* pick a live span if there is no open_tag to issue under */
862 if (sc->opencnt == 0 || sc->open_tag == NULL) {
863 TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
864 if ((trans->rxcmd & DMSGF_DELETE) == 0)
868 trans = sc->open_tag->state;
879 msg = kdmsg_msg_alloc(trans,
883 xa_bio_completion, tag);
884 msg->any.blk_read.keyid = sc->keyid;
885 msg->any.blk_read.offset = bio->bio_offset;
886 msg->any.blk_read.bytes = bp->b_bcount;
889 msg = kdmsg_msg_alloc(trans,
891 DMSGF_CREATE | DMSGF_DELETE,
892 xa_bio_completion, tag);
893 msg->any.blk_write.keyid = sc->keyid;
894 msg->any.blk_write.offset = bio->bio_offset;
895 msg->any.blk_write.bytes = bp->b_bcount;
896 msg->aux_data = bp->b_data;
897 msg->aux_size = bp->b_bcount;
900 msg = kdmsg_msg_alloc(trans,
902 DMSGF_CREATE | DMSGF_DELETE,
903 xa_bio_completion, tag);
904 msg->any.blk_flush.keyid = sc->keyid;
905 msg->any.blk_flush.offset = bio->bio_offset;
906 msg->any.blk_flush.bytes = bp->b_bcount;
908 case BUF_CMD_FREEBLKS:
909 msg = kdmsg_msg_alloc(trans,
911 DMSGF_CREATE | DMSGF_DELETE,
912 xa_bio_completion, tag);
913 msg->any.blk_freeblks.keyid = sc->keyid;
914 msg->any.blk_freeblks.offset = bio->bio_offset;
915 msg->any.blk_freeblks.bytes = bp->b_bcount;
/* unsupported buf command: fail the buf */
918 bp->b_flags |= B_ERROR;
920 devstat_end_transaction_buf(&sc->stats, bp);
921 atomic_add_int(&xa_active, -1);
929 * If no msg was allocated this is a failure
933 tag->state = msg->state;
934 kdmsg_msg_write(msg);
936 lockmgr(&sc->lk, LK_RELEASE);
937 tag->status.head.error = DMSG_ERR_IO;
939 lockmgr(&sc->lk, LK_EXCLUSIVE);
/*
 * xa_wait — block until the tag completes (tag->done set by the
 * completion callbacks) and return its DMSG error code.
 * NOTE(review): garbled block — braces, the xa_release call, and the
 * return statement are missing from this view. Kept verbatim.
 */
944 xa_wait(xa_tag_t *tag)
946 xa_softc_t *sc = tag->sc;
949 lockmgr(&sc->lk, LK_EXCLUSIVE);
951 while (tag->done == 0)
952 lksleep(tag, &sc->lk, 0, "xawait", 0);
953 lockmgr(&sc->lk, LK_RELEASE);
955 error = tag->status.head.error;
/*
 * xa_done — mark a tag complete and release it; asserts the bio was
 * already detached from the tag.
 * NOTE(review): garbled block — braces and tag->done handling are
 * missing from this view. Kept verbatim.
 */
963 xa_done(xa_tag_t *tag, int wasbio)
965 KKASSERT(tag->bio == NULL);
972 xa_release(tag, wasbio);
/*
 * xa_release — return a tag to the pool. Any bio still attached is
 * failed with B_ERROR. If this was a bio tag and a connection is up
 * (sc->open_tag), the tag is immediately reused to start the next
 * deferred bio from sc->bioq; otherwise it goes back on tag_freeq.
 * NOTE(review): garbled block — braces, biodone, and tag->bio
 * assignment are missing from this view. Kept verbatim.
 */
976 * Release a tag. If everything looks ok and there are pending BIOs
977 * (due to all tags in-use), we can use the tag to start the next BIO.
978 * Do not try to restart if the connection is currently failed.
982 xa_release(xa_tag_t *tag, int wasbio)
984 xa_softc_t *sc = tag->sc;
987 if ((bio = tag->bio) != NULL) {
988 struct buf *bp = bio->bio_buf;
991 bp->b_flags |= B_ERROR;
992 devstat_end_transaction_buf(&sc->stats, bp);
993 atomic_add_int(&xa_active, -1);
998 lockmgr(&sc->lk, LK_EXCLUSIVE);
1000 if (wasbio && sc->open_tag &&
1001 (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1002 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1004 xa_start(tag, NULL, 1);
1006 TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
1007 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
1009 lockmgr(&sc->lk, LK_RELEASE);
/*
 * xa_sync_completion — state callback for the BLK_OPEN transaction.
 * Records the open result in sc->last_error, restarts deferred bios
 * on success, clears sc->serializing, and handles unexpected remote
 * termination (clearing sc->open_tag when it matches this tag).
 * NOTE(review): garbled block — braces, the tag==NULL early path,
 * wakeups, and the return are missing from this view. Kept verbatim.
 */
1013 * Handle messages under the BLKOPEN transaction.
1016 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1018 xa_tag_t *tag = state->any.any;
1023 * If the tag has been cleaned out we already closed our side
1024 * of the transaction and we are waiting for the other side to
1027 kprintf("xa_sync_completion: tag %p msg %08x state %p\n",
1028 tag, msg->any.head.cmd, msg->state);
1031 if (msg->any.head.cmd & DMSGF_CREATE)
1032 kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
1040 lockmgr(&sc->lk, LK_EXCLUSIVE);
1043 * Handle initial response to our open and restart any deferred
1046 * NOTE: DELETE may also be set.
1048 if (msg->any.head.cmd & DMSGF_CREATE) {
1049 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1050 case DMSG_LNK_ERROR | DMSGF_REPLY:
1051 bzero(&tag->status, sizeof(tag->status));
1052 tag->status.head = msg->any.head;
1054 case DMSG_BLK_ERROR | DMSGF_REPLY:
1055 tag->status = msg->any.blk_error;
1058 sc->last_error = tag->status.head.error;
1059 kprintf("xdisk: blk_open completion status %d\n",
/* open succeeded: drain the deferred bio queue onto fresh tags */
1061 if (sc->last_error == 0) {
1062 while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1063 tag = xa_setup_cmd(sc, NULL);
1066 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1068 xa_start(tag, NULL, 1);
1071 sc->serializing = 0;
1076 * Handle unexpected termination (or lost comm channel) from other
1077 * side. Autonomous completion only if open_tag matches,
1078 * otherwise another thread is probably waiting on the tag.
1080 * (see xa_close() for other interactions)
1082 if (msg->any.head.cmd & DMSGF_DELETE) {
1083 kdmsg_state_reply(tag->state, 0);
1084 if (sc->open_tag == tag) {
1085 sc->open_tag = NULL;
1092 lockmgr(&sc->lk, LK_RELEASE);
/*
 * xa_bio_completion — state callback for BLK READ/WRITE/FLUSH/FREEBLKS
 * sub-transactions. Copies returned read data (zero-extending short
 * reads), propagates resid/error into the buf, completes the bio, and
 * on connectivity failure either fails B_FAILONDIS bufs or requeues
 * the bio onto sc->bioq for restart on another span.
 * NOTE(review): garbled block — the command switch header, several
 * break/goto labels, biodone calls, and brace structure are missing
 * from this view. Kept verbatim.
 */
1098 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1100 xa_tag_t *tag = state->any.any;
1101 xa_softc_t *sc = tag->sc;
1106 * Get the bio from the tag. If no bio is present we just do
1109 if ((bio = tag->bio) == NULL)
1114 * Process return status
1116 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1117 case DMSG_LNK_ERROR | DMSGF_REPLY:
1118 bzero(&tag->status, sizeof(tag->status));
1119 tag->status.head = msg->any.head;
1120 if (tag->status.head.error)
1121 tag->status.resid = bp->b_bcount;
1123 tag->status.resid = 0;
1125 case DMSG_BLK_ERROR | DMSGF_REPLY:
1126 tag->status = msg->any.blk_error;
1131 * If the device is open stall the bio on DMSG errors. If an
1132 * actual I/O error occurred on the remote device, DMSG_ERR_IO
1135 if (tag->status.head.error &&
1136 (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
1137 if (tag->status.head.error != DMSG_ERR_IO)
1142 * Process bio completion
1144 * For reads any returned data is zero-extended if necessary, so
1145 * the server can short-cut any all-zeros reads if it desires.
1149 if (msg->aux_data && msg->aux_size) {
1150 if (msg->aux_size < bp->b_bcount) {
1151 bcopy(msg->aux_data, bp->b_data, msg->aux_size);
1152 bzero(bp->b_data + msg->aux_size,
1153 bp->b_bcount - msg->aux_size);
1155 bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
/* no payload at all: an all-zeros read */
1158 bzero(bp->b_data, bp->b_bcount);
1163 case BUF_CMD_FREEBLKS:
1165 if (tag->status.resid > bp->b_bcount)
1166 tag->status.resid = bp->b_bcount;
1167 bp->b_resid = tag->status.resid;
1168 if (tag->status.head.error != 0) {
1170 bp->b_flags |= B_ERROR;
1174 devstat_end_transaction_buf(&sc->stats, bp);
1175 atomic_add_int(&xa_active, -1);
1182 * Handle completion of the transaction. If the bioq is not empty
1183 * we can initiate another bio on the same tag.
1185 * NOTE: Most of our transactions will be single-message
1186 * CREATE+DELETEs, so we won't have to terminate the
1187 * transaction separately, here. But just in case they
1188 * aren't be sure to terminate the transaction.
1191 if (msg->any.head.cmd & DMSGF_DELETE) {
1193 if ((state->txcmd & DMSGF_DELETE) == 0)
1194 kdmsg_msg_reply(msg, 0);
1199 * Handle the case where the transaction failed due to a
1200 * connectivity issue. The tag is put away with wasbio=0
1201 * and we put the BIO back onto the bioq for a later restart.
1203 * probe I/Os (where the device is not open) will be failed
1204 * instead of requeued.
1208 if (bio->bio_buf->b_flags & B_FAILONDIS) {
1209 kprintf("xa_strategy: disconnected, fail bp %p\n",
1211 bio->bio_buf->b_error = ENXIO;
1212 bio->bio_buf->b_flags |= B_ERROR;
1215 kprintf("BIO CIRC FAILURE, FAIL BIO %p\n", bio);
1217 kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
1220 if ((state->txcmd & DMSGF_DELETE) == 0)
1221 kdmsg_msg_reply(msg, 0);
1227 lockmgr(&sc->lk, LK_EXCLUSIVE);
1228 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
1229 lockmgr(&sc->lk, LK_RELEASE);
/*
 * xa_restart_deferred — restart deferred I/O after a span/open state
 * change. If the device is closed there is nothing to do; if no
 * BLK_OPEN is active, pick the first live LNK_SPAN, grab a tag, and
 * issue BLK_OPEN under it (serializing stays set until the BLK_OPEN
 * response arrives — see xa_sync_completion). Clears sc->serializing
 * on all other paths.
 * NOTE(review): garbled block — braces, the open_tag assignment, the
 * no-span/no-tag failure paths, and the kprintf format string are
 * missing from this view. Kept verbatim.
 */
1235 * Restart as much deferred I/O as we can. The serializer is set and we
1236 * eat it (clear it) when done.
1238 * Called with sc->lk held
1242 xa_restart_deferred(xa_softc_t *sc)
1244 kdmsg_state_t *span;
1249 KKASSERT(sc->serializing);
1252 * Determine if a restart is needed.
1254 if (sc->opencnt == 0) {
1256 * Device is not open, nothing to do, eat serializing.
1258 sc->serializing = 0;
1260 } else if (sc->open_tag == NULL) {
1262 * BLK_OPEN required before we can restart any BIOs.
1263 * Select the best LNK_SPAN to issue the BLK_OPEN under.
1265 * serializing interlocks waiting open()s.
1268 TAILQ_FOREACH(span, &sc->spanq, user_entry) {
1269 if ((span->rxcmd & DMSGF_DELETE) == 0)
1276 tag = xa_setup_cmd(sc, NULL);
1282 msg = kdmsg_msg_alloc(span,
1285 xa_sync_completion, tag);
1286 msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
1287 kprintf("xdisk: BLK_OPEN tag %p state %p "
1289 tag, msg->state, span);
1290 xa_start(tag, msg, 0);
1293 sc->serializing = 0;
1296 /* else leave serializing set until BLK_OPEN response */
1299 sc->serializing = 0;