kernel: Remove numerous #include <sys/thread2.h>.
[dragonfly.git] / sys / dev / disk / xdisk / xdisk.c
1 /*
2  * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 41  * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45  * devices.  These devices look like raw disks to the system.
46  */
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/buf.h>
50 #include <sys/conf.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
53 #include <sys/disk.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/tree.h>
60 #include <sys/udev.h>
61 #include <sys/uuid.h>
62 #include <sys/kern_syscall.h>
63
64 #include <sys/dmsg.h>
65 #include <sys/xdiskioctl.h>
66
67 #include <sys/buf2.h>
68
69 struct xa_softc;
70 struct xa_softc_tree;
71 RB_HEAD(xa_softc_tree, xa_softc);
72 RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
73
74 static int xa_active;
75 SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
76            "Number of active xdisk IOs");
77 static uint64_t xa_last;
78 SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
79            "Offset of last xdisk IO");
80 static int xa_debug = 1;
81 SYSCTL_INT(_debug, OID_AUTO, xa_debug, CTLFLAG_RW, &xa_debug, 0,
82            "xdisk debugging");
83
84 /*
85  * Track a BIO tag
86  */
87 struct xa_tag {
88         TAILQ_ENTRY(xa_tag) entry;
89         struct xa_softc *sc;
90         dmsg_blk_error_t status;
91         kdmsg_state_t   *state;
92         struct bio      *bio;
93         int             waiting;
94         int             async;
95         int             done;
96 };
97
98 typedef struct xa_tag   xa_tag_t;
99
100 /*
101  * Track devices.
102  */
103 struct xa_softc {
104         struct kdmsg_state_list spanq;
105         RB_ENTRY(xa_softc) rbnode;
106         cdev_t          dev;
107         struct devstat  stats;
108         struct disk_info info;
109         struct disk     disk;
110         uuid_t          peer_id;
111         int             unit;
112         int             opencnt;
113         int             spancnt;
114         uint64_t        keyid;
115         int             serializing;
116         int             last_error;
117         int             terminating;
118         char            peer_label[64]; /* from LNK_SPAN host/dev */
119         char            pfs_label[64];  /* from LNK_SPAN serno */
120         xa_tag_t        *open_tag;
121         TAILQ_HEAD(, bio) bioq;         /* pending BIOs */
122         TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
123         TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
124         struct lock     lk;
125 };
126
127 typedef struct xa_softc xa_softc_t;
128
129 struct xa_iocom {
130         TAILQ_ENTRY(xa_iocom) entry;
131         kdmsg_iocom_t   iocom;
132         xa_softc_t      dummysc;
133 };
134
135 typedef struct xa_iocom xa_iocom_t;
136
137 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
138 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
139 static struct xa_softc_tree xa_device_tree;
140
141 #define MAXTAGS         64      /* no real limit */
142
143 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
144 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
145 static void xaio_exit(kdmsg_iocom_t *iocom);
146 static int xaio_rcvdmsg(kdmsg_msg_t *msg);
147
148 static void xa_terminate_check(struct xa_softc *sc);
149
150 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
151 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
152 static void xa_done(xa_tag_t *tag, int wasbio);
153 static void xa_release(xa_tag_t *tag, int wasbio);
154 static uint32_t xa_wait(xa_tag_t *tag);
155 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
156 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
157 static void xa_restart_deferred(xa_softc_t *sc);
158
159 #define xa_printf(level, ctl, ...)      \
160         if (xa_debug >= (level)) kprintf("xdisk: " ctl, __VA_ARGS__)
161
162 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
163
164 /*
165  * Control device, issue ioctls to create xa devices.
166  */
167 static d_open_t xdisk_open;
168 static d_close_t xdisk_close;
169 static d_ioctl_t xdisk_ioctl;
170
171 static struct dev_ops xdisk_ops = {
172         { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
173         .d_open =       xdisk_open,
174         .d_close =      xdisk_close,
175         .d_ioctl =      xdisk_ioctl
176 };
177
178 /*
179  * XA disk devices
180  */
181 static d_open_t xa_open;
182 static d_close_t xa_close;
183 static d_ioctl_t xa_ioctl;
184 static d_strategy_t xa_strategy;
185 static d_psize_t xa_size;
186
187 static struct dev_ops xa_ops = {
188         { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
189         .d_open =       xa_open,
190         .d_close =      xa_close,
191         .d_ioctl =      xa_ioctl,
192         .d_read =       physread,
193         .d_write =      physwrite,
194         .d_strategy =   xa_strategy,
195         .d_psize =      xa_size
196 };
197
198 static int xdisk_opencount;
199 static cdev_t xdisk_dev;
200 struct lock xdisk_lk;
201 static TAILQ_HEAD(, xa_iocom) xaiocomq;
202
203 /*
204  * Module initialization
205  */
206 static int
207 xdisk_modevent(module_t mod, int type, void *data)
208 {
209         switch (type) {
210         case MOD_LOAD:
211                 TAILQ_INIT(&xaiocomq);
212                 RB_INIT(&xa_device_tree);
213                 lockinit(&xdisk_lk, "xdisk", 0, 0);
214                 xdisk_dev = make_dev(&xdisk_ops, 0,
215                                      UID_ROOT, GID_WHEEL, 0600, "xdisk");
216                 break;
217         case MOD_UNLOAD:
218         case MOD_SHUTDOWN:
219                 if (!RB_EMPTY(&xa_device_tree))
220                         return (EBUSY);
221                 if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
222                         return (EBUSY);
223                 if (xdisk_dev) {
224                         destroy_dev(xdisk_dev);
225                         xdisk_dev = NULL;
226                 }
227                 dev_ops_remove_all(&xdisk_ops);
228                 dev_ops_remove_all(&xa_ops);
229                 break;
230         default:
231                 break;
232         }
233         return 0;
234 }
235
236 DEV_MODULE(xdisk, xdisk_modevent, 0);
237
238 static int
239 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
240 {
241         return(strcmp(sc1->pfs_label, sc2->pfs_label));
242 }
243
244 /*
245  * Control device
246  */
247 static int
248 xdisk_open(struct dev_open_args *ap)
249 {
250         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
251         ++xdisk_opencount;
252         lockmgr(&xdisk_lk, LK_RELEASE);
253         return(0);
254 }
255
256 static int
257 xdisk_close(struct dev_close_args *ap)
258 {
259         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
260         --xdisk_opencount;
261         lockmgr(&xdisk_lk, LK_RELEASE);
262         return(0);
263 }
264
265 static int
266 xdisk_ioctl(struct dev_ioctl_args *ap)
267 {
268         int error;
269
270         switch(ap->a_cmd) {
271         case XDISKIOCATTACH:
272                 error = xdisk_attach((void *)ap->a_data);
273                 break;
274         case XDISKIOCDETACH:
275                 error = xdisk_detach((void *)ap->a_data);
276                 break;
277         default:
278                 error = ENOTTY;
279                 break;
280         }
281         return error;
282 }
283
284 /************************************************************************
285  *                              DMSG INTERFACE                          *
286  ************************************************************************/
287
288 static int
289 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
290 {
291         xa_iocom_t *xaio;
292         struct file *fp;
293
294         /*
295          * Normalize ioctl params
296          */
297         fp = holdfp(curthread, xaioc->fd, -1);
298         if (fp == NULL)
299                 return EINVAL;
300         xa_printf(1, "xdisk_attach fp=%p\n", fp);
301
302         /*
303          * See if the serial number is already present.  If we are
304          * racing a termination the disk subsystem may still have
305          * duplicate entries not yet removed so we wait a bit and
306          * retry.
307          */
308         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
309
310         xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
311         kdmsg_iocom_init(&xaio->iocom, xaio,
312                          KDMSG_IOCOMF_AUTOCONN,
313                          M_XDISK, xaio_rcvdmsg);
314         xaio->iocom.exit_func = xaio_exit;
315
316         kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");
317
318         /*
319          * Setup our LNK_CONN advertisement for autoinitiate.
320          *
321          * Our filter is setup to only accept PEER_BLOCK advertisements.
322          * XXX no peer_id filter.
323          *
324          * We need a unique pfs_fsid to avoid confusion.
325          */
326         xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_CLIENT;
327         xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
328         xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
329         ksnprintf(xaio->iocom.auto_lnk_conn.peer_label,
330                   sizeof(xaio->iocom.auto_lnk_conn.peer_label),
331                   "%s/xdisk",
332                   hostname);
333         /* kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1); */
334
335         /*
336          * Setup our LNK_SPAN advertisement for autoinitiate
337          */
338         TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
339         kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
340
341         lockmgr(&xdisk_lk, LK_RELEASE);
342
343         return 0;
344 }
345
346 static int
347 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
348 {
349         return EINVAL;
350 }
351
352 /*
353  * Called from iocom core transmit thread upon disconnect.
354  */
355 static
356 void
357 xaio_exit(kdmsg_iocom_t *iocom)
358 {
359         xa_iocom_t *xaio = iocom->handle;
360
361         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
362         xa_printf(1, "%s", "xdisk_detach [xaio_exit()]\n");
363         TAILQ_REMOVE(&xaiocomq, xaio, entry);
364         lockmgr(&xdisk_lk, LK_RELEASE);
365
366         kdmsg_iocom_uninit(&xaio->iocom);
367
368         kfree(xaio, M_XDISK);
369 }
370
371 /*
372  * Called from iocom core to handle messages that the iocom core does not
373  * handle itself and for which a state function callback has not yet been
374  * established.
375  *
376  * We primarily care about LNK_SPAN transactions here.
377  */
378 static int
379 xaio_rcvdmsg(kdmsg_msg_t *msg)
380 {
381         kdmsg_state_t   *state = msg->state;
382         xa_iocom_t      *xaio = state->iocom->handle;
383         xa_softc_t      *sc;
384
385         if (state) {
386                 xa_printf(4,
387                         "xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
388                         state, state->rxcmd, state->txcmd,
389                         msg->any.head.cmd);
390         }
391         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
392
393         switch(msg->tcmd) {
394         case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
395                 /*
396                  * A LNK_SPAN transaction which is opened and closed
397                  * degenerately is not useful to us, just ignore it.
398                  */
399                 kdmsg_msg_reply(msg, 0);
400                 break;
401         case DMSG_LNK_SPAN | DMSGF_CREATE:
402                 /*
403                  * Manage the tracking node for the remote LNK_SPAN.
404                  *
405                  * Return a streaming result, leaving the transaction open
406                  * in both directions to allow sub-transactions.
407                  */
408                 bcopy(msg->any.lnk_span.peer_label, xaio->dummysc.peer_label,
409                       sizeof(xaio->dummysc.peer_label));
410                 xaio->dummysc.peer_label[
411                         sizeof(xaio->dummysc.peer_label) - 1] = 0;
412
413                 bcopy(msg->any.lnk_span.pfs_label, xaio->dummysc.pfs_label,
414                       sizeof(xaio->dummysc.pfs_label));
415                 xaio->dummysc.pfs_label[
416                         sizeof(xaio->dummysc.pfs_label) - 1] = 0;
417
418                 xa_printf(3, "LINK_SPAN state %p create for %s\n",
419                           msg->state, msg->any.lnk_span.pfs_label);
420
421                 sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
422                 if (sc == NULL) {
423                         xa_softc_t *sctmp;
424                         xa_tag_t *tag;
425                         cdev_t dev;
426                         int unit;
427                         int n;
428
429                         sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
430                         bcopy(msg->any.lnk_span.peer_label, sc->peer_label,
431                               sizeof(sc->peer_label));
432                         sc->peer_label[sizeof(sc->peer_label) - 1] = 0;
433                         bcopy(msg->any.lnk_span.pfs_label, sc->pfs_label,
434                               sizeof(sc->pfs_label));
435                         sc->pfs_label[sizeof(sc->pfs_label) - 1] = 0;
436
437                         /* XXX FIXME O(N^2) */
438                         unit = -1;
439                         do {
440                                 ++unit;
441                                 RB_FOREACH(sctmp, xa_softc_tree,
442                                            &xa_device_tree) {
443                                         if (sctmp->unit == unit)
444                                                 break;
445                                 }
446                         } while (sctmp);
447
448                         sc->unit = unit;
449                         sc->serializing = 1;
450                         sc->spancnt = 1;
451                         lockinit(&sc->lk, "xalk", 0, 0);
452                         TAILQ_INIT(&sc->spanq);
453                         TAILQ_INIT(&sc->bioq);
454                         TAILQ_INIT(&sc->tag_freeq);
455                         TAILQ_INIT(&sc->tag_pendq);
456
457                         lockmgr(&sc->lk, LK_EXCLUSIVE);
458                         RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
459                         TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
460                         msg->state->any.xa_sc = sc;
461
462                         /*
463                          * Setup block device
464                          */
465                         for (n = 0; n < MAXTAGS; ++n) {
466                                 tag = kmalloc(sizeof(*tag),
467                                               M_XDISK, M_WAITOK|M_ZERO);
468                                 tag->sc = sc;
469                                 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
470                         }
471
472                         if (sc->dev == NULL) {
473                                 dev = disk_create(unit, &sc->disk, &xa_ops);
474                                 dev->si_drv1 = sc;
475                                 sc->dev = dev;
476                                 devstat_add_entry(&sc->stats, "xa", unit,
477                                                   DEV_BSIZE,
478                                                   DEVSTAT_NO_ORDERED_TAGS,
479                                                   DEVSTAT_TYPE_DIRECT |
480                                                   DEVSTAT_TYPE_IF_OTHER,
481                                                   DEVSTAT_PRIORITY_OTHER);
482                         }
483
484                         sc->info.d_media_blksize =
485                                 msg->any.lnk_span.media.block.blksize;
486                         if (sc->info.d_media_blksize <= 0)
487                                 sc->info.d_media_blksize = 1;
488                         sc->info.d_media_blocks =
489                                 msg->any.lnk_span.media.block.bytes /
490                                 sc->info.d_media_blksize;
491                         sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
492                         sc->info.d_secpertrack = 32;
493                         sc->info.d_nheads = 64;
494                         sc->info.d_secpercyl = sc->info.d_secpertrack *
495                                                sc->info.d_nheads;
496                         sc->info.d_ncylinders = 0;
497                         if (sc->pfs_label[0])
498                                 sc->info.d_serialno = sc->pfs_label;
499                         /*
500                          * WARNING! disk_setdiskinfo() must be asynchronous
501                          *          because we are in the rxmsg thread.  If
502                          *          it is synchronous and issues more disk
503                          *          I/Os, we will deadlock.
504                          */
505                         disk_setdiskinfo(&sc->disk, &sc->info);
506                         xa_restart_deferred(sc);        /* eats serializing */
507                         lockmgr(&sc->lk, LK_RELEASE);
508                 } else {
509                         lockmgr(&sc->lk, LK_EXCLUSIVE);
510                         ++sc->spancnt;
511                         TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
512                         msg->state->any.xa_sc = sc;
513                         if (sc->serializing == 0 && sc->open_tag == NULL) {
514                                 sc->serializing = 1;
515                                 xa_restart_deferred(sc); /* eats serializing */
516                         }
517                         lockmgr(&sc->lk, LK_RELEASE);
518                         if (sc->dev && sc->dev->si_disk) {
519                                 xa_printf(1, "reprobe disk: %s\n",
520                                           sc->pfs_label);
521                                 disk_msg_send(DISK_DISK_REPROBE,
522                                               sc->dev->si_disk,
523                                               NULL);
524                         }
525                 }
526                 xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);
527                 kdmsg_msg_result(msg, 0);
528                 break;
529         case DMSG_LNK_SPAN | DMSGF_DELETE:
530                 /*
531                  * Manage the tracking node for the remote LNK_SPAN.
532                  *
533                  * Return a final result, closing our end of the transaction.
534                  */
535                 sc = msg->state->any.xa_sc;
536                 xa_printf(3, "LINK_SPAN state %p delete for %s (sc=%p)\n",
537                           msg->state, (sc ? sc->pfs_label : "(null)"), sc);
538                 lockmgr(&sc->lk, LK_EXCLUSIVE);
539                 msg->state->any.xa_sc = NULL;
540                 TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
541                 --sc->spancnt;
542
543                 xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);
544
545                 /*
546                  * Spans can come and go as the graph stabilizes, so if
547                  * we lose a span along with sc->open_tag we may be able
548                  * to restart the I/Os on a different span.
549                  */
550                 if (sc->spancnt &&
551                     sc->serializing == 0 && sc->open_tag == NULL) {
552                         sc->serializing = 1;
553                         xa_restart_deferred(sc);
554                 }
555                 lockmgr(&sc->lk, LK_RELEASE);
556                 kdmsg_msg_reply(msg, 0);
557
558 #if 0
559                 /*
560                  * Termination
561                  */
562                 if (sc->spancnt == 0)
563                         xa_terminate_check(sc);
564 #endif
565                 break;
566         case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
567                 /*
568                  * Ignore unimplemented streaming replies on our LNK_SPAN
569                  * transaction.
570                  */
571                 xa_printf(3, "LINK_SPAN state %p delete+reply\n",
572                           msg->state);
573                 break;
574         case DMSG_LNK_SPAN | DMSGF_REPLY:
575                 /*
576                  * Ignore unimplemented streaming replies on our LNK_SPAN
577                  * transaction.
578                  */
579                 xa_printf(3, "LINK_SPAN state %p reply\n",
580                           msg->state);
581                 break;
582         case DMSG_DBG_SHELL:
583                 /*
584                  * Execute shell command (not supported atm).
585                  *
586                  * This is a one-way packet but if not (e.g. if part of
587                  * a streaming transaction), we will have already closed
588                  * our end.
589                  */
590                 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
591                 break;
592         case DMSG_DBG_SHELL | DMSGF_REPLY:
593                 /*
594                  * Receive one or more replies to a shell command
595                  * that we sent.  Just dump it to the console.
596                  *
597                  * This is a one-way packet but if not (e.g. if
598                  * part of a streaming transaction), we will have
599                  * already closed our end.
600                  */
601                 if (msg->aux_data) {
602                         msg->aux_data[msg->aux_size - 1] = 0;
603                         xa_printf(0, "DEBUGMSG: %s\n", msg->aux_data);
604                 }
605                 break;
606         default:
607                 /*
608                  * Unsupported one-way message, streaming message, or
609                  * transaction.
610                  *
611                  * Terminate any unsupported transactions with an error
612                  * and ignore any unsupported streaming messages.
613                  *
614                  * NOTE: This case also includes DMSG_LNK_ERROR messages
615                  *       which might be one-way, replying to those would
616                  *       cause an infinite ping-pong.
617                  */
618                 if (msg->any.head.cmd & DMSGF_CREATE)
619                         kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
620                 break;
621         }
622         lockmgr(&xdisk_lk, LK_RELEASE);
623
624         return 0;
625 }
626
627 /*
628  * Determine if we can destroy the xa_softc.
629  *
630  * Called with xdisk_lk held.
631  */
632 static
633 void
634 xa_terminate_check(struct xa_softc *sc)
635 {
636         xa_tag_t *tag;
637
638         /*
639          * Determine if we can destroy the softc.
640          */
641         xa_printf(1, "Terminate check xa%d (%d,%d,%d) sc=%p ",
642                 sc->unit,
643                 sc->opencnt, sc->serializing, sc->spancnt,
644                 sc);
645
646         if (sc->opencnt || sc->serializing || sc->spancnt ||
647             TAILQ_FIRST(&sc->bioq) || TAILQ_FIRST(&sc->tag_pendq)) {
648                 xa_printf(1, "%s", "(leave intact)\n");
649                 return;
650         }
651
652         /*
653          * Remove from device tree, a race with a new incoming span
654          * will create a new softc and disk.
655          */
656         RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
657         sc->terminating = 1;
658
659         /*
660          * Device has to go first to prevent device ops races.
661          */
662         if (sc->dev) {
663                 disk_destroy(&sc->disk);
664                 devstat_remove_entry(&sc->stats);
665                 sc->dev->si_drv1 = NULL;
666                 sc->dev = NULL;
667         }
668
669         xa_printf(1, "%s", "(remove from tree)\n");
670         sc->serializing = 1;
671         KKASSERT(sc->opencnt == 0);
672         KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
673
674         while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
675                 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
676                 tag->sc = NULL;
677                 kfree(tag, M_XDISK);
678         }
679
680         kfree(sc, M_XDISK);
681 }
682
683 /************************************************************************
684  *                         XA DEVICE INTERFACE                          *
685  ************************************************************************/
686
687 static int
688 xa_open(struct dev_open_args *ap)
689 {
690         cdev_t dev = ap->a_head.a_dev;
691         xa_softc_t *sc;
692         int error;
693
694         dev->si_bsize_phys = 512;
695         dev->si_bsize_best = 32768;
696
697         /*
698          * Interlock open with opencnt, wait for attachment operations
699          * to finish.
700          */
701         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
702 again:
703         sc = dev->si_drv1;
704         if (sc == NULL) {
705                 lockmgr(&xdisk_lk, LK_RELEASE);
706                 return ENXIO;   /* raced destruction */
707         }
708         if (sc->serializing) {
709                 tsleep(sc, 0, "xarace", hz / 10);
710                 goto again;
711         }
712         if (sc->terminating) {
713                 lockmgr(&xdisk_lk, LK_RELEASE);
714                 return ENXIO;   /* raced destruction */
715         }
716         sc->serializing = 1;
717
718         /*
719          * Serialize initial open
720          */
721         if (sc->opencnt++ > 0) {
722                 sc->serializing = 0;
723                 wakeup(sc);
724                 lockmgr(&xdisk_lk, LK_RELEASE);
725                 return(0);
726         }
727
728         /*
729          * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
730          */
731         if (sc->open_tag == NULL) {
732                 lockmgr(&sc->lk, LK_EXCLUSIVE);
733                 xa_restart_deferred(sc); /* eats serializing */
734                 lockmgr(&sc->lk, LK_RELEASE);
735         } else {
736                 sc->serializing = 0;
737                 wakeup(sc);
738         }
739         lockmgr(&xdisk_lk, LK_RELEASE);
740
741         /*
742          * Wait for completion of the BLK_OPEN
743          */
744         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
745         while (sc->serializing)
746                 lksleep(sc, &xdisk_lk, 0, "xaopen", hz);
747
748         error = sc->last_error;
749         if (error) {
750                 KKASSERT(sc->opencnt > 0);
751                 --sc->opencnt;
752                 xa_terminate_check(sc);
753                 sc = NULL;      /* sc may be invalid now */
754         }
755         lockmgr(&xdisk_lk, LK_RELEASE);
756
757         return (error);
758 }
759
760 static int
761 xa_close(struct dev_close_args *ap)
762 {
763         cdev_t dev = ap->a_head.a_dev;
764         xa_softc_t *sc;
765         xa_tag_t *tag;
766
767         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
768         sc = dev->si_drv1;
769         if (sc == NULL) {
770                 lockmgr(&sc->lk, LK_RELEASE);
771                 return ENXIO;   /* raced destruction */
772         }
773         if (sc->terminating) {
774                 lockmgr(&sc->lk, LK_RELEASE);
775                 return ENXIO;   /* raced destruction */
776         }
777         lockmgr(&sc->lk, LK_EXCLUSIVE);
778
779         /*
780          * NOTE: Clearing open_tag allows a concurrent open to re-open
781          *       the device and prevents autonomous completion of the tag.
782          */
783         if (sc->opencnt == 1 && sc->open_tag) {
784                 tag = sc->open_tag;
785                 sc->open_tag = NULL;
786                 lockmgr(&sc->lk, LK_RELEASE);
787                 kdmsg_state_reply(tag->state, 0);       /* close our side */
788                 xa_wait(tag);                           /* wait on remote */
789         } else {
790                 lockmgr(&sc->lk, LK_RELEASE);
791         }
792         KKASSERT(sc->opencnt > 0);
793         --sc->opencnt;
794         xa_terminate_check(sc);
795         lockmgr(&xdisk_lk, LK_RELEASE);
796
797         return(0);
798 }
799
800 static int
801 xa_strategy(struct dev_strategy_args *ap)
802 {
803         xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
804         xa_tag_t *tag;
805         struct bio *bio = ap->a_bio;
806
807         devstat_start_transaction(&sc->stats);
808         atomic_add_int(&xa_active, 1);
809         xa_last = bio->bio_offset;
810
811         /*
812          * If no tags are available NULL is returned and the bio is
813          * placed on sc->bioq.
814          */
815         lockmgr(&sc->lk, LK_EXCLUSIVE);
816         tag = xa_setup_cmd(sc, bio);
817         if (tag)
818                 xa_start(tag, NULL, 1);
819         lockmgr(&sc->lk, LK_RELEASE);
820
821         return(0);
822 }
823
824 static int
825 xa_ioctl(struct dev_ioctl_args *ap)
826 {
827         return(ENOTTY);
828 }
829
830 static int
831 xa_size(struct dev_psize_args *ap)
832 {
833         struct xa_softc *sc;
834
835         if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
836                 return (ENXIO);
837         ap->a_result = sc->info.d_media_blocks;
838         return (0);
839 }
840
841 /************************************************************************
842  *                  XA BLOCK PROTOCOL STATE MACHINE                     *
843  ************************************************************************
844  *
845  * Implement tag/msg setup and related functions.
846  * Called with sc->lk held.
847  */
848 static xa_tag_t *
849 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
850 {
851         xa_tag_t *tag;
852
853         /*
854          * Only get a tag if we have a valid virtual circuit to the server.
855          */
856         if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
857                 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
858                 tag->bio = bio;
859                 TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
860         }
861
862         /*
863          * If we can't dispatch now and this is a bio, queue it for later.
864          */
865         if (tag == NULL && bio) {
866                 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
867         }
868
869         return (tag);
870 }
871
872 /*
873  * Called with sc->lk held
874  */
875 static void
876 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
877 {
878         xa_softc_t *sc = tag->sc;
879
880         tag->done = 0;
881         tag->async = async;
882         tag->status.head.error = DMSG_ERR_IO;   /* fallback error */
883
884         if (msg == NULL) {
885                 struct bio *bio;
886                 struct buf *bp;
887                 kdmsg_state_t *trans;
888
889                 if (sc->opencnt == 0 || sc->open_tag == NULL) {
890                         TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
891                                 if ((trans->rxcmd & DMSGF_DELETE) == 0)
892                                         break;
893                         }
894                 } else {
895                         trans = sc->open_tag->state;
896                 }
897                 if (trans == NULL)
898                         goto skip;
899
900                 KKASSERT(tag->bio);
901                 bio = tag->bio;
902                 bp = bio->bio_buf;
903
904                 switch(bp->b_cmd) {
905                 case BUF_CMD_READ:
906                         msg = kdmsg_msg_alloc(trans,
907                                               DMSG_BLK_READ |
908                                               DMSGF_CREATE |
909                                               DMSGF_DELETE,
910                                               xa_bio_completion, tag);
911                         msg->any.blk_read.keyid = sc->keyid;
912                         msg->any.blk_read.offset = bio->bio_offset;
913                         msg->any.blk_read.bytes = bp->b_bcount;
914                         break;
915                 case BUF_CMD_WRITE:
916                         msg = kdmsg_msg_alloc(trans,
917                                               DMSG_BLK_WRITE |
918                                               DMSGF_CREATE | DMSGF_DELETE,
919                                               xa_bio_completion, tag);
920                         msg->any.blk_write.keyid = sc->keyid;
921                         msg->any.blk_write.offset = bio->bio_offset;
922                         msg->any.blk_write.bytes = bp->b_bcount;
923                         msg->aux_data = bp->b_data;
924                         msg->aux_size = bp->b_bcount;
925                         break;
926                 case BUF_CMD_FLUSH:
927                         msg = kdmsg_msg_alloc(trans,
928                                               DMSG_BLK_FLUSH |
929                                               DMSGF_CREATE | DMSGF_DELETE,
930                                               xa_bio_completion, tag);
931                         msg->any.blk_flush.keyid = sc->keyid;
932                         msg->any.blk_flush.offset = bio->bio_offset;
933                         msg->any.blk_flush.bytes = bp->b_bcount;
934                         break;
935                 case BUF_CMD_FREEBLKS:
936                         msg = kdmsg_msg_alloc(trans,
937                                               DMSG_BLK_FREEBLKS |
938                                               DMSGF_CREATE | DMSGF_DELETE,
939                                               xa_bio_completion, tag);
940                         msg->any.blk_freeblks.keyid = sc->keyid;
941                         msg->any.blk_freeblks.offset = bio->bio_offset;
942                         msg->any.blk_freeblks.bytes = bp->b_bcount;
943                         break;
944                 default:
945                         bp->b_flags |= B_ERROR;
946                         bp->b_error = EIO;
947                         devstat_end_transaction_buf(&sc->stats, bp);
948                         atomic_add_int(&xa_active, -1);
949                         biodone(bio);
950                         tag->bio = NULL;
951                         break;
952                 }
953         }
954
955         /*
956          * If no msg was allocated we likely could not find a good span.
957          */
958 skip:
959         if (msg) {
960                 /*
961                  * Message was passed in or constructed.
962                  */
963                 tag->state = msg->state;
964                 lockmgr(&sc->lk, LK_RELEASE);
965                 kdmsg_msg_write(msg);
966                 lockmgr(&sc->lk, LK_EXCLUSIVE);
967         } else if (tag->bio &&
968                    (tag->bio->bio_buf->b_flags & B_FAILONDIS) == 0) {
969                 /*
970                  * No spans available but BIO is not allowed to fail
971                  * on connectivity problems.  Requeue the BIO.
972                  */
973                 TAILQ_INSERT_TAIL(&sc->bioq, tag->bio, bio_act);
974                 tag->bio = NULL;
975                 lockmgr(&sc->lk, LK_RELEASE);
976                 xa_done(tag, 1);
977                 lockmgr(&sc->lk, LK_EXCLUSIVE);
978         } else {
979                 /*
980                  * No spans available, bio is allowed to fail.
981                  */
982                 lockmgr(&sc->lk, LK_RELEASE);
983                 tag->status.head.error = DMSG_ERR_IO;
984                 xa_done(tag, 1);
985                 lockmgr(&sc->lk, LK_EXCLUSIVE);
986         }
987 }
988
/*
 * Synchronously wait for a tag's command to complete and return the
 * DMSG error code from its status.  The tag is returned to the free
 * pool via xa_release() before returning, so the status is snapshotted
 * first.  Used by the open/close paths for BLK_OPEN transactions.
 */
static uint32_t
xa_wait(xa_tag_t *tag)
{
	xa_softc_t *sc = tag->sc;
	uint32_t error;

	lockmgr(&sc->lk, LK_EXCLUSIVE);
	/* waiting is set under the lock so xa_done() issues a wakeup */
	tag->waiting = 1;
	while (tag->done == 0)
		lksleep(tag, &sc->lk, 0, "xawait", 0);
	lockmgr(&sc->lk, LK_RELEASE);

	/* snapshot status before xa_release() recycles the tag */
	error = tag->status.head.error;
	tag->waiting = 0;
	xa_release(tag, 0);

	return error;
}
1007
/*
 * Mark the tag's current command as completed.  The caller must have
 * already disposed of tag->bio.
 *
 * done is set before the wakeup so a thread blocked in xa_wait() sees
 * it.  Async tags (fire-and-forget BIOs) are recycled immediately via
 * xa_release(); synchronous waiters release the tag themselves after
 * collecting the status.
 */
static void
xa_done(xa_tag_t *tag, int wasbio)
{
	KKASSERT(tag->bio == NULL);

	tag->state = NULL;
	tag->done = 1;
	if (tag->waiting)
		wakeup(tag);
	if (tag->async)
		xa_release(tag, wasbio);
}
1020
1021 /*
1022  * Release a tag.  If everything looks ok and there are pending BIOs
1023  * (due to all tags in-use), we can use the tag to start the next BIO.
1024  * Do not try to restart if the connection is currently failed.
1025  */
1026 static
1027 void
1028 xa_release(xa_tag_t *tag, int wasbio)
1029 {
1030         xa_softc_t *sc = tag->sc;
1031         struct bio *bio;
1032
1033         if ((bio = tag->bio) != NULL) {
1034                 struct buf *bp = bio->bio_buf;
1035
1036                 bp->b_error = EIO;
1037                 bp->b_flags |= B_ERROR;
1038                 devstat_end_transaction_buf(&sc->stats, bp);
1039                 atomic_add_int(&xa_active, -1);
1040                 biodone(bio);
1041                 tag->bio = NULL;
1042         }
1043
1044         lockmgr(&sc->lk, LK_EXCLUSIVE);
1045
1046         if (wasbio && sc->open_tag &&
1047             (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1048                 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1049                 tag->bio = bio;
1050                 xa_start(tag, NULL, 1);
1051         } else {
1052                 TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
1053                 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
1054         }
1055         lockmgr(&sc->lk, LK_RELEASE);
1056 }
1057
1058 /*
1059  * Handle messages under the BLKOPEN transaction.
1060  */
1061 static int
1062 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1063 {
1064         xa_tag_t *tag = state->any.any;
1065         xa_softc_t *sc;
1066         struct bio *bio;
1067
1068         /*
1069          * If the tag has been cleaned out we already closed our side
1070          * of the transaction and we are waiting for the other side to
1071          * close.
1072          */
1073         xa_printf(1, "xa_sync_completion: tag %p msg %08x state %p\n",
1074                   tag, msg->any.head.cmd, msg->state);
1075
1076         if (tag == NULL) {
1077                 if (msg->any.head.cmd & DMSGF_CREATE)
1078                         kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
1079                 return 0;
1080         }
1081         sc = tag->sc;
1082
1083         /*
1084          * Validate the tag
1085          */
1086         lockmgr(&sc->lk, LK_EXCLUSIVE);
1087
1088         /*
1089          * Handle initial response to our open and restart any deferred
1090          * BIOs on success.
1091          *
1092          * NOTE: DELETE may also be set.
1093          */
1094         if (msg->any.head.cmd & DMSGF_CREATE) {
1095                 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1096                 case DMSG_LNK_ERROR | DMSGF_REPLY:
1097                         bzero(&tag->status, sizeof(tag->status));
1098                         tag->status.head = msg->any.head;
1099                         break;
1100                 case DMSG_BLK_ERROR | DMSGF_REPLY:
1101                         tag->status = msg->any.blk_error;
1102                         break;
1103                 }
1104                 sc->last_error = tag->status.head.error;
1105                 xa_printf(1, "blk_open completion status %d\n",
1106                           sc->last_error);
1107                 if (sc->last_error == 0) {
1108                         while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1109                                 tag = xa_setup_cmd(sc, NULL);
1110                                 if (tag == NULL)
1111                                         break;
1112                                 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1113                                 tag->bio = bio;
1114                                 xa_start(tag, NULL, 1);
1115                         }
1116                 }
1117                 sc->serializing = 0;
1118                 wakeup(sc);
1119         }
1120
1121         /*
1122          * Handle unexpected termination (or lost comm channel) from other
1123          * side.  Autonomous completion only if open_tag matches,
1124          * otherwise another thread is probably waiting on the tag.
1125          *
1126          * (see xa_close() for other interactions)
1127          */
1128         if (msg->any.head.cmd & DMSGF_DELETE) {
1129                 kdmsg_state_reply(tag->state, 0);
1130                 if (sc->open_tag == tag) {
1131                         sc->open_tag = NULL;
1132                         xa_done(tag, 0);
1133                 } else {
1134                         tag->async = 0;
1135                         xa_done(tag, 0);
1136                 }
1137         }
1138         lockmgr(&sc->lk, LK_RELEASE);
1139
1140         return (0);
1141 }
1142
/*
 * Transaction callback for DMSG_BLK_* commands issued by xa_start().
 * Copies returned status/data into the tag's BIO, completes the BIO,
 * and finishes the tag.  On connectivity (non-EIO DMSG) errors while
 * the device is open, the BIO is requeued instead of failed.
 */
static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *sc = tag->sc;
	struct bio *bio;
	struct buf *bp;

	/*
	 * Get the bio from the tag.  If no bio is present we just do
	 * 'done' handling.
	 */
	if ((bio = tag->bio) == NULL)
		goto handle_done;
	bp = bio->bio_buf;

	/*
	 * Process return status
	 */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		if (tag->status.head.error)
			tag->status.resid = bp->b_bcount;
		else
			tag->status.resid = 0;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}

	/*
	 * If the device is open stall the bio on DMSG errors.  If an
	 * actual I/O error occured on the remote device, DMSG_ERR_IO
	 * will be returned.
	 */
	if (tag->status.head.error &&
	    (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
		if (tag->status.head.error != DMSG_ERR_IO)
			goto handle_repend;
	}

	/*
	 * Process bio completion
	 *
	 * For reads any returned data is zero-extended if necessary, so
	 * the server can short-cut any all-zeros reads if it desires.
	 */
	switch(bp->b_cmd) {
	case BUF_CMD_READ:
		if (msg->aux_data && msg->aux_size) {
			if (msg->aux_size < bp->b_bcount) {
				/* short read: zero-extend the remainder */
				bcopy(msg->aux_data, bp->b_data, msg->aux_size);
				bzero(bp->b_data + msg->aux_size,
				      bp->b_bcount - msg->aux_size);
			} else {
				bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
			}
		} else {
			/* no data returned: treat as an all-zeros read */
			bzero(bp->b_data, bp->b_bcount);
		}
		/* fall through */
	case BUF_CMD_WRITE:
	case BUF_CMD_FLUSH:
	case BUF_CMD_FREEBLKS:
	default:
		/* clamp server-reported residual to the buffer size */
		if (tag->status.resid > bp->b_bcount)
			tag->status.resid = bp->b_bcount;
		bp->b_resid = tag->status.resid;
		if (tag->status.head.error != 0) {
			bp->b_error = EIO;
			bp->b_flags |= B_ERROR;
		} else {
			bp->b_resid = 0;
		}
		devstat_end_transaction_buf(&sc->stats, bp);
		atomic_add_int(&xa_active, -1);
		biodone(bio);
		tag->bio = NULL;
		break;
	}

	/*
	 * Handle completion of the transaction.  If the bioq is not empty
	 * we can initiate another bio on the same tag.
	 *
	 * NOTE: Most of our transactions will be single-message
	 *       CREATE+DELETEs, so we won't have to terminate the
	 *       transaction separately, here.  But just in case they
	 *       aren't be sure to terminate the transaction.
	 */
handle_done:
	if (msg->any.head.cmd & DMSGF_DELETE) {
		xa_done(tag, 1);
		if ((state->txcmd & DMSGF_DELETE) == 0)
			kdmsg_msg_reply(msg, 0);
	}
	return (0);

	/*
	 * Handle the case where the transaction failed due to a
	 * connectivity issue.  The tag is put away with wasbio=0
	 * and we put the BIO back onto the bioq for a later restart.
	 *
	 * probe I/Os (where the device is not open) will be failed
	 * instead of requeued.
	 */
handle_repend:
	tag->bio = NULL;
	if (bio->bio_buf->b_flags & B_FAILONDIS) {
		xa_printf(1, "xa_strategy: lost link, fail probe bp %p\n",
			  bio->bio_buf);
		bio->bio_buf->b_error = ENXIO;
		bio->bio_buf->b_flags |= B_ERROR;
		biodone(bio);
		bio = NULL;
	} else {
		xa_printf(1, "xa_strategy: lost link, requeue bp %p\n",
			  bio->bio_buf);
	}
	/* wasbio=0: do not restart from bioq while the link is down */
	xa_done(tag, 0);
	if ((state->txcmd & DMSGF_DELETE) == 0)
		kdmsg_msg_reply(msg, 0);

	/*
	 * Requeue the bio
	 */
	if (bio) {
		lockmgr(&sc->lk, LK_EXCLUSIVE);
		TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
		lockmgr(&sc->lk, LK_RELEASE);
	}
	return (0);
}
1279
1280 /*
1281  * Restart as much deferred I/O as we can.  The serializer is set and we
1282  * eat it (clear it) when done.
1283  *
1284  * Called with sc->lk held
1285  */
1286 static
1287 void
1288 xa_restart_deferred(xa_softc_t *sc)
1289 {
1290         kdmsg_state_t *span;
1291         kdmsg_msg_t *msg;
1292         xa_tag_t *tag;
1293         int error;
1294
1295         KKASSERT(sc->serializing);
1296
1297         /*
1298          * Determine if a restart is needed.
1299          */
1300         if (sc->opencnt == 0) {
1301                 /*
1302                  * Device is not open, nothing to do, eat serializing.
1303                  */
1304                 sc->serializing = 0;
1305                 wakeup(sc);
1306         } else if (sc->open_tag == NULL) {
1307                 /*
1308                  * BLK_OPEN required before we can restart any BIOs.
1309                  * Select the best LNK_SPAN to issue the BLK_OPEN under.
1310                  *
1311                  * serializing interlocks waiting open()s.
1312                  */
1313                 error = 0;
1314                 TAILQ_FOREACH(span, &sc->spanq, user_entry) {
1315                         if ((span->rxcmd & DMSGF_DELETE) == 0)
1316                                 break;
1317                 }
1318                 if (span == NULL)
1319                         error = ENXIO;
1320
1321                 if (error == 0) {
1322                         tag = xa_setup_cmd(sc, NULL);
1323                         if (tag == NULL)
1324                                 error = ENXIO;
1325                 }
1326                 if (error == 0) {
1327                         sc->open_tag = tag;
1328                         msg = kdmsg_msg_alloc(span,
1329                                               DMSG_BLK_OPEN |
1330                                               DMSGF_CREATE,
1331                                               xa_sync_completion, tag);
1332                         msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
1333                         xa_printf(1,
1334                                   "BLK_OPEN tag %p state %p "
1335                                   "span-state %p\n",
1336                                   tag, msg->state, span);
1337                         xa_start(tag, msg, 0);
1338                 }
1339                 if (error) {
1340                         sc->serializing = 0;
1341                         wakeup(sc);
1342                 }
1343                 /* else leave serializing set until BLK_OPEN response */
1344         } else {
1345                 /* nothing to do */
1346                 sc->serializing = 0;
1347                 wakeup(sc);
1348         }
1349 }