2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
35 * This module allows disk devices to be created and associated with a
36 * communications pipe or socket. You open the device and issue an
37 * ioctl() to install a new disk along with its communications descriptor.
39 * All further communication occurs via the descriptor using the DMSG
40 * LNK_CONN, LNK_SPAN, and BLOCK protocols. The descriptor can be a
41 * direct connection to a remote machine's disk (in-kernel), to a remote
42 * cluster controller, to the local cluster controller, etc.
44 * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45 * devices. These devices look like raw disks to the system.
48 * Handle circuit disconnects, leave bio's pending
49 * Restart bio's on circuit reconnect.
51 #include <sys/param.h>
52 #include <sys/systm.h>
55 #include <sys/device.h>
56 #include <sys/devicestat.h>
58 #include <sys/kernel.h>
59 #include <sys/malloc.h>
60 #include <sys/sysctl.h>
62 #include <sys/queue.h>
65 #include <sys/kern_syscall.h>
68 #include <sys/xdiskioctl.h>
71 #include <sys/thread2.h>
/*
 * Fields of struct xa_tag (typedef'd to xa_tag_t below), the per-command
 * tracking structure.  Only part of the structure is visible in this view.
 */
76 TAILQ_ENTRY(xa_tag) entry; /* linkage on a softc's tag_freeq or tag_pendq */
78 dmsg_blk_error_t status; /* filled in by the completion callbacks */
80 kdmsg_circuit_t *circ; /* circuit the command is issued on */
82 int running; /* transaction running */
83 int waitseq; /* streaming reply */
84 int done; /* final (transaction closed) */
87 typedef struct xa_tag xa_tag_t;
/*
 * Fields of struct xa_softc (typedef'd to xa_softc_t below), the per-disk
 * state.  Only part of the structure is visible in this view.
 */
90 TAILQ_ENTRY(xa_softc) entry; /* linkage on the global xa_queue */
93 struct xdisk_attach_ioctl xaioc;
94 struct disk_info info; /* media parameters filled in by xdisk_attach() */
103 TAILQ_HEAD(, bio) bioq; /* bios queued by xa_setup_cmd() when no tag is available */
104 TAILQ_HEAD(, xa_tag) tag_freeq; /* idle tags */
105 TAILQ_HEAD(, xa_tag) tag_pendq; /* tags handed out by xa_setup_cmd() */
106 TAILQ_HEAD(, kdmsg_circuit) circq; /* circuits recorded by xa_autodmsg(), ordered by weight */
107 struct lwkt_token tok; /* taken around the queue manipulation in this file */
110 typedef struct xa_softc xa_softc_t;
/* Number of tags xdisk_attach() preallocates for each new softc. */
112 #define MAXTAGS 64 /* no real limit */
/* Forward declarations of the file-local functions. */
114 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
115 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
116 static void xa_exit(kdmsg_iocom_t *iocom);
117 static void xa_terminate_check(struct xa_softc *xa);
118 static int xa_rcvdmsg(kdmsg_msg_t *msg);
119 static void xa_autodmsg(kdmsg_msg_t *msg);
121 static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio);
122 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg);
123 static uint32_t xa_wait(xa_tag_t *tag, int seq);
124 static void xa_done(xa_tag_t *tag, int wasbio);
125 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
126 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
127 static void xa_restart_deferred(xa_softc_t *xa);
/* Malloc type used by the kmalloc() calls in xdisk_attach() and passed to kdmsg_iocom_init(). */
129 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
132 * Control device, issue ioctls to create xa devices.
/* Handler declarations for the "xdisk" control device. */
134 static d_open_t xdisk_open;
135 static d_close_t xdisk_close;
136 static d_ioctl_t xdisk_ioctl;
/* dev_ops table for the control device; passed to make_dev() in xdisk_modevent(). */
138 static struct dev_ops xdisk_ops = {
139 { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
140 .d_open = xdisk_open,
141 .d_close = xdisk_close,
142 .d_ioctl = xdisk_ioctl
/* Handler declarations for the "xa" disk devices. */
148 static d_open_t xa_open;
149 static d_close_t xa_close;
150 static d_ioctl_t xa_ioctl;
151 static d_strategy_t xa_strategy;
152 static d_psize_t xa_size;
/*
 * dev_ops table for the xa disk devices; passed to disk_create() in
 * xdisk_attach().  Several initializer lines are not visible in this view.
 */
154 static struct dev_ops xa_ops = {
155 { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
160 .d_write = physwrite,
161 .d_strategy = xa_strategy,
/* Global token taken around xa_queue walks and opencnt updates in this file. */
165 static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
/* Tested by xdisk_modevent(); the code that updates it is not visible in this view. */
166 static int xdisk_opencount;
/* Control device node created by make_dev() in xdisk_modevent(). */
167 static cdev_t xdisk_dev;
/* List of all xa_softc structures. */
168 static TAILQ_HEAD(, xa_softc) xa_queue;
171 * Module initialization
/*
 * Module event handler registered through DEV_MODULE() below.
 * NOTE(review): the dispatch on `type`, the return statements and the
 * braces are not visible in this view.
 */
174 xdisk_modevent(module_t mod, int type, void *data)
/* Initialize the global softc list, then create the control device node. */
178 TAILQ_INIT(&xa_queue);
179 xdisk_dev = make_dev(&xdisk_ops, 0,
180 UID_ROOT, GID_WHEEL, 0600, "xdisk");
/*
 * Tests whether the control device is open or any xa_softc still exists.
 * The statement taken on this condition is not visible; presumably the
 * request is refused -- TODO confirm.
 */
184 if (xdisk_opencount || TAILQ_FIRST(&xa_queue))
/* Destroy the control device and remove both dev_ops registrations. */
187 destroy_dev(xdisk_dev);
190 dev_ops_remove_all(&xdisk_ops);
191 dev_ops_remove_all(&xa_ops);
/* Register the module with xdisk_modevent as its event handler. */
199 DEV_MODULE(xdisk, xdisk_modevent, 0);
/*
 * Open handler for the control device.  The visible body only brackets
 * its work with xdisk_token.
 * NOTE(review): the statement executed under the token is not visible;
 * presumably it adjusts xdisk_opencount -- TODO confirm.
 */
205 xdisk_open(struct dev_open_args *ap)
207 lwkt_gettoken(&xdisk_token);
209 lwkt_reltoken(&xdisk_token);
/*
 * Close handler for the control device.  The visible body only brackets
 * its work with xdisk_token.
 * NOTE(review): the statement executed under the token is not visible;
 * presumably it adjusts xdisk_opencount -- TODO confirm.
 */
214 xdisk_close(struct dev_close_args *ap)
216 lwkt_gettoken(&xdisk_token);
218 lwkt_reltoken(&xdisk_token);
/*
 * Ioctl handler for the control device: hands ap->a_data to
 * xdisk_attach() or xdisk_detach() and stores the result in `error`.
 * NOTE(review): the command selection between the two calls and the
 * return statement are not visible in this view.
 */
223 xdisk_ioctl(struct dev_ioctl_args *ap)
229 error = xdisk_attach((void *)ap->a_data);
232 error = xdisk_detach((void *)ap->a_data);
241 /************************************************************************
243 ************************************************************************/
/*
 * Attach ioctl: check the request, look for an existing softc with the
 * same fs_label, create a softc (with MAXTAGS preallocated tags) when
 * needed, create the disk device if the softc has none, and set up the
 * DMSG iocom on the caller-supplied descriptor.
 * NOTE(review): local declarations, error returns, loop exits and
 * several statements are not visible in this view.
 */
246 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
257 * Normalize ioctl params
259 fp = holdfp(curproc->p_fd, xaioc->fd, -1);
/*
 * Test that the last byte of each fixed-size label array is NUL (the
 * statements taken on failure are not visible here).
 */
262 if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0)
264 if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0)
/* Test that blksize lies within [DEV_BSIZE, MAXBSIZE]. */
266 if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE)
270 * See if the serial number is already present. If we are
271 * racing a termination the disk subsystem may still have
272 * duplicate entries not yet removed so we wait a bit and
275 lwkt_gettoken(&xdisk_token);
277 TAILQ_FOREACH(xa, &xa_queue, entry) {
278 if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
279 xaioc->fs_label) == 0) {
280 if (xa->serializing) {
/* Matching softc is serializing: sleep on it with a hz / 10 tick timeout. */
281 tsleep(xa, 0, "xadelay", hz / 10);
285 kdmsg_iocom_uninit(&xa->iocom);
291 * Create a new xa if not already present
296 TAILQ_FOREACH(xa, &xa_queue, entry) {
/* Compare each existing softc's unit number against the candidate `unit`. */
297 if (xa->unit == unit)
/* Allocate a zero-filled softc (M_ZERO) and initialize its token and queues. */
304 xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
307 lwkt_token_init(&xa->tok, "xa");
308 TAILQ_INIT(&xa->circq);
309 TAILQ_INIT(&xa->bioq);
310 TAILQ_INIT(&xa->tag_freeq);
311 TAILQ_INIT(&xa->tag_pendq);
/* Preallocate MAXTAGS zero-filled tags onto the free queue. */
312 for (n = 0; n < MAXTAGS; ++n) {
313 tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO);
315 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
/* Add the new softc to the global list. */
317 TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
323 * (xa) is now serializing.
327 lwkt_reltoken(&xdisk_token);
/* Create the disk device only when this softc does not have one yet. */
332 if (xa->dev == NULL) {
333 dev = disk_create(unit, &xa->disk, &xa_ops);
/* Media parameters; the block count is bytes / blksize (integer division). */
338 xa->info.d_media_blksize = xaioc->blksize;
339 xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize;
340 xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
341 xa->info.d_secpertrack = 32;
342 xa->info.d_nheads = 64;
343 xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads;
344 xa->info.d_ncylinders = 0;
/* Use the fs_label as the serial number when it is non-empty. */
345 if (xa->xaioc.fs_label[0])
346 xa->info.d_serialno = xa->xaioc.fs_label;
349 * Set up messaging connection
351 ksnprintf(devname, sizeof(devname), "xa%d", unit);
352 kdmsg_iocom_init(&xa->iocom, xa,
353 KDMSG_IOCOMF_AUTOCONN |
354 KDMSG_IOCOMF_AUTORXSPAN |
355 KDMSG_IOCOMF_AUTOTXSPAN |
356 KDMSG_IOCOMF_AUTORXCIRC |
357 KDMSG_IOCOMF_AUTOTXCIRC,
358 M_XDISK, xa_rcvdmsg);
/* Install xa_exit() as the iocom exit callback. */
359 xa->iocom.exit_func = xa_exit;
/* Connect the iocom to the caller's descriptor using the "xa%d" name. */
361 kdmsg_iocom_reconnect(&xa->iocom, fp, devname);
364 * Setup our LNK_CONN advertisement for autoinitiate.
366 * Our filter is setup to only accept PEER_BLOCK/SERVER
369 xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
370 xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
371 xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
372 xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
373 xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
374 ksnprintf(xa->iocom.auto_lnk_conn.cl_label,
375 sizeof(xa->iocom.auto_lnk_conn.cl_label),
376 "%s", xaioc->cl_label);
379 * We need a unique pfs_fsid to avoid confusion.
380 * We supply a rendezvous fs_label using the serial number.
382 kern_uuidgen(&xa->pfs_fsid, 1);
383 xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid;
384 ksnprintf(xa->iocom.auto_lnk_conn.fs_label,
385 sizeof(xa->iocom.auto_lnk_conn.fs_label),
386 "%s", xaioc->fs_label);
389 * Setup our LNK_SPAN advertisement for autoinitiate
391 xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT;
392 xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1;
393 xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK;
394 ksnprintf(xa->iocom.auto_lnk_span.cl_label,
395 sizeof(xa->iocom.auto_lnk_span.cl_label),
396 "%s", xa->xaioc.cl_label);
/* Start auto-initiation with xa_autodmsg() as the callback, then push the disk info. */
398 kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg);
399 disk_setdiskinfo_sync(&xa->disk, &xa->info);
/* xa_terminate_check() must be called with xdisk_token held. */
401 lwkt_gettoken(&xdisk_token);
403 xa_terminate_check(xa);
404 lwkt_reltoken(&xdisk_token);
/*
 * Detach ioctl: under xdisk_token, look for the softc whose
 * auto_lnk_conn.fs_label matches xaioc->fs_label and uninitialize its
 * iocom.
 * NOTE(review): loop exits, the return value and the control flow
 * around the tsleep() are not visible in this view.
 */
410 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
414 lwkt_gettoken(&xdisk_token);
416 TAILQ_FOREACH(xa, &xa_queue, entry) {
417 if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
418 xaioc->fs_label) == 0) {
/* xa is NULL here when the walk above ran off the end of the list. */
422 if (xa == NULL || xa->serializing == 0) {
/* Sleep on xa with a hz / 10 tick timeout. */
426 tsleep(xa, 0, "xadet", hz / 10);
429 kdmsg_iocom_uninit(&xa->iocom);
432 lwkt_reltoken(&xdisk_token);
437 * Called from iocom core transmit thread upon disconnect.
/*
 * iocom exit callback (installed as xa->iocom.exit_func by
 * xdisk_attach()).  Takes xa->tok and then xdisk_token, sets
 * xa->attached to -1, waits for tag_pendq to drain, uninitializes the
 * iocom when the softc is not serializing, and runs the terminate check.
 */
441 xa_exit(kdmsg_iocom_t *iocom)
443 struct xa_softc *xa = iocom->handle;
445 lwkt_gettoken(&xa->tok);
446 lwkt_gettoken(&xdisk_token);
449 * We must wait for any I/O's to complete to ensure that all
450 * state structure references are cleaned up before returning.
452 xa->attached = -1; /* force deferral or failure */
/* Poll until no tag is pending, sleeping with a hz / 10 tick timeout per pass. */
453 while (TAILQ_FIRST(&xa->tag_pendq)) {
454 tsleep(xa, 0, "xabiow", hz / 10);
458 * All serializing code checks for de-initialization so only
459 * do it if we aren't already serializing.
461 if (xa->serializing == 0) {
463 kdmsg_iocom_uninit(iocom);
468 * If the drive is not in use and no longer attach it can be
472 xa_terminate_check(xa);
/* Tokens are released in the reverse order of acquisition. */
473 lwkt_reltoken(&xdisk_token);
474 lwkt_reltoken(&xa->tok);
478 * Determine if we can destroy the xa_softc.
480 * Called with xdisk_token held.
/*
 * Tear down a softc once it is unused: uninitialize the iocom, flag the
 * bios of pending tags with ENXIO, destroy the disk, drain the tag free
 * queue and unlink the softc from xa_queue.
 * NOTE(review): the statements that complete the bios and free the tags
 * and the softc are not visible in this view.
 */
484 xa_terminate_check(struct xa_softc *xa)
/*
 * Tests whether the softc is still open, attached or serializing; the
 * statement taken on this condition is not visible (presumably an early
 * return -- TODO confirm).
 */
489 if (xa->opencnt || xa->attached || xa->serializing)
492 kdmsg_iocom_uninit(&xa->iocom);
495 * When destroying an xa make sure all pending I/O (typically
496 * from the disk probe) is done.
498 * XXX what about new I/O initiated prior to disk_destroy().
500 while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) {
501 TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
502 if ((bio = tag->bio) != NULL) {
/* Mark the bio's buffer as failed with ENXIO. */
504 bio->bio_buf->b_error = ENXIO;
505 bio->bio_buf->b_flags |= B_ERROR;
/* The tag itself goes back on the free queue. */
508 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
/*
 * NOTE(review): xa->dev is dereferenced after disk_destroy(); confirm
 * the cdev is still valid at that point.
 */
511 disk_destroy(&xa->disk);
512 xa->dev->si_drv1 = NULL;
515 KKASSERT(xa->opencnt == 0 && xa->attached == 0);
/* Empty the free queue one tag at a time. */
516 while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
517 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
521 KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
522 TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
527 * Shim to catch and record virtual circuit events.
/*
 * Auto-message callback passed to kdmsg_iocom_autoinitiate().  On
 * LNK_CIRC create and delete replies it adds or removes the circuit on
 * xa->circq under xa->tok, then restarts deferred I/O.
 * NOTE(review): the switch statement, the updates of circ->recorded and
 * the branch between the two insert calls are not visible in this view.
 */
530 xa_autodmsg(kdmsg_msg_t *msg)
532 xa_softc_t *xa = msg->iocom->handle;
534 kdmsg_circuit_t *circ;
535 kdmsg_circuit_t *cscan;
539 * Because this is just a shim we don't have a state callback for
540 * the transactions we are sniffing, so make things easier by
541 * calculating the original command along with the current message's
542 * flags. This is because transactions are made up of numerous
543 * messages and only the first typically specifies the actual command.
546 xcmd = msg->state->icmd |
547 (msg->any.head.cmd & (DMSGF_CREATE |
551 xcmd = msg->any.head.cmd;
555 * Add or remove a circuit, sorted by weight (lower numbers are
559 case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
561 * Track established circuits
563 circ = msg->state->any.circ;
564 lwkt_gettoken(&xa->tok);
565 if (circ->recorded == 0) {
/* Scan for the first queued circuit whose weight exceeds the new one's. */
566 TAILQ_FOREACH(cscan, &xa->circq, entry) {
567 if (circ->weight < cscan->weight)
571 TAILQ_INSERT_BEFORE(cscan, circ, entry);
573 TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
578 * Restart any deferred I/O.
580 xa_restart_deferred(xa);
581 lwkt_reltoken(&xa->tok);
583 case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
585 * Losing virtual circuit. Remove the circ from contention.
587 circ = msg->state->any.circ;
588 lwkt_gettoken(&xa->tok);
589 if (circ->recorded) {
590 TAILQ_REMOVE(&xa->circq, circ, entry);
593 xa_restart_deferred(xa);
594 lwkt_reltoken(&xa->tok);
/*
 * Receive callback passed to kdmsg_iocom_init().  Dispatches on the
 * command bits selected by DMSGF_TRANSMASK; unsupported requests are
 * answered with DMSG_ERR_NOSUPP.
 * NOTE(review): most case labels, the guards around the debug print and
 * the return value are not visible in this view.
 */
602 xa_rcvdmsg(kdmsg_msg_t *msg)
604 switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
607 * Execute shell command (not supported atm).
609 * This is a one-way packet but if not (e.g. if part of
610 * a streaming transaction), we will have already closed
613 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
615 case DMSG_DBG_SHELL | DMSGF_REPLY:
617 * Receive one or more replies to a shell command that we
620 * This is a one-way packet but if not (e.g. if part of
621 * a streaming transaction), we will have already closed
625 msg->aux_data[msg->aux_size - 1] = 0; /* force NUL termination before printing; NOTE(review): assumes aux_size > 0 -- confirm the hidden guard */
626 kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
631 * Unsupported LNK message received. We only need to
632 * reply if it's a transaction in order to close our end.
633 * Ignore any one-way messages are any further messages
634 * associated with the transaction.
636 * NOTE: This case also includes DMSG_LNK_ERROR messages
637 * which might be one-way, replying to those would
638 * cause an infinite ping-pong.
640 if (msg->any.head.cmd & DMSGF_CREATE)
641 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
648 /************************************************************************
649 * XA DEVICE INTERFACE *
650 ************************************************************************/
/*
 * Open handler for xa devices.  opencnt is updated under xdisk_token.
 * For the first opener a tag is obtained and a DMSG_BLK_OPEN transaction
 * is issued; on success the returned keyid is recorded and the tag is
 * kept as xa->opentag.
 * NOTE(review): the softc lookup, the retry after the tsleep(), the
 * opencnt decrements on the failure paths and the return values are not
 * visible in this view.
 */
653 xa_open(struct dev_open_args *ap)
655 cdev_t dev = ap->a_head.a_dev;
661 dev->si_bsize_phys = 512;
662 dev->si_bsize_best = 32768;
665 * Interlock open with opencnt, wait for attachment operations
668 lwkt_gettoken(&xdisk_token);
672 lwkt_reltoken(&xdisk_token);
673 return ENXIO; /* raced destruction */
675 if (xa->serializing) {
/* Sleep on xa with a hz / 10 tick timeout while it is serializing. */
676 tsleep(xa, 0, "xarace", hz / 10);
679 if (xa->attached == 0) {
680 lwkt_reltoken(&xdisk_token);
681 return ENXIO; /* raced destruction */
685 * Serialize initial open
687 if (xa->opencnt++ > 0) {
688 lwkt_reltoken(&xdisk_token);
692 lwkt_reltoken(&xdisk_token);
/* A NULL bio means xa_setup_cmd() will not queue anything if no tag is available. */
694 tag = xa_setup_cmd(xa, NULL);
696 lwkt_gettoken(&xdisk_token);
697 KKASSERT(xa->opencnt > 0);
700 xa_terminate_check(xa);
701 lwkt_reltoken(&xdisk_token);
/* Build the BLK_OPEN request with xa_sync_completion() as its callback. */
704 msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
705 DMSG_BLK_OPEN | DMSGF_CREATE,
706 xa_sync_completion, tag);
707 msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
/* xa_wait() returns the status header's error code; 0 is treated as success. */
709 if (xa_wait(tag, 0) == 0) {
710 xa->keyid = tag->status.keyid;
711 xa->opentag = tag; /* leave tag open */
716 lwkt_gettoken(&xdisk_token);
717 KKASSERT(xa->opencnt > 0);
720 xa_terminate_check(xa);
721 lwkt_reltoken(&xdisk_token);
/*
 * Close handler for xa devices.  If an open tag exists, replies on its
 * state and waits until the tag reports done; then, under xdisk_token,
 * runs the terminate check.
 * NOTE(review): the softc lookup, the opencnt decrement and the cleanup
 * of the open tag are not visible in this view.
 */
728 xa_close(struct dev_close_args *ap)
730 cdev_t dev = ap->a_head.a_dev;
736 return ENXIO; /* raced destruction */
738 lwkt_gettoken(&xa->tok);
739 if ((tag = xa->opentag) != NULL) {
741 kdmsg_state_reply(tag->state, 0);
/* Keep waiting on the current waitseq value until the tag is marked done. */
742 while (tag->done == 0)
743 xa_wait(tag, tag->waitseq);
746 lwkt_reltoken(&xa->tok);
748 lwkt_gettoken(&xdisk_token);
749 KKASSERT(xa->opencnt > 0);
751 xa_terminate_check(xa);
752 lwkt_reltoken(&xdisk_token);
/*
 * Strategy handler for xa devices.  Flags the bio's buffer with ENXIO
 * when the softc is neither attached nor open; otherwise asks
 * xa_setup_cmd() for a tag, which queues the bio if none is available.
 * NOTE(review): xa is read from si_drv1 with no visible NULL check
 * before it is dereferenced -- confirm against the hidden lines.  The
 * bio completion on the error path and the dispatch of the tag are also
 * not visible here.
 */
758 xa_strategy(struct dev_strategy_args *ap)
760 xa_softc_t *xa = ap->a_head.a_dev->si_drv1;
762 struct bio *bio = ap->a_bio;
765 * Allow potentially temporary link failures to fail the I/Os
766 * only if the device is not open. That is, we allow the disk
767 * probe code prior to mount to fail.
769 if (xa->attached == 0 && xa->opencnt == 0) {
770 bio->bio_buf->b_error = ENXIO;
771 bio->bio_buf->b_flags |= B_ERROR;
776 tag = xa_setup_cmd(xa, bio);
/*
 * Ioctl handler for xa devices.
 * NOTE(review): the body is not visible in this view.
 */
783 xa_ioctl(struct dev_ioctl_args *ap)
/*
 * psize handler for xa devices: stores the softc's d_media_blocks in
 * ap->a_result.
 * NOTE(review): the statement taken when si_drv1 is NULL and the return
 * values are not visible in this view.
 */
789 xa_size(struct dev_psize_args *ap)
793 if ((xa = ap->a_head.a_dev->si_drv1) == NULL)
795 ap->a_result = xa->info.d_media_blocks;
799 /************************************************************************
800 * XA BLOCK PROTOCOL STATE MACHINE *
801 ************************************************************************
803 * Implement tag/msg setup and related functions.
/*
 * Try to reserve a tag for a new command, under xa->tok.  A tag is taken
 * from tag_freeq and moved to tag_pendq only when a circuit was selected
 * and xa->attached is positive.  When no tag was obtained and a bio was
 * supplied, the bio is appended to xa->bioq instead.
 * NOTE(review): the circuit test inside the loop, the tag field
 * initialization and the return statement are not visible in this view.
 */
806 xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
808 kdmsg_circuit_t *circ;
812 * Only get a tag if we have a valid virtual circuit to the server.
814 lwkt_gettoken(&xa->tok);
815 TAILQ_FOREACH(circ, &xa->circq, entry) {
/* circ is NULL here when the walk above ran off the end of circq. */
819 if (circ == NULL || xa->attached <= 0) {
821 } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
822 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
825 kdmsg_circ_hold(circ); /* xa_done() later calls kdmsg_circ_drop() on tag->circ */
826 TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
830 * If we can't dispatch now and this is a bio, queue it for later.
832 if (tag == NULL && bio) {
833 TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act);
835 lwkt_reltoken(&xa->tok);
/*
 * Build and transmit the DMSG block message for the command carried by a
 * tag.  Each visible variant allocates a CREATE|DELETE message on
 * tag->circ with xa_bio_completion() as its callback and fills in keyid,
 * offset and byte count.  The write variant also attaches the buffer as
 * auxiliary data.
 * NOTE(review): the switch statement, most case labels, the command
 * codes passed to kdmsg_msg_alloc() and the error-path completion are
 * not visible in this view.
 */
841 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
843 xa_softc_t *xa = tag->xa;
855 msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
857 DMSGF_CREATE | DMSGF_DELETE,
858 xa_bio_completion, tag);
859 msg->any.blk_read.keyid = xa->keyid;
860 msg->any.blk_read.offset = bio->bio_offset;
861 msg->any.blk_read.bytes = bp->b_bcount;
864 msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
866 DMSGF_CREATE | DMSGF_DELETE,
867 xa_bio_completion, tag);
868 msg->any.blk_write.keyid = xa->keyid;
869 msg->any.blk_write.offset = bio->bio_offset;
870 msg->any.blk_write.bytes = bp->b_bcount;
/* The buffer's data is referenced directly as the message's auxiliary data. */
871 msg->aux_data = bp->b_data;
872 msg->aux_size = bp->b_bcount;
875 msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
877 DMSGF_CREATE | DMSGF_DELETE,
878 xa_bio_completion, tag);
879 msg->any.blk_flush.keyid = xa->keyid;
880 msg->any.blk_flush.offset = bio->bio_offset;
881 msg->any.blk_flush.bytes = bp->b_bcount;
883 case BUF_CMD_FREEBLKS:
884 msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
886 DMSGF_CREATE | DMSGF_DELETE,
887 xa_bio_completion, tag);
888 msg->any.blk_freeblks.keyid = xa->keyid;
889 msg->any.blk_freeblks.offset = bio->bio_offset;
890 msg->any.blk_freeblks.bytes = bp->b_bcount;
893 bp->b_flags |= B_ERROR;
/* Record the transaction state in the tag before transmitting. */
904 tag->state = msg->state;
905 kdmsg_msg_write(msg);
/*
 * Sleep, under xa->tok, until tag->waitseq differs from seq, then return
 * the error code stored in the tag's status header.
 */
912 xa_wait(xa_tag_t *tag, int seq)
914 xa_softc_t *xa = tag->xa;
916 lwkt_gettoken(&xa->tok);
917 while (tag->waitseq == seq)
918 tsleep(tag, 0, "xawait", 0); /* timeout argument of 0; sleeps on the tag address */
919 lwkt_reltoken(&xa->tok);
920 return (tag->status.head.error); /* read after xa->tok has been released */
/*
 * Retire a tag.  When wasbio is set and a deferred bio is queued, that
 * bio is dequeued from xa->bioq.  The circuit reference is dropped and
 * the tag moves from tag_pendq back to tag_freeq.
 * NOTE(review): what is done with the dequeued bio and the branch
 * structure between the two token releases are not visible in this view.
 */
924 xa_done(xa_tag_t *tag, int wasbio)
926 xa_softc_t *xa = tag->xa;
929 KKASSERT(tag->bio == NULL);
933 lwkt_gettoken(&xa->tok);
934 if (wasbio && (bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
935 TAILQ_REMOVE(&xa->bioq, bio, bio_act);
937 lwkt_reltoken(&xa->tok);
941 kdmsg_circ_drop(tag->circ);
/* Return the tag to the free queue. */
944 TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
945 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
946 lwkt_reltoken(&xa->tok);
/*
 * State callback for synchronous commands (used for the BLK_OPEN issued
 * by xa_open()).  Copies the reply status into the tag; when the message
 * carries DMSGF_DELETE and the tag is the softc's open tag, clears
 * xa->opentag and replies on the tag's state.
 * NOTE(review): the updates of waitseq/done, the wakeup of waiters and
 * the return value are not visible in this view.
 */
951 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
953 xa_tag_t *tag = state->any.any;
954 xa_softc_t *xa = tag->xa;
956 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
957 case DMSG_LNK_ERROR | DMSGF_REPLY:
/* LNK_ERROR carries only a header: zero the status, then copy the header in. */
958 bzero(&tag->status, sizeof(tag->status));
959 tag->status.head = msg->any.head;
961 case DMSG_BLK_ERROR | DMSGF_REPLY:
962 tag->status = msg->any.blk_error;
965 lwkt_gettoken(&xa->tok);
966 if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
967 if (xa->opentag == tag) {
968 xa->opentag = NULL; /* XXX */
969 kdmsg_state_reply(tag->state, 0);
971 lwkt_reltoken(&xa->tok);
978 lwkt_reltoken(&xa->tok);
/*
 * State callback for bio commands started by xa_start().  Copies the
 * reply status into the tag, transfers returned data into the buffer
 * (zero-filling any shortfall), sets b_resid/b_error, and closes the
 * transaction when the peer has not already done so.  On the failure
 * path it marks the circuit lost and calls xa_setup_cmd() again for the
 * bio.
 * NOTE(review): the switch on the buffer command, the gotos/labels that
 * join the normal and failure paths, the biodone and xa_done calls and
 * the return value are not visible in this view.
 */
986 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
988 xa_tag_t *tag = state->any.any;
989 xa_softc_t *xa = tag->xa;
994 * Get the bio from the tag. If no bio is present we just do
997 if ((bio = tag->bio) == NULL)
1002 * Process return status
1004 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1005 case DMSG_LNK_ERROR | DMSGF_REPLY:
/* LNK_ERROR carries only a header: zero the status, then copy the header in. */
1006 bzero(&tag->status, sizeof(tag->status));
1007 tag->status.head = msg->any.head;
/* On error the whole request counts as residual. */
1008 if (tag->status.head.error)
1009 tag->status.resid = bp->b_bcount;
1011 tag->status.resid = 0;
1013 case DMSG_BLK_ERROR | DMSGF_REPLY:
1014 tag->status = msg->any.blk_error;
1019 * Potentially move the bio back onto the pending queue if the
1020 * device is open and the error is related to losing the virtual
1023 if (tag->status.head.error &&
1024 (msg->any.head.cmd & DMSGF_DELETE) && xa->opencnt) {
1025 if (tag->status.head.error == DMSG_ERR_LOSTLINK ||
1026 tag->status.head.error == DMSG_ERR_CANTCIRC) {
1032 * Process bio completion
1034 * For reads any returned data is zero-extended if necessary, so
1035 * the server can short-cut any all-zeros reads if it desires.
1039 if (msg->aux_data && msg->aux_size) {
1040 if (msg->aux_size < bp->b_bcount) {
/* Short reply: copy what arrived and zero the remainder of the buffer. */
1041 bcopy(msg->aux_data, bp->b_data, msg->aux_size);
1042 bzero(bp->b_data + msg->aux_size,
1043 bp->b_bcount - msg->aux_size);
1045 bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
1048 bzero(bp->b_data, bp->b_bcount);
1053 case BUF_CMD_FREEBLKS:
/* Clamp the reported residual to the request size before storing it. */
1055 if (tag->status.resid > bp->b_bcount)
1056 tag->status.resid = bp->b_bcount;
1057 bp->b_resid = tag->status.resid;
1058 if ((bp->b_error = tag->status.head.error) != 0) {
1059 bp->b_flags |= B_ERROR;
1069 * Handle completion of the transaction. If the bioq is not empty
1070 * we can initiate another bio on the same tag.
1072 * NOTE: Most of our transactions will be single-message
1073 * CREATE+DELETEs, so we won't have to terminate the
1074 * transaction separately, here. But just in case they
1075 * aren't be sure to terminate the transaction.
1078 if (msg->any.head.cmd & DMSGF_DELETE) {
/* Reply only if our side has not already sent DELETE. */
1080 if ((state->txcmd & DMSGF_DELETE) == 0)
1081 kdmsg_msg_reply(msg, 0);
1086 * Handle the case where the transaction failed due to a
1087 * connectivity issue. The tag is put away with wasbio=0
1088 * and we restart the bio.
1090 * Setting circ->lost causes xa_setup_cmd() to skip the circuit.
1091 * Other circuits might still be live. Once a circuit gets messed
1092 * up it will (eventually) be deleted so we can simply leave (lost)
1093 * set forever after.
1096 lwkt_gettoken(&xa->tok);
1097 kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
1098 tag->circ->lost = 1;
1101 if ((state->txcmd & DMSGF_DELETE) == 0)
1102 kdmsg_msg_reply(msg, 0);
1105 * Restart or requeue the bio
1107 tag = xa_setup_cmd(xa, bio);
1109 xa_start(tag, NULL);
1110 lwkt_reltoken(&xa->tok);
1115 * Restart as much deferred I/O as we can.
1117 * Called with xa->tok held
/*
 * Walk xa->bioq while it is non-empty: obtain a tag via xa_setup_cmd(),
 * dequeue the head bio and start it.
 * NOTE(review): the loop exit taken when no tag is available and the
 * binding of the bio to the tag are not visible in this view.
 */
1121 xa_restart_deferred(xa_softc_t *xa)
1126 while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
1127 tag = xa_setup_cmd(xa, NULL); /* NULL bio: xa_setup_cmd() will not re-queue on failure */
1130 TAILQ_REMOVE(&xa->bioq, bio, bio_act);
1132 xa_start(tag, NULL);