/*
 * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@dragonflybsd.org>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * This module allows disk devices to be created and associated with a
 * communications pipe or socket.  You open the device and issue an
 * ioctl() to install a new disk along with its communications descriptor.
 *
 * All further communication occurs via the descriptor using the DMSG
 * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
 * cluster controller, to the local cluster controller, etc.
 *
 * /dev/xdisk is the control device; issue ioctl()s against it to create
 * the /dev/xa%d devices.  These devices look like raw disks to the
 * system.
 */
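
/*
 * Example (sketch only): attaching a disk from userland might look like
 * the following, assuming a connected stream descriptor in sock_fd.  Only
 * the fd field of struct xdisk_attach_ioctl is shown because it is the
 * only field this file demonstrably consumes (see xdisk_attach());
 * consult <sys/xdiskioctl.h> for the full layout.
 *
 *	struct xdisk_attach_ioctl xaioc;
 *	int ctlfd;
 *
 *	bzero(&xaioc, sizeof(xaioc));
 *	xaioc.fd = sock_fd;
 *	ctlfd = open("/dev/xdisk", O_RDWR);
 *	ioctl(ctlfd, XDISKIOCATTACH, &xaioc);
 */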
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/buf.h>
#include <sys/conf.h>
#include <sys/device.h>
#include <sys/devicestat.h>
#include <sys/disk.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/proc.h>
#include <sys/queue.h>
#include <sys/tree.h>
#include <sys/udev.h>
#include <sys/uuid.h>
#include <sys/kern_syscall.h>

#include <sys/dmsg.h>
#include <sys/xdiskioctl.h>

#include <sys/buf2.h>
#include <sys/thread2.h>

struct xa_softc;
struct xa_softc_tree;
RB_HEAD(xa_softc_tree, xa_softc);
RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);

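/*
 * Debugging knobs, exported via sysctl(8) as debug.xa_active (number of
 * in-flight I/Os) and debug.xa_last (byte offset of the most recent I/O).
 */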
static int xa_active;
SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
           "Number of active xdisk IOs");
static uint64_t xa_last;
SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
           "Offset of last xdisk IO");

/*
 * Track a BIO tag
 */
struct xa_tag {
        TAILQ_ENTRY(xa_tag) entry;
        struct xa_softc *sc;
        dmsg_blk_error_t status;
        kdmsg_state_t   *state;
        struct bio      *bio;
        int             waiting;
        int             async;
        int             done;
};

typedef struct xa_tag   xa_tag_t;

/*
 * Track devices.
 */
struct xa_softc {
        struct kdmsg_state_list spanq;
        RB_ENTRY(xa_softc) rbnode;
        cdev_t          dev;
        struct devstat  stats;
        struct disk_info info;
        struct disk     disk;
        uuid_t          pfs_fsid;
        int             unit;
        int             opencnt;
        int             spancnt;
        uint64_t        keyid;
        int             serializing;
        int             last_error;
        char            cl_label[64];   /* from LNK_SPAN cl_label (host/dev) */
        char            fs_label[64];   /* from LNK_SPAN fs_label (serno str) */
        xa_tag_t        *open_tag;
        TAILQ_HEAD(, bio) bioq;         /* pending BIOs */
        TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
        TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
        struct lock     lk;
};

typedef struct xa_softc xa_softc_t;

struct xa_iocom {
        TAILQ_ENTRY(xa_iocom) entry;
        kdmsg_iocom_t   iocom;
        xa_softc_t      dummysc;
};

typedef struct xa_iocom xa_iocom_t;

static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
static struct xa_softc_tree xa_device_tree;

#define MAXTAGS         64      /* no real limit */
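
/*
 * Each xa_softc preallocates MAXTAGS tags on its tag_freeq.  xa_setup_cmd()
 * moves a tag to tag_pendq for the duration of an I/O and queues the BIO
 * on sc->bioq when no tag is free; xa_release() later returns the tag to
 * tag_freeq, or immediately reuses it for a deferred BIO.
 */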

static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
static void xaio_exit(kdmsg_iocom_t *iocom);
static int xaio_rcvdmsg(kdmsg_msg_t *msg);

static void xa_terminate_check(struct xa_softc *sc);

static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
static void xa_done(xa_tag_t *tag, int wasbio);
static void xa_release(xa_tag_t *tag, int wasbio);
static uint32_t xa_wait(xa_tag_t *tag);
static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static void xa_restart_deferred(xa_softc_t *sc);

MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");

/*
 * Control device; issue ioctls against it to create xa devices.
 */
static d_open_t xdisk_open;
static d_close_t xdisk_close;
static d_ioctl_t xdisk_ioctl;

static struct dev_ops xdisk_ops = {
        { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
        .d_open =       xdisk_open,
        .d_close =      xdisk_close,
        .d_ioctl =      xdisk_ioctl
};

/*
 * XA disk devices
 */
static d_open_t xa_open;
static d_close_t xa_close;
static d_ioctl_t xa_ioctl;
static d_strategy_t xa_strategy;
static d_psize_t xa_size;

static struct dev_ops xa_ops = {
        { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
        .d_open =       xa_open,
        .d_close =      xa_close,
        .d_ioctl =      xa_ioctl,
        .d_read =       physread,
        .d_write =      physwrite,
        .d_strategy =   xa_strategy,
        .d_psize =      xa_size
};

static int xdisk_opencount;
static cdev_t xdisk_dev;
struct lock xdisk_lk;
static TAILQ_HEAD(, xa_iocom) xaiocomq;

/*
 * Module initialization
 */
static int
xdisk_modevent(module_t mod, int type, void *data)
{
        switch (type) {
        case MOD_LOAD:
                TAILQ_INIT(&xaiocomq);
                RB_INIT(&xa_device_tree);
                lockinit(&xdisk_lk, "xdisk", 0, 0);
                xdisk_dev = make_dev(&xdisk_ops, 0,
                                     UID_ROOT, GID_WHEEL, 0600, "xdisk");
                break;
        case MOD_UNLOAD:
        case MOD_SHUTDOWN:
                if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
                        return (EBUSY);
                if (xdisk_dev) {
                        destroy_dev(xdisk_dev);
                        xdisk_dev = NULL;
                }
                dev_ops_remove_all(&xdisk_ops);
                dev_ops_remove_all(&xa_ops);
                break;
        default:
                break;
        }
        return 0;
}

DEV_MODULE(xdisk, xdisk_modevent, 0);

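/*
 * NOTE: The device tree is keyed on fs_label (the remote serial number
 *       string).  xaio_rcvdmsg() performs lookups by copying the incoming
 *       LNK_SPAN labels into xaio->dummysc and using that as the RB_FIND()
 *       search key.
 */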
static int
xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
{
        return(strcmp(sc1->fs_label, sc2->fs_label));
}

/*
 * Control device
 */
static int
xdisk_open(struct dev_open_args *ap)
{
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        ++xdisk_opencount;
        lockmgr(&xdisk_lk, LK_RELEASE);
        return(0);
}

static int
xdisk_close(struct dev_close_args *ap)
{
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        --xdisk_opencount;
        lockmgr(&xdisk_lk, LK_RELEASE);
        return(0);
}

static int
xdisk_ioctl(struct dev_ioctl_args *ap)
{
        int error;

        switch(ap->a_cmd) {
        case XDISKIOCATTACH:
                error = xdisk_attach((void *)ap->a_data);
                break;
        case XDISKIOCDETACH:
                error = xdisk_detach((void *)ap->a_data);
                break;
        default:
                error = ENOTTY;
                break;
        }
        return error;
}

/************************************************************************
 *                              DMSG INTERFACE                          *
 ************************************************************************/

static int
xdisk_attach(struct xdisk_attach_ioctl *xaioc)
{
        xa_iocom_t *xaio;
        struct file *fp;

        /*
         * Normalize ioctl params
         */
        fp = holdfp(curproc->p_fd, xaioc->fd, -1);
        if (fp == NULL)
                return EINVAL;
        kprintf("xdisk_attach fp=%p\n", fp);

        /*
         * See if the serial number is already present.  If we are
         * racing a termination the disk subsystem may still have
         * duplicate entries not yet removed so we wait a bit and
         * retry.
         */
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);

        xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
        kdmsg_iocom_init(&xaio->iocom, xaio,
                         KDMSG_IOCOMF_AUTOCONN,
                         M_XDISK, xaio_rcvdmsg);
        xaio->iocom.exit_func = xaio_exit;

        kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");

        /*
         * Set up our LNK_CONN advertisement for autoinitiate.
         *
         * Our filter is set up to accept only PEER_BLOCK/SERVER
         * advertisements.
         *
         * We need a unique pfs_fsid to avoid confusion.
         */
        xaio->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
        xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
        xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
        xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
        xaio->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
        ksnprintf(xaio->iocom.auto_lnk_conn.fs_label,
                  sizeof(xaio->iocom.auto_lnk_conn.fs_label),
                  "xdisk");
        kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1);

        /*
         * Set up our LNK_SPAN advertisement for autoinitiate
         */
        TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
        kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);

        lockmgr(&xdisk_lk, LK_RELEASE);

        return 0;
}

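/*
 * Detach via the control ioctl is not implemented; teardown instead
 * occurs when the communications descriptor is closed and the iocom
 * core calls xaio_exit().
 */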
static int
xdisk_detach(struct xdisk_attach_ioctl *xaioc)
{
        return EINVAL;
}

/*
 * Called from iocom core transmit thread upon disconnect.
 */
static
void
xaio_exit(kdmsg_iocom_t *iocom)
{
        xa_iocom_t *xaio = iocom->handle;

        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        kprintf("xdisk_detach [xaio_exit()]\n");
        TAILQ_REMOVE(&xaiocomq, xaio, entry);
        lockmgr(&xdisk_lk, LK_RELEASE);

        kdmsg_iocom_uninit(&xaio->iocom);

        kfree(xaio, M_XDISK);
}

/*
 * Called from iocom core to handle messages that the iocom core does not
 * handle itself and for which a state function callback has not yet been
 * established.
 *
 * We primarily care about LNK_SPAN transactions here.
 */
static int
xaio_rcvdmsg(kdmsg_msg_t *msg)
{
        kdmsg_state_t   *state = msg->state;
        xa_iocom_t      *xaio;
        xa_softc_t      *sc;

        /*
         * The iocom core always supplies a state with a received message;
         * it must be valid before we can look up our per-iocom handle.
         */
        KKASSERT(state != NULL);
        xaio = state->iocom->handle;

        kprintf("xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
                state, state->rxcmd, state->txcmd,
                msg->any.head.cmd);
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);

        switch(msg->tcmd) {
        case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
                /*
                 * A LNK_SPAN transaction which is opened and closed
                 * degenerately is not useful to us, just ignore it.
                 */
                kdmsg_msg_reply(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_CREATE:
                /*
                 * Manage the tracking node for the remote LNK_SPAN.
                 *
                 * Return a streaming result, leaving the transaction open
                 * in both directions to allow sub-transactions.
                 */
                bcopy(msg->any.lnk_span.cl_label, xaio->dummysc.cl_label,
                      sizeof(xaio->dummysc.cl_label));
                xaio->dummysc.cl_label[sizeof(xaio->dummysc.cl_label) - 1] = 0;

                bcopy(msg->any.lnk_span.fs_label, xaio->dummysc.fs_label,
                      sizeof(xaio->dummysc.fs_label));
                xaio->dummysc.fs_label[sizeof(xaio->dummysc.fs_label) - 1] = 0;

                kprintf("xdisk: LINK_SPAN state %p create for %s\n",
                        msg->state, msg->any.lnk_span.fs_label);

                sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
                if (sc == NULL) {
                        xa_softc_t *sctmp;
                        xa_tag_t *tag;
                        cdev_t dev;
                        int unit;
                        int n;

                        sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
                        bcopy(msg->any.lnk_span.cl_label, sc->cl_label,
                              sizeof(sc->cl_label));
                        sc->cl_label[sizeof(sc->cl_label) - 1] = 0;
                        bcopy(msg->any.lnk_span.fs_label, sc->fs_label,
                              sizeof(sc->fs_label));
                        sc->fs_label[sizeof(sc->fs_label) - 1] = 0;

                        /* XXX FIXME O(N^2) */
                        unit = -1;
                        do {
                                ++unit;
                                RB_FOREACH(sctmp, xa_softc_tree,
                                           &xa_device_tree) {
                                        if (sctmp->unit == unit)
                                                break;
                                }
                        } while (sctmp);

                        sc->unit = unit;
                        sc->serializing = 1;
                        sc->spancnt = 1;
                        lockinit(&sc->lk, "xalk", 0, 0);
                        TAILQ_INIT(&sc->spanq);
                        TAILQ_INIT(&sc->bioq);
                        TAILQ_INIT(&sc->tag_freeq);
                        TAILQ_INIT(&sc->tag_pendq);

                        lockmgr(&sc->lk, LK_EXCLUSIVE);
                        RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;

                        /*
                         * Setup block device
                         */
                        for (n = 0; n < MAXTAGS; ++n) {
                                tag = kmalloc(sizeof(*tag),
                                              M_XDISK, M_WAITOK|M_ZERO);
                                tag->sc = sc;
                                TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
                        }

                        if (sc->dev == NULL) {
                                dev = disk_create(unit, &sc->disk, &xa_ops);
                                dev->si_drv1 = sc;
                                sc->dev = dev;
                                devstat_add_entry(&sc->stats, "xa", unit,
                                                  DEV_BSIZE,
                                                  DEVSTAT_NO_ORDERED_TAGS,
                                                  DEVSTAT_TYPE_DIRECT |
                                                  DEVSTAT_TYPE_IF_OTHER,
                                                  DEVSTAT_PRIORITY_OTHER);
                        }

                        sc->info.d_media_blksize =
                                msg->any.lnk_span.media.block.blksize;
                        if (sc->info.d_media_blksize <= 0)
                                sc->info.d_media_blksize = 1;
                        sc->info.d_media_blocks =
                                msg->any.lnk_span.media.block.bytes /
                                sc->info.d_media_blksize;
                        sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
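                        /*
                         * Fabricated CHS geometry: 32 sectors x 64 heads
                         * (2048 sectors per cylinder).  d_ncylinders is
                         * left 0, presumably so the disk layer can derive
                         * the cylinder count from the media size.
                         */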
                        sc->info.d_secpertrack = 32;
                        sc->info.d_nheads = 64;
                        sc->info.d_secpercyl = sc->info.d_secpertrack *
                                               sc->info.d_nheads;
                        sc->info.d_ncylinders = 0;
                        if (sc->fs_label[0])
                                sc->info.d_serialno = sc->fs_label;
                        /*
                         * WARNING! disk_setdiskinfo() must be asynchronous
                         *          because we are in the rxmsg thread.  If
                         *          it is synchronous and issues more disk
                         *          I/Os, we will deadlock.
                         */
                        kprintf("xdisk: A1\n");
                        disk_setdiskinfo(&sc->disk, &sc->info);
                        xa_restart_deferred(sc);        /* eats serializing */
                        lockmgr(&sc->lk, LK_RELEASE);
                } else {
                        lockmgr(&sc->lk, LK_EXCLUSIVE);
                        ++sc->spancnt;
                        kprintf("xdisk: A2 (%d) ser=%d otag=%p\n",
                                sc->spancnt, sc->serializing,
                                sc->open_tag);
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;
                        if (sc->serializing == 0 && sc->open_tag == NULL) {
                                sc->serializing = 1;
                                xa_restart_deferred(sc); /* eats serializing */
                        }
                        lockmgr(&sc->lk, LK_RELEASE);
                        if (sc->dev && sc->dev->si_disk) {
                                kprintf("reprobe\n");
                                disk_msg_send(DISK_DISK_REPROBE,
                                              sc->dev->si_disk,
                                              NULL);
                        }
                }
                kprintf("xdisk: sc %p spancnt %d\n", sc, sc->spancnt);
                kdmsg_msg_result(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_DELETE:
                /*
                 * Manage the tracking node for the remote LNK_SPAN.
                 *
                 * Return a final result, closing our end of the transaction.
                 */
                sc = msg->state->any.xa_sc;
                kprintf("xdisk: LINK_SPAN state %p delete for %s (sc=%p)\n",
                        msg->state, (sc ? sc->fs_label : "(null)"), sc);
                lockmgr(&sc->lk, LK_EXCLUSIVE);
                msg->state->any.xa_sc = NULL;
                TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
                --sc->spancnt;

                kprintf("xdisk: sc %p spancnt %d\n", sc, sc->spancnt);

                /*
                 * Spans can come and go as the graph stabilizes, so if
                 * we lose a span along with sc->open_tag we may be able
                 * to restart the I/Os on a different span.
                 */
                if (sc->spancnt &&
                    sc->serializing == 0 && sc->open_tag == NULL) {
                        sc->serializing = 1;
                        xa_restart_deferred(sc);
                }
                lockmgr(&sc->lk, LK_RELEASE);
                kdmsg_msg_reply(msg, 0);

#if 0
                /*
                 * Termination
                 */
                if (sc->spancnt == 0)
                        xa_terminate_check(sc);
#endif
                break;
        case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
                /*
                 * Ignore unimplemented streaming replies on our LNK_SPAN
                 * transaction.
                 */
                kprintf("xdisk: LINK_SPAN state %p delete+reply\n",
                        msg->state);
                break;
        case DMSG_LNK_SPAN | DMSGF_REPLY:
                /*
                 * Ignore unimplemented streaming replies on our LNK_SPAN
                 * transaction.
                 */
                kprintf("xdisk: LINK_SPAN state %p reply\n",
                        msg->state);
                break;
        case DMSG_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm).
                 *
                 * This is a one-way packet but if not (e.g. if part of
                 * a streaming transaction), we will have already closed
                 * our end.
                 */
                kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                /*
                 * Receive one or more replies to a shell command
                 * that we sent.  Just dump it to the console.
                 *
                 * This is a one-way packet but if not (e.g. if
                 * part of a streaming transaction), we will have
                 * already closed our end.
                 */
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
                        kprintf("xdisk: DEBUGMSG: %s\n",
                                msg->aux_data);
                }
                break;
        default:
                /*
                 * Unsupported one-way message, streaming message, or
                 * transaction.
                 *
                 * Terminate any unsupported transactions with an error
                 * and ignore any unsupported streaming messages.
                 *
                 * NOTE: This case also includes DMSG_LNK_ERROR messages
                 *       which might be one-way; replying to those would
                 *       cause an infinite ping-pong.
                 */
                if (msg->any.head.cmd & DMSGF_CREATE)
                        kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        }
        lockmgr(&xdisk_lk, LK_RELEASE);

        return 0;
}

/*
 * Determine if we can destroy the xa_softc.
 *
 * Called with xdisk_lk held.
 */
static
void
xa_terminate_check(struct xa_softc *sc)
{
        xa_tag_t *tag;

        /*
         * Determine if we can destroy the softc.
         */
        kprintf("xdisk: terminate check xa%d (%d,%d,%d) sc=%p ",
                sc->unit,
                sc->opencnt, sc->serializing, sc->spancnt,
                sc);

        if (sc->opencnt || sc->serializing || sc->spancnt) {
                kprintf("(leave intact)\n");
                return;
        }

        /*
         * Remove from the device tree; a race with a new incoming span
         * will simply create a new softc and disk.
         */
        RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);

        /*
         * Device has to go first to prevent device ops races.
         */
        if (sc->dev) {
                disk_destroy(&sc->disk);
                devstat_remove_entry(&sc->stats);
                sc->dev->si_drv1 = NULL;
                sc->dev = NULL;
        }

        kprintf("(remove from tree)\n");
        sc->serializing = 1;
        KKASSERT(sc->opencnt == 0);
        KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

        while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
                tag->sc = NULL;
                kfree(tag, M_XDISK);
        }

        kfree(sc, M_XDISK);
}

/************************************************************************
 *                         XA DEVICE INTERFACE                          *
 ************************************************************************/

static int
xa_open(struct dev_open_args *ap)
{
        cdev_t dev = ap->a_head.a_dev;
        xa_softc_t *sc;
        int error;

        dev->si_bsize_phys = 512;
        dev->si_bsize_best = 32768;

        /*
         * Interlock open with opencnt, wait for attachment operations
         * to finish.
         */
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
again:
        sc = dev->si_drv1;
        if (sc == NULL) {
                lockmgr(&xdisk_lk, LK_RELEASE);
                return ENXIO;   /* raced destruction */
        }
        if (sc->serializing) {
                tsleep(sc, 0, "xarace", hz / 10);
                goto again;
        }
        sc->serializing = 1;

        /*
         * Serialize initial open
         */
        if (sc->opencnt++ > 0) {
                sc->serializing = 0;
                wakeup(sc);
                lockmgr(&xdisk_lk, LK_RELEASE);
                return(0);
        }

        /*
         * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
         */
        if (sc->open_tag == NULL) {
                lockmgr(&sc->lk, LK_EXCLUSIVE);
                xa_restart_deferred(sc); /* eats serializing */
                lockmgr(&sc->lk, LK_RELEASE);
        } else {
                sc->serializing = 0;
                wakeup(sc);
        }
        lockmgr(&xdisk_lk, LK_RELEASE);

        /*
         * Wait for completion of the BLK_OPEN
         */
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        while (sc->serializing)
                lksleep(sc, &xdisk_lk, 0, "xaopen", hz);

        error = sc->last_error;
        if (error) {
                KKASSERT(sc->opencnt > 0);
                --sc->opencnt;
                xa_terminate_check(sc);
                sc = NULL;      /* sc may be invalid now */
        }
        lockmgr(&xdisk_lk, LK_RELEASE);

        return (error);
}

static int
xa_close(struct dev_close_args *ap)
{
        cdev_t dev = ap->a_head.a_dev;
        xa_softc_t *sc;
        xa_tag_t *tag;

        sc = dev->si_drv1;
        if (sc == NULL)
                return ENXIO;   /* raced destruction */
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        lockmgr(&sc->lk, LK_EXCLUSIVE);

        /*
         * NOTE: Clearing open_tag allows a concurrent open to re-open
         *       the device and prevents autonomous completion of the tag.
         */
        if (sc->opencnt == 1 && sc->open_tag) {
                tag = sc->open_tag;
                sc->open_tag = NULL;
                lockmgr(&sc->lk, LK_RELEASE);
                kdmsg_state_reply(tag->state, 0);       /* close our side */
                xa_wait(tag);                           /* wait on remote */
        } else {
                lockmgr(&sc->lk, LK_RELEASE);
        }
        KKASSERT(sc->opencnt > 0);
        --sc->opencnt;
        xa_terminate_check(sc);
        lockmgr(&xdisk_lk, LK_RELEASE);

        return(0);
}

static int
xa_strategy(struct dev_strategy_args *ap)
{
        xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
        xa_tag_t *tag;
        struct bio *bio = ap->a_bio;

        devstat_start_transaction(&sc->stats);
        atomic_add_int(&xa_active, 1);
        xa_last = bio->bio_offset;

        /*
         * If no tags are available NULL is returned and the bio is
         * placed on sc->bioq.
         */
        lockmgr(&sc->lk, LK_EXCLUSIVE);
        tag = xa_setup_cmd(sc, bio);
        if (tag)
                xa_start(tag, NULL, 1);
        lockmgr(&sc->lk, LK_RELEASE);

        return(0);
}

static int
xa_ioctl(struct dev_ioctl_args *ap)
{
        return(ENOTTY);
}

static int
xa_size(struct dev_psize_args *ap)
{
        struct xa_softc *sc;

        if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
                return (ENXIO);
        ap->a_result = sc->info.d_media_blocks;
        return (0);
}

/************************************************************************
 *                  XA BLOCK PROTOCOL STATE MACHINE                     *
 ************************************************************************
 *
 * Implement tag/msg setup and related functions.
 * Called with sc->lk held.
 */
static xa_tag_t *
xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
{
        xa_tag_t *tag;

        /*
         * Only get a tag if we have a valid virtual circuit to the server.
         */
        if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
                tag->bio = bio;
                TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
        }

        /*
         * If we can't dispatch now and this is a bio, queue it for later.
         */
        if (tag == NULL && bio) {
                TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
        }

        return (tag);
}

/*
 * Called with sc->lk held
 */
static void
xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
{
        xa_softc_t *sc = tag->sc;

        tag->done = 0;
        tag->async = async;
        tag->status.head.error = DMSG_ERR_IO;   /* fallback error */

        if (msg == NULL) {
                struct bio *bio;
                struct buf *bp;
                kdmsg_state_t *trans;

                if (sc->opencnt == 0 || sc->open_tag == NULL) {
                        TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
                                if ((trans->rxcmd & DMSGF_DELETE) == 0)
                                        break;
                        }
                } else {
                        trans = sc->open_tag->state;
                }
                if (trans == NULL)
                        goto skip;

                KKASSERT(tag->bio);
                bio = tag->bio;
                bp = bio->bio_buf;

                switch(bp->b_cmd) {
                case BUF_CMD_READ:
                        msg = kdmsg_msg_alloc(trans,
                                              DMSG_BLK_READ |
                                              DMSGF_CREATE |
                                              DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_read.keyid = sc->keyid;
                        msg->any.blk_read.offset = bio->bio_offset;
                        msg->any.blk_read.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_WRITE:
                        msg = kdmsg_msg_alloc(trans,
                                              DMSG_BLK_WRITE |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_write.keyid = sc->keyid;
                        msg->any.blk_write.offset = bio->bio_offset;
                        msg->any.blk_write.bytes = bp->b_bcount;
                        msg->aux_data = bp->b_data;
                        msg->aux_size = bp->b_bcount;
                        break;
                case BUF_CMD_FLUSH:
                        msg = kdmsg_msg_alloc(trans,
                                              DMSG_BLK_FLUSH |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_flush.keyid = sc->keyid;
                        msg->any.blk_flush.offset = bio->bio_offset;
                        msg->any.blk_flush.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_FREEBLKS:
                        msg = kdmsg_msg_alloc(trans,
                                              DMSG_BLK_FREEBLKS |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_freeblks.keyid = sc->keyid;
                        msg->any.blk_freeblks.offset = bio->bio_offset;
                        msg->any.blk_freeblks.bytes = bp->b_bcount;
                        break;
                default:
                        bp->b_flags |= B_ERROR;
                        bp->b_error = EIO;
                        devstat_end_transaction_buf(&sc->stats, bp);
                        atomic_add_int(&xa_active, -1);
                        biodone(bio);
                        tag->bio = NULL;
                        break;
                }
        }

        /*
         * If no msg was allocated we likely could not find a good span.
         */
skip:
        if (msg) {
                /*
                 * Message was passed in or constructed.
                 */
                tag->state = msg->state;
                lockmgr(&sc->lk, LK_RELEASE);
                kdmsg_msg_write(msg);
                lockmgr(&sc->lk, LK_EXCLUSIVE);
        } else if (tag->bio &&
                   (tag->bio->bio_buf->b_flags & B_FAILONDIS) == 0) {
                /*
                 * No spans available but BIO is not allowed to fail
                 * on connectivity problems.  Requeue the BIO.
                 */
                TAILQ_INSERT_TAIL(&sc->bioq, tag->bio, bio_act);
                tag->bio = NULL;
                lockmgr(&sc->lk, LK_RELEASE);
                xa_done(tag, 1);
                lockmgr(&sc->lk, LK_EXCLUSIVE);
        } else {
                /*
                 * No spans available, bio is allowed to fail.
                 */
                lockmgr(&sc->lk, LK_RELEASE);
                tag->status.head.error = DMSG_ERR_IO;
                xa_done(tag, 1);
                lockmgr(&sc->lk, LK_EXCLUSIVE);
        }
}

static uint32_t
xa_wait(xa_tag_t *tag)
{
        xa_softc_t *sc = tag->sc;
        uint32_t error;

        lockmgr(&sc->lk, LK_EXCLUSIVE);
        tag->waiting = 1;
        while (tag->done == 0)
                lksleep(tag, &sc->lk, 0, "xawait", 0);
        lockmgr(&sc->lk, LK_RELEASE);

        error = tag->status.head.error;
        tag->waiting = 0;
        xa_release(tag, 0);

        return error;
}

static void
xa_done(xa_tag_t *tag, int wasbio)
{
        KKASSERT(tag->bio == NULL);

        tag->state = NULL;
        tag->done = 1;
        if (tag->waiting)
                wakeup(tag);
        if (tag->async)
                xa_release(tag, wasbio);
}

/*
 * Release a tag.  If everything looks ok and there are pending BIOs
 * (due to all tags in-use), we can use the tag to start the next BIO.
 * Do not try to restart if the connection is currently failed.
 */
static
void
xa_release(xa_tag_t *tag, int wasbio)
{
        xa_softc_t *sc = tag->sc;
        struct bio *bio;

        if ((bio = tag->bio) != NULL) {
                struct buf *bp = bio->bio_buf;

                bp->b_error = EIO;
                bp->b_flags |= B_ERROR;
                devstat_end_transaction_buf(&sc->stats, bp);
                atomic_add_int(&xa_active, -1);
                biodone(bio);
                tag->bio = NULL;
        }

        lockmgr(&sc->lk, LK_EXCLUSIVE);

        if (wasbio && sc->open_tag &&
            (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
                TAILQ_REMOVE(&sc->bioq, bio, bio_act);
                tag->bio = bio;
                xa_start(tag, NULL, 1);
        } else {
                TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
                TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
        }
        lockmgr(&sc->lk, LK_RELEASE);
}

/*
 * Handle messages under the BLKOPEN transaction.
 */
static int
xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
        xa_tag_t *tag = state->any.any;
        xa_softc_t *sc;
        struct bio *bio;

        /*
         * If the tag has been cleaned out we already closed our side
         * of the transaction and we are waiting for the other side to
         * close.
         */
        kprintf("xa_sync_completion: tag %p msg %08x state %p\n",
                tag, msg->any.head.cmd, msg->state);

        if (tag == NULL) {
                if (msg->any.head.cmd & DMSGF_CREATE)
                        kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
                return 0;
        }
        sc = tag->sc;

        /*
         * Validate the tag
         */
        lockmgr(&sc->lk, LK_EXCLUSIVE);

        /*
         * Handle initial response to our open and restart any deferred
         * BIOs on success.
         *
         * NOTE: DELETE may also be set.
         */
        if (msg->any.head.cmd & DMSGF_CREATE) {
                switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
                case DMSG_LNK_ERROR | DMSGF_REPLY:
                        bzero(&tag->status, sizeof(tag->status));
                        tag->status.head = msg->any.head;
                        break;
                case DMSG_BLK_ERROR | DMSGF_REPLY:
                        tag->status = msg->any.blk_error;
                        break;
                }
                sc->last_error = tag->status.head.error;
                kprintf("xdisk: blk_open completion status %d\n",
                        sc->last_error);
                if (sc->last_error == 0) {
                        while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
                                tag = xa_setup_cmd(sc, NULL);
                                if (tag == NULL)
                                        break;
                                TAILQ_REMOVE(&sc->bioq, bio, bio_act);
                                tag->bio = bio;
                                xa_start(tag, NULL, 1);
                        }
                }
                sc->serializing = 0;
                wakeup(sc);
        }

        /*
         * Handle unexpected termination (or lost comm channel) from other
         * side.  Autonomous completion only if open_tag matches,
         * otherwise another thread is probably waiting on the tag.
         *
         * (see xa_close() for other interactions)
         */
        if (msg->any.head.cmd & DMSGF_DELETE) {
                kdmsg_state_reply(tag->state, 0);
                if (sc->open_tag == tag) {
                        sc->open_tag = NULL;
                        xa_done(tag, 0);
                } else {
                        tag->async = 0;
                        xa_done(tag, 0);
                }
        }
        lockmgr(&sc->lk, LK_RELEASE);

        return (0);
}

static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
        xa_tag_t *tag = state->any.any;
        xa_softc_t *sc = tag->sc;
        struct bio *bio;
        struct buf *bp;

        /*
         * Get the bio from the tag.  If no bio is present we just do
         * 'done' handling.
         */
        if ((bio = tag->bio) == NULL)
                goto handle_done;
        bp = bio->bio_buf;

        /*
         * Process return status
         */
        switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
        case DMSG_LNK_ERROR | DMSGF_REPLY:
                bzero(&tag->status, sizeof(tag->status));
                tag->status.head = msg->any.head;
                if (tag->status.head.error)
                        tag->status.resid = bp->b_bcount;
                else
                        tag->status.resid = 0;
                break;
        case DMSG_BLK_ERROR | DMSGF_REPLY:
                tag->status = msg->any.blk_error;
                break;
        }

        /*
         * If the device is open stall the bio on DMSG errors.  If an
         * actual I/O error occurred on the remote device, DMSG_ERR_IO
         * will be returned.
         */
        if (tag->status.head.error &&
            (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
                if (tag->status.head.error != DMSG_ERR_IO)
                        goto handle_repend;
        }

        /*
         * Process bio completion
         *
         * For reads any returned data is zero-extended if necessary, so
         * the server can short-cut any all-zeros reads if it desires.
         */
        switch(bp->b_cmd) {
        case BUF_CMD_READ:
                if (msg->aux_data && msg->aux_size) {
                        if (msg->aux_size < bp->b_bcount) {
                                bcopy(msg->aux_data, bp->b_data, msg->aux_size);
                                bzero(bp->b_data + msg->aux_size,
                                      bp->b_bcount - msg->aux_size);
                        } else {
                                bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
                        }
                } else {
                        bzero(bp->b_data, bp->b_bcount);
                }
                /* fall through */
        case BUF_CMD_WRITE:
        case BUF_CMD_FLUSH:
        case BUF_CMD_FREEBLKS:
        default:
                if (tag->status.resid > bp->b_bcount)
                        tag->status.resid = bp->b_bcount;
                bp->b_resid = tag->status.resid;
                if (tag->status.head.error != 0) {
                        bp->b_error = EIO;
                        bp->b_flags |= B_ERROR;
                } else {
                        bp->b_resid = 0;
                }
                devstat_end_transaction_buf(&sc->stats, bp);
                atomic_add_int(&xa_active, -1);
                biodone(bio);
                tag->bio = NULL;
                break;
        }

        /*
         * Handle completion of the transaction.  If the bioq is not empty
         * we can initiate another bio on the same tag.
         *
         * NOTE: Most of our transactions will be single-message
         *       CREATE+DELETEs, so we won't have to terminate the
         *       transaction separately here.  But just in case they
         *       aren't, be sure to terminate the transaction.
         */
handle_done:
        if (msg->any.head.cmd & DMSGF_DELETE) {
                xa_done(tag, 1);
                if ((state->txcmd & DMSGF_DELETE) == 0)
                        kdmsg_msg_reply(msg, 0);
        }
        return (0);

        /*
         * Handle the case where the transaction failed due to a
         * connectivity issue.  The tag is put away with wasbio=0
         * and we put the BIO back onto the bioq for a later restart.
         *
         * Probe I/Os (where the device is not open) are failed
         * instead of requeued.
         */
handle_repend:
        tag->bio = NULL;
        if (bio->bio_buf->b_flags & B_FAILONDIS) {
                kprintf("xa_strategy: disconnected, fail bp %p\n",
                        bio->bio_buf);
                kprintf("BIO CIRC FAILURE, FAIL BIO %p\n", bio);
                bio->bio_buf->b_error = ENXIO;
                bio->bio_buf->b_flags |= B_ERROR;
                biodone(bio);
                bio = NULL;
        } else {
                kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
        }
        xa_done(tag, 0);
        if ((state->txcmd & DMSGF_DELETE) == 0)
                kdmsg_msg_reply(msg, 0);

        /*
         * Requeue the bio
         */
        if (bio) {
                lockmgr(&sc->lk, LK_EXCLUSIVE);
                TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
                lockmgr(&sc->lk, LK_RELEASE);
        }
        return (0);
}

/*
 * Restart as much deferred I/O as we can.  The serializer is set and we
 * eat it (clear it) when done.
 *
 * Called with sc->lk held
 */
static
void
xa_restart_deferred(xa_softc_t *sc)
{
        kdmsg_state_t *span;
        kdmsg_msg_t *msg;
        xa_tag_t *tag;
        int error;

        KKASSERT(sc->serializing);

        /*
         * Determine if a restart is needed.
         */
        if (sc->opencnt == 0) {
                /*
                 * Device is not open, nothing to do, eat serializing.
                 */
                sc->serializing = 0;
                wakeup(sc);
        } else if (sc->open_tag == NULL) {
                /*
                 * BLK_OPEN required before we can restart any BIOs.
                 * Select the best LNK_SPAN to issue the BLK_OPEN under.
                 *
                 * serializing interlocks waiting open()s.
                 */
                error = 0;
                TAILQ_FOREACH(span, &sc->spanq, user_entry) {
                        if ((span->rxcmd & DMSGF_DELETE) == 0)
                                break;
                }
                if (span == NULL)
                        error = ENXIO;

                if (error == 0) {
                        tag = xa_setup_cmd(sc, NULL);
                        if (tag == NULL)
                                error = ENXIO;
                }
                if (error == 0) {
                        sc->open_tag = tag;
                        msg = kdmsg_msg_alloc(span,
                                              DMSG_BLK_OPEN |
                                              DMSGF_CREATE,
                                              xa_sync_completion, tag);
                        msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
                        kprintf("xdisk: BLK_OPEN tag %p state %p "
                                "span-state %p\n",
                                tag, msg->state, span);
                        xa_start(tag, msg, 0);
                }
                if (error) {
                        sc->serializing = 0;
                        wakeup(sc);
                }
                /* else leave serializing set until BLK_OPEN response */
        } else {
                /* nothing to do */
                sc->serializing = 0;
                wakeup(sc);
        }
}