Merge branch 'vendor/OPENSSL'
[dragonfly.git] / sys / dev / disk / xdisk / xdisk.c
1 /*
2  * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
41  * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45  * devices.  These devices look like raw disks to the system.
46  */
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/buf.h>
50 #include <sys/conf.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
53 #include <sys/disk.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/tree.h>
60 #include <sys/udev.h>
61 #include <sys/uuid.h>
62 #include <sys/kern_syscall.h>
63
64 #include <sys/dmsg.h>
65 #include <sys/xdiskioctl.h>
66
67 #include <sys/buf2.h>
68 #include <sys/thread2.h>
69
70 struct xa_softc;
71 struct xa_softc_tree;
72 RB_HEAD(xa_softc_tree, xa_softc);
73 RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
74
75 static int xa_active;
76 SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
77            "Number of active xdisk IOs");
78 static uint64_t xa_last;
79 SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
80            "Offset of last xdisk IO");
81 static int xa_debug = 1;
82 SYSCTL_INT(_debug, OID_AUTO, xa_debug, CTLFLAG_RW, &xa_debug, 0,
83            "xdisk debugging");
84
85 /*
86  * Track a BIO tag
87  */
struct xa_tag {
        TAILQ_ENTRY(xa_tag) entry;      /* on sc->tag_freeq or tag_pendq */
        struct xa_softc *sc;            /* owning device softc */
        dmsg_blk_error_t status;        /* completion status from remote */
        kdmsg_state_t   *state;         /* DMSG transaction for this I/O */
        struct bio      *bio;           /* bio being serviced, if any */
        int             waiting;        /* a thread is sleeping in xa_wait() */
        int             async;          /* completion is autonomous (no waiter) */
        int             done;           /* I/O completed */
};

typedef struct xa_tag   xa_tag_t;
100
101 /*
102  * Track devices.
103  */
struct xa_softc {
        struct kdmsg_state_list spanq;  /* active LNK_SPANs to the server */
        RB_ENTRY(xa_softc) rbnode;      /* in xa_device_tree, keyed by pfs_label */
        cdev_t          dev;            /* /dev/xa%d device */
        struct devstat  stats;
        struct disk_info info;
        struct disk     disk;
        uuid_t          peer_id;
        int             unit;           /* xa unit number */
        int             opencnt;        /* device open count */
        int             spancnt;        /* number of LNK_SPANs in spanq */
        uint64_t        keyid;
        int             serializing;    /* open/restart serialization flag */
        int             last_error;     /* result of last BLK_OPEN attempt */
        int             terminating;    /* softc being destroyed */
        char            peer_label[64]; /* from LNK_SPAN host/dev */
        char            pfs_label[64];  /* from LNK_SPAN serno */
        xa_tag_t        *open_tag;      /* tag holding the BLK_OPEN transaction */
        TAILQ_HEAD(, bio) bioq;         /* pending BIOs */
        TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
        TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
        struct lock     lk;             /* per-softc lock */
};

typedef struct xa_softc xa_softc_t;
129
/*
 * Per-connection state, one per xdisk_attach()ed descriptor.
 */
struct xa_iocom {
        TAILQ_ENTRY(xa_iocom) entry;    /* on xaiocomq */
        kdmsg_iocom_t   iocom;          /* DMSG communications core */
        xa_softc_t      dummysc;        /* scratch softc used as an RB_FIND key */
};

typedef struct xa_iocom xa_iocom_t;
137
138 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
139 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
140 static struct xa_softc_tree xa_device_tree;
141
142 #define MAXTAGS         64      /* no real limit */
143
144 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
145 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
146 static void xaio_exit(kdmsg_iocom_t *iocom);
147 static int xaio_rcvdmsg(kdmsg_msg_t *msg);
148
149 static void xa_terminate_check(struct xa_softc *sc);
150
151 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
152 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
153 static void xa_done(xa_tag_t *tag, int wasbio);
154 static void xa_release(xa_tag_t *tag, int wasbio);
155 static uint32_t xa_wait(xa_tag_t *tag);
156 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
157 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
158 static void xa_restart_deferred(xa_softc_t *sc);
159
/*
 * Debug printf gated on the xa_debug sysctl level.
 *
 * Wrapped in do { } while (0) so the macro behaves as a single statement;
 * the original bare "if (...) kprintf(...)" form could capture a caller's
 * subsequent "else" (dangling-else hazard).
 */
#define xa_printf(level, ctl, ...)                              \
        do {                                                    \
                if (xa_debug >= (level))                        \
                        kprintf("xdisk: " ctl, __VA_ARGS__);    \
        } while (0)
162
163 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
164
165 /*
166  * Control device, issue ioctls to create xa devices.
167  */
168 static d_open_t xdisk_open;
169 static d_close_t xdisk_close;
170 static d_ioctl_t xdisk_ioctl;
171
172 static struct dev_ops xdisk_ops = {
173         { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
174         .d_open =       xdisk_open,
175         .d_close =      xdisk_close,
176         .d_ioctl =      xdisk_ioctl
177 };
178
179 /*
180  * XA disk devices
181  */
182 static d_open_t xa_open;
183 static d_close_t xa_close;
184 static d_ioctl_t xa_ioctl;
185 static d_strategy_t xa_strategy;
186 static d_psize_t xa_size;
187
188 static struct dev_ops xa_ops = {
189         { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
190         .d_open =       xa_open,
191         .d_close =      xa_close,
192         .d_ioctl =      xa_ioctl,
193         .d_read =       physread,
194         .d_write =      physwrite,
195         .d_strategy =   xa_strategy,
196         .d_psize =      xa_size
197 };
198
199 static int xdisk_opencount;
200 static cdev_t xdisk_dev;
201 struct lock xdisk_lk;
202 static TAILQ_HEAD(, xa_iocom) xaiocomq;
203
204 /*
205  * Module initialization
206  */
207 static int
208 xdisk_modevent(module_t mod, int type, void *data)
209 {
210         switch (type) {
211         case MOD_LOAD:
212                 TAILQ_INIT(&xaiocomq);
213                 RB_INIT(&xa_device_tree);
214                 lockinit(&xdisk_lk, "xdisk", 0, 0);
215                 xdisk_dev = make_dev(&xdisk_ops, 0,
216                                      UID_ROOT, GID_WHEEL, 0600, "xdisk");
217                 break;
218         case MOD_UNLOAD:
219         case MOD_SHUTDOWN:
220                 if (!RB_EMPTY(&xa_device_tree))
221                         return (EBUSY);
222                 if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
223                         return (EBUSY);
224                 if (xdisk_dev) {
225                         destroy_dev(xdisk_dev);
226                         xdisk_dev = NULL;
227                 }
228                 dev_ops_remove_all(&xdisk_ops);
229                 dev_ops_remove_all(&xa_ops);
230                 break;
231         default:
232                 break;
233         }
234         return 0;
235 }
236
237 DEV_MODULE(xdisk, xdisk_modevent, 0);
238
239 static int
240 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
241 {
242         return(strcmp(sc1->pfs_label, sc2->pfs_label));
243 }
244
245 /*
246  * Control device
247  */
248 static int
249 xdisk_open(struct dev_open_args *ap)
250 {
251         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
252         ++xdisk_opencount;
253         lockmgr(&xdisk_lk, LK_RELEASE);
254         return(0);
255 }
256
257 static int
258 xdisk_close(struct dev_close_args *ap)
259 {
260         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
261         --xdisk_opencount;
262         lockmgr(&xdisk_lk, LK_RELEASE);
263         return(0);
264 }
265
266 static int
267 xdisk_ioctl(struct dev_ioctl_args *ap)
268 {
269         int error;
270
271         switch(ap->a_cmd) {
272         case XDISKIOCATTACH:
273                 error = xdisk_attach((void *)ap->a_data);
274                 break;
275         case XDISKIOCDETACH:
276                 error = xdisk_detach((void *)ap->a_data);
277                 break;
278         default:
279                 error = ENOTTY;
280                 break;
281         }
282         return error;
283 }
284
285 /************************************************************************
286  *                              DMSG INTERFACE                          *
287  ************************************************************************/
288
/*
 * XDISKIOCATTACH: wire a file descriptor (pipe/socket) to the DMSG core
 * as a new xdisk connection.  The xa devices themselves are created later
 * when LNK_SPAN advertisements arrive (see xaio_rcvdmsg()).
 *
 * Returns 0 on success or EINVAL if the descriptor cannot be held.
 */
static int
xdisk_attach(struct xdisk_attach_ioctl *xaioc)
{
        xa_iocom_t *xaio;
        struct file *fp;

        /*
         * Normalize ioctl params: take a hold on the caller's descriptor
         * so it survives independently of the caller's fd table.
         */
        fp = holdfp(curproc->p_fd, xaioc->fd, -1);
        if (fp == NULL)
                return EINVAL;
        xa_printf(1, "xdisk_attach fp=%p\n", fp);

        /*
         * See if the serial number is already present.  If we are
         * racing a termination the disk subsystem may still have
         * duplicate entries not yet removed so we wait a bit and
         * retry.
         *
         * NOTE(review): no wait/retry loop is present in this function;
         *               the dedup actually happens per-span in
         *               xaio_rcvdmsg() -- confirm the comment above.
         */
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);

        /* Connection state; freed in xaio_exit() on disconnect */
        xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
        kdmsg_iocom_init(&xaio->iocom, xaio,
                         KDMSG_IOCOMF_AUTOCONN,
                         M_XDISK, xaio_rcvdmsg);
        xaio->iocom.exit_func = xaio_exit;

        kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");

        /*
         * Setup our LNK_CONN advertisement for autoinitiate.
         *
         * Our filter is setup to only accept PEER_BLOCK advertisements.
         * XXX no peer_id filter.
         *
         * We need a unique pfs_fsid to avoid confusion.
         */
        xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_CLIENT;
        xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
        xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
        ksnprintf(xaio->iocom.auto_lnk_conn.peer_label,
                  sizeof(xaio->iocom.auto_lnk_conn.peer_label),
                  "%s/xdisk",
                  hostname);
        /* kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1); */

        /*
         * Setup our LNK_SPAN advertisement for autoinitiate
         */
        TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
        kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);

        lockmgr(&xdisk_lk, LK_RELEASE);

        return 0;
}
346
/*
 * XDISKIOCDETACH: not implemented.  Teardown currently occurs only when
 * the connection drops and the iocom core calls xaio_exit().
 */
static int
xdisk_detach(struct xdisk_attach_ioctl *xaioc)
{
        return EINVAL;
}
352
353 /*
354  * Called from iocom core transmit thread upon disconnect.
355  */
356 static
357 void
358 xaio_exit(kdmsg_iocom_t *iocom)
359 {
360         xa_iocom_t *xaio = iocom->handle;
361
362         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
363         xa_printf(1, "%s", "xdisk_detach [xaio_exit()]\n");
364         TAILQ_REMOVE(&xaiocomq, xaio, entry);
365         lockmgr(&xdisk_lk, LK_RELEASE);
366
367         kdmsg_iocom_uninit(&xaio->iocom);
368
369         kfree(xaio, M_XDISK);
370 }
371
372 /*
373  * Called from iocom core to handle messages that the iocom core does not
374  * handle itself and for which a state function callback has not yet been
375  * established.
376  *
377  * We primarily care about LNK_SPAN transactions here.
378  */
/*
 * DMSG receive callback.  Primarily tracks remote LNK_SPAN transactions:
 * a new span either instantiates a new xa device (softc + cdev + tag pool)
 * or attaches to an existing one; a span delete detaches it and may allow
 * deferred I/O to restart on another span.
 *
 * Runs in the iocom receive thread with xdisk_lk held for the duration
 * of the switch.
 */
static int
xaio_rcvdmsg(kdmsg_msg_t *msg)
{
        kdmsg_state_t   *state = msg->state;
        xa_iocom_t      *xaio = state->iocom->handle;
        xa_softc_t      *sc;

        /*
         * NOTE(review): state is dereferenced above (state->iocom) before
         *               this NULL test, so the test is either redundant or
         *               the deref is unsafe -- presumably msg->state is
         *               never NULL here; confirm against the kdmsg core.
         */
        if (state) {
                xa_printf(4,
                        "xdisk - rcvmsg state=%p rx=%08x tx=%08x msgcmd=%08x\n",
                        state, state->rxcmd, state->txcmd,
                        msg->any.head.cmd);
        }
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);

        switch(msg->tcmd) {
        case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
                /*
                 * A LNK_SPAN transaction which is opened and closed
                 * degenerately is not useful to us, just ignore it.
                 */
                kdmsg_msg_reply(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_CREATE:
                /*
                 * Manage the tracking node for the remote LNK_SPAN.
                 *
                 * Return a streaming result, leaving the transaction open
                 * in both directions to allow sub-transactions.
                 */
                /* Build an RB_FIND key in dummysc from the span's labels */
                bcopy(msg->any.lnk_span.peer_label, xaio->dummysc.peer_label,
                      sizeof(xaio->dummysc.peer_label));
                xaio->dummysc.peer_label[
                        sizeof(xaio->dummysc.peer_label) - 1] = 0;

                bcopy(msg->any.lnk_span.pfs_label, xaio->dummysc.pfs_label,
                      sizeof(xaio->dummysc.pfs_label));
                xaio->dummysc.pfs_label[
                        sizeof(xaio->dummysc.pfs_label) - 1] = 0;

                xa_printf(3, "LINK_SPAN state %p create for %s\n",
                          msg->state, msg->any.lnk_span.pfs_label);

                sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
                if (sc == NULL) {
                        /* First span for this pfs_label: create the device */
                        xa_softc_t *sctmp;
                        xa_tag_t *tag;
                        cdev_t dev;
                        int unit;
                        int n;

                        sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
                        bcopy(msg->any.lnk_span.peer_label, sc->peer_label,
                              sizeof(sc->peer_label));
                        sc->peer_label[sizeof(sc->peer_label) - 1] = 0;
                        bcopy(msg->any.lnk_span.pfs_label, sc->pfs_label,
                              sizeof(sc->pfs_label));
                        sc->pfs_label[sizeof(sc->pfs_label) - 1] = 0;

                        /* Find the lowest free unit number */
                        /* XXX FIXME O(N^2) */
                        unit = -1;
                        do {
                                ++unit;
                                RB_FOREACH(sctmp, xa_softc_tree,
                                           &xa_device_tree) {
                                        if (sctmp->unit == unit)
                                                break;
                                }
                        } while (sctmp);

                        sc->unit = unit;
                        sc->serializing = 1;
                        sc->spancnt = 1;
                        lockinit(&sc->lk, "xalk", 0, 0);
                        TAILQ_INIT(&sc->spanq);
                        TAILQ_INIT(&sc->bioq);
                        TAILQ_INIT(&sc->tag_freeq);
                        TAILQ_INIT(&sc->tag_pendq);

                        lockmgr(&sc->lk, LK_EXCLUSIVE);
                        RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;

                        /*
                         * Setup block device
                         */
                        /* Pre-allocate the fixed pool of I/O tags */
                        for (n = 0; n < MAXTAGS; ++n) {
                                tag = kmalloc(sizeof(*tag),
                                              M_XDISK, M_WAITOK|M_ZERO);
                                tag->sc = sc;
                                TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
                        }

                        if (sc->dev == NULL) {
                                dev = disk_create(unit, &sc->disk, &xa_ops);
                                dev->si_drv1 = sc;
                                sc->dev = dev;
                                devstat_add_entry(&sc->stats, "xa", unit,
                                                  DEV_BSIZE,
                                                  DEVSTAT_NO_ORDERED_TAGS,
                                                  DEVSTAT_TYPE_DIRECT |
                                                  DEVSTAT_TYPE_IF_OTHER,
                                                  DEVSTAT_PRIORITY_OTHER);
                        }

                        /* Media geometry comes from the span advertisement */
                        sc->info.d_media_blksize =
                                msg->any.lnk_span.media.block.blksize;
                        if (sc->info.d_media_blksize <= 0)
                                sc->info.d_media_blksize = 1;
                        sc->info.d_media_blocks =
                                msg->any.lnk_span.media.block.bytes /
                                sc->info.d_media_blksize;
                        sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
                        sc->info.d_secpertrack = 32;
                        sc->info.d_nheads = 64;
                        sc->info.d_secpercyl = sc->info.d_secpertrack *
                                               sc->info.d_nheads;
                        sc->info.d_ncylinders = 0;
                        if (sc->pfs_label[0])
                                sc->info.d_serialno = sc->pfs_label;
                        /*
                         * WARNING! disk_setdiskinfo() must be asynchronous
                         *          because we are in the rxmsg thread.  If
                         *          it is synchronous and issues more disk
                         *          I/Os, we will deadlock.
                         */
                        disk_setdiskinfo(&sc->disk, &sc->info);
                        xa_restart_deferred(sc);        /* eats serializing */
                        lockmgr(&sc->lk, LK_RELEASE);
                } else {
                        /* Additional span for an existing device */
                        lockmgr(&sc->lk, LK_EXCLUSIVE);
                        ++sc->spancnt;
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;
                        if (sc->serializing == 0 && sc->open_tag == NULL) {
                                sc->serializing = 1;
                                xa_restart_deferred(sc); /* eats serializing */
                        }
                        lockmgr(&sc->lk, LK_RELEASE);
                        if (sc->dev && sc->dev->si_disk) {
                                xa_printf(1, "reprobe disk: %s\n",
                                          sc->pfs_label);
                                disk_msg_send(DISK_DISK_REPROBE,
                                              sc->dev->si_disk,
                                              NULL);
                        }
                }
                xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);
                kdmsg_msg_result(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_DELETE:
                /*
                 * Manage the tracking node for the remote LNK_SPAN.
                 *
                 * Return a final result, closing our end of the transaction.
                 *
                 * NOTE(review): the printf below tolerates sc == NULL but
                 *               lockmgr(&sc->lk) does not -- presumably
                 *               xa_sc is always set by the CREATE case
                 *               before a DELETE arrives; confirm.
                 */
                sc = msg->state->any.xa_sc;
                xa_printf(3, "LINK_SPAN state %p delete for %s (sc=%p)\n",
                          msg->state, (sc ? sc->pfs_label : "(null)"), sc);
                lockmgr(&sc->lk, LK_EXCLUSIVE);
                msg->state->any.xa_sc = NULL;
                TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
                --sc->spancnt;

                xa_printf(2, "sc %p spancnt %d\n", sc, sc->spancnt);

                /*
                 * Spans can come and go as the graph stabilizes, so if
                 * we lose a span along with sc->open_tag we may be able
                 * to restart the I/Os on a different span.
                 */
                if (sc->spancnt &&
                    sc->serializing == 0 && sc->open_tag == NULL) {
                        sc->serializing = 1;
                        xa_restart_deferred(sc);
                }
                lockmgr(&sc->lk, LK_RELEASE);
                kdmsg_msg_reply(msg, 0);

#if 0
                /*
                 * Termination
                 */
                if (sc->spancnt == 0)
                        xa_terminate_check(sc);
#endif
                break;
        case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
                /*
                 * Ignore unimplemented streaming replies on our LNK_SPAN
                 * transaction.
                 */
                xa_printf(3, "LINK_SPAN state %p delete+reply\n",
                          msg->state);
                break;
        case DMSG_LNK_SPAN | DMSGF_REPLY:
                /*
                 * Ignore unimplemented streaming replies on our LNK_SPAN
                 * transaction.
                 */
                xa_printf(3, "LINK_SPAN state %p reply\n",
                          msg->state);
                break;
        case DMSG_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm).
                 *
                 * This is a one-way packet but if not (e.g. if part of
                 * a streaming transaction), we will have already closed
                 * our end.
                 */
                kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                /*
                 * Receive one or more replies to a shell command
                 * that we sent.  Just dump it to the console.
                 *
                 * This is a one-way packet but if not (e.g. if
                 * part of a streaming transaction), we will have
                 * already closed our end.
                 */
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
                        xa_printf(0, "DEBUGMSG: %s\n", msg->aux_data);
                }
                break;
        default:
                /*
                 * Unsupported one-way message, streaming message, or
                 * transaction.
                 *
                 * Terminate any unsupported transactions with an error
                 * and ignore any unsupported streaming messages.
                 *
                 * NOTE: This case also includes DMSG_LNK_ERROR messages
                 *       which might be one-way, replying to those would
                 *       cause an infinite ping-pong.
                 */
                if (msg->any.head.cmd & DMSGF_CREATE)
                        kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        }
        lockmgr(&xdisk_lk, LK_RELEASE);

        return 0;
}
627
628 /*
629  * Determine if we can destroy the xa_softc.
630  *
631  * Called with xdisk_lk held.
632  */
/*
 * Destroy the xa_softc if nothing references it any more: no opens, no
 * serialization in progress, no spans, and no pending or running I/O.
 *
 * Called with xdisk_lk held.
 */
static
void
xa_terminate_check(struct xa_softc *sc)
{
        xa_tag_t *tag;

        /*
         * Determine if we can destroy the softc.
         */
        xa_printf(1, "Terminate check xa%d (%d,%d,%d) sc=%p ",
                sc->unit,
                sc->opencnt, sc->serializing, sc->spancnt,
                sc);

        if (sc->opencnt || sc->serializing || sc->spancnt ||
            TAILQ_FIRST(&sc->bioq) || TAILQ_FIRST(&sc->tag_pendq)) {
                xa_printf(1, "%s", "(leave intact)\n");
                return;
        }

        /*
         * Remove from device tree, a race with a new incoming span
         * will create a new softc and disk.
         */
        RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
        sc->terminating = 1;

        /*
         * Device has to go first to prevent device ops races.
         */
        if (sc->dev) {
                disk_destroy(&sc->disk);
                devstat_remove_entry(&sc->stats);
                sc->dev->si_drv1 = NULL;
                sc->dev = NULL;
        }

        xa_printf(1, "%s", "(remove from tree)\n");
        /* Block racing opens (they test serializing/terminating) */
        sc->serializing = 1;
        KKASSERT(sc->opencnt == 0);
        KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

        /* Free the pre-allocated tag pool */
        while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
                tag->sc = NULL;
                kfree(tag, M_XDISK);
        }

        kfree(sc, M_XDISK);
}
683
684 /************************************************************************
685  *                         XA DEVICE INTERFACE                          *
686  ************************************************************************/
687
/*
 * Open an xa device.  The first open issues a BLK_OPEN to the remote via
 * xa_restart_deferred() and waits for it to complete; later opens just
 * bump the open count.  Returns ENXIO on raced destruction or the error
 * from the BLK_OPEN attempt.
 */
static int
xa_open(struct dev_open_args *ap)
{
        cdev_t dev = ap->a_head.a_dev;
        xa_softc_t *sc;
        int error;

        dev->si_bsize_phys = 512;
        dev->si_bsize_best = 32768;

        /*
         * Interlock open with opencnt, wait for attachment operations
         * to finish.
         */
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
again:
        sc = dev->si_drv1;
        if (sc == NULL) {
                lockmgr(&xdisk_lk, LK_RELEASE);
                return ENXIO;   /* raced destruction */
        }
        if (sc->serializing) {
                /*
                 * Another open/restart is in progress; poll until clear.
                 * NOTE(review): tsleep() here while holding xdisk_lk --
                 *               presumably intentional (short hz/10 naps);
                 *               confirm against lockmgr sleep rules.
                 */
                tsleep(sc, 0, "xarace", hz / 10);
                goto again;
        }
        if (sc->terminating) {
                lockmgr(&xdisk_lk, LK_RELEASE);
                return ENXIO;   /* raced destruction */
        }
        sc->serializing = 1;

        /*
         * Serialize initial open
         */
        if (sc->opencnt++ > 0) {
                sc->serializing = 0;
                wakeup(sc);
                lockmgr(&xdisk_lk, LK_RELEASE);
                return(0);
        }

        /*
         * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
         */
        if (sc->open_tag == NULL) {
                lockmgr(&sc->lk, LK_EXCLUSIVE);
                xa_restart_deferred(sc); /* eats serializing */
                lockmgr(&sc->lk, LK_RELEASE);
        } else {
                sc->serializing = 0;
                wakeup(sc);
        }
        lockmgr(&xdisk_lk, LK_RELEASE);

        /*
         * Wait for completion of the BLK_OPEN
         */
        lockmgr(&xdisk_lk, LK_EXCLUSIVE);
        while (sc->serializing)
                lksleep(sc, &xdisk_lk, 0, "xaopen", hz);

        error = sc->last_error;
        if (error) {
                /* Failed open: undo our count and maybe destroy the softc */
                KKASSERT(sc->opencnt > 0);
                --sc->opencnt;
                xa_terminate_check(sc);
                sc = NULL;      /* sc may be invalid now */
        }
        lockmgr(&xdisk_lk, LK_RELEASE);

        return (error);
}
760
761 static int
762 xa_close(struct dev_close_args *ap)
763 {
764         cdev_t dev = ap->a_head.a_dev;
765         xa_softc_t *sc;
766         xa_tag_t *tag;
767
768         lockmgr(&xdisk_lk, LK_EXCLUSIVE);
769         sc = dev->si_drv1;
770         if (sc == NULL) {
771                 lockmgr(&sc->lk, LK_RELEASE);
772                 return ENXIO;   /* raced destruction */
773         }
774         if (sc->terminating) {
775                 lockmgr(&sc->lk, LK_RELEASE);
776                 return ENXIO;   /* raced destruction */
777         }
778         lockmgr(&sc->lk, LK_EXCLUSIVE);
779
780         /*
781          * NOTE: Clearing open_tag allows a concurrent open to re-open
782          *       the device and prevents autonomous completion of the tag.
783          */
784         if (sc->opencnt == 1 && sc->open_tag) {
785                 tag = sc->open_tag;
786                 sc->open_tag = NULL;
787                 lockmgr(&sc->lk, LK_RELEASE);
788                 kdmsg_state_reply(tag->state, 0);       /* close our side */
789                 xa_wait(tag);                           /* wait on remote */
790         } else {
791                 lockmgr(&sc->lk, LK_RELEASE);
792         }
793         KKASSERT(sc->opencnt > 0);
794         --sc->opencnt;
795         xa_terminate_check(sc);
796         lockmgr(&xdisk_lk, LK_RELEASE);
797
798         return(0);
799 }
800
801 static int
802 xa_strategy(struct dev_strategy_args *ap)
803 {
804         xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
805         xa_tag_t *tag;
806         struct bio *bio = ap->a_bio;
807
808         devstat_start_transaction(&sc->stats);
809         atomic_add_int(&xa_active, 1);
810         xa_last = bio->bio_offset;
811
812         /*
813          * If no tags are available NULL is returned and the bio is
814          * placed on sc->bioq.
815          */
816         lockmgr(&sc->lk, LK_EXCLUSIVE);
817         tag = xa_setup_cmd(sc, bio);
818         if (tag)
819                 xa_start(tag, NULL, 1);
820         lockmgr(&sc->lk, LK_RELEASE);
821
822         return(0);
823 }
824
825 static int
826 xa_ioctl(struct dev_ioctl_args *ap)
827 {
828         return(ENOTTY);
829 }
830
831 static int
832 xa_size(struct dev_psize_args *ap)
833 {
834         struct xa_softc *sc;
835
836         if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
837                 return (ENXIO);
838         ap->a_result = sc->info.d_media_blocks;
839         return (0);
840 }
841
842 /************************************************************************
843  *                  XA BLOCK PROTOCOL STATE MACHINE                     *
844  ************************************************************************
845  *
846  * Implement tag/msg setup and related functions.
847  * Called with sc->lk held.
848  */
849 static xa_tag_t *
850 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
851 {
852         xa_tag_t *tag;
853
854         /*
855          * Only get a tag if we have a valid virtual circuit to the server.
856          */
857         if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
858                 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
859                 tag->bio = bio;
860                 TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
861         }
862
863         /*
864          * If we can't dispatch now and this is a bio, queue it for later.
865          */
866         if (tag == NULL && bio) {
867                 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
868         }
869
870         return (tag);
871 }
872
873 /*
874  * Called with sc->lk held
875  */
876 static void
877 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
878 {
879         xa_softc_t *sc = tag->sc;
880
881         tag->done = 0;
882         tag->async = async;
883         tag->status.head.error = DMSG_ERR_IO;   /* fallback error */
884
885         if (msg == NULL) {
886                 struct bio *bio;
887                 struct buf *bp;
888                 kdmsg_state_t *trans;
889
890                 if (sc->opencnt == 0 || sc->open_tag == NULL) {
891                         TAILQ_FOREACH(trans, &sc->spanq, user_entry) {
892                                 if ((trans->rxcmd & DMSGF_DELETE) == 0)
893                                         break;
894                         }
895                 } else {
896                         trans = sc->open_tag->state;
897                 }
898                 if (trans == NULL)
899                         goto skip;
900
901                 KKASSERT(tag->bio);
902                 bio = tag->bio;
903                 bp = bio->bio_buf;
904
905                 switch(bp->b_cmd) {
906                 case BUF_CMD_READ:
907                         msg = kdmsg_msg_alloc(trans,
908                                               DMSG_BLK_READ |
909                                               DMSGF_CREATE |
910                                               DMSGF_DELETE,
911                                               xa_bio_completion, tag);
912                         msg->any.blk_read.keyid = sc->keyid;
913                         msg->any.blk_read.offset = bio->bio_offset;
914                         msg->any.blk_read.bytes = bp->b_bcount;
915                         break;
916                 case BUF_CMD_WRITE:
917                         msg = kdmsg_msg_alloc(trans,
918                                               DMSG_BLK_WRITE |
919                                               DMSGF_CREATE | DMSGF_DELETE,
920                                               xa_bio_completion, tag);
921                         msg->any.blk_write.keyid = sc->keyid;
922                         msg->any.blk_write.offset = bio->bio_offset;
923                         msg->any.blk_write.bytes = bp->b_bcount;
924                         msg->aux_data = bp->b_data;
925                         msg->aux_size = bp->b_bcount;
926                         break;
927                 case BUF_CMD_FLUSH:
928                         msg = kdmsg_msg_alloc(trans,
929                                               DMSG_BLK_FLUSH |
930                                               DMSGF_CREATE | DMSGF_DELETE,
931                                               xa_bio_completion, tag);
932                         msg->any.blk_flush.keyid = sc->keyid;
933                         msg->any.blk_flush.offset = bio->bio_offset;
934                         msg->any.blk_flush.bytes = bp->b_bcount;
935                         break;
936                 case BUF_CMD_FREEBLKS:
937                         msg = kdmsg_msg_alloc(trans,
938                                               DMSG_BLK_FREEBLKS |
939                                               DMSGF_CREATE | DMSGF_DELETE,
940                                               xa_bio_completion, tag);
941                         msg->any.blk_freeblks.keyid = sc->keyid;
942                         msg->any.blk_freeblks.offset = bio->bio_offset;
943                         msg->any.blk_freeblks.bytes = bp->b_bcount;
944                         break;
945                 default:
946                         bp->b_flags |= B_ERROR;
947                         bp->b_error = EIO;
948                         devstat_end_transaction_buf(&sc->stats, bp);
949                         atomic_add_int(&xa_active, -1);
950                         biodone(bio);
951                         tag->bio = NULL;
952                         break;
953                 }
954         }
955
956         /*
957          * If no msg was allocated we likely could not find a good span.
958          */
959 skip:
960         if (msg) {
961                 /*
962                  * Message was passed in or constructed.
963                  */
964                 tag->state = msg->state;
965                 lockmgr(&sc->lk, LK_RELEASE);
966                 kdmsg_msg_write(msg);
967                 lockmgr(&sc->lk, LK_EXCLUSIVE);
968         } else if (tag->bio &&
969                    (tag->bio->bio_buf->b_flags & B_FAILONDIS) == 0) {
970                 /*
971                  * No spans available but BIO is not allowed to fail
972                  * on connectivity problems.  Requeue the BIO.
973                  */
974                 TAILQ_INSERT_TAIL(&sc->bioq, tag->bio, bio_act);
975                 tag->bio = NULL;
976                 lockmgr(&sc->lk, LK_RELEASE);
977                 xa_done(tag, 1);
978                 lockmgr(&sc->lk, LK_EXCLUSIVE);
979         } else {
980                 /*
981                  * No spans available, bio is allowed to fail.
982                  */
983                 lockmgr(&sc->lk, LK_RELEASE);
984                 tag->status.head.error = DMSG_ERR_IO;
985                 xa_done(tag, 1);
986                 lockmgr(&sc->lk, LK_EXCLUSIVE);
987         }
988 }
989
990 static uint32_t
991 xa_wait(xa_tag_t *tag)
992 {
993         xa_softc_t *sc = tag->sc;
994         uint32_t error;
995
996         lockmgr(&sc->lk, LK_EXCLUSIVE);
997         tag->waiting = 1;
998         while (tag->done == 0)
999                 lksleep(tag, &sc->lk, 0, "xawait", 0);
1000         lockmgr(&sc->lk, LK_RELEASE);
1001
1002         error = tag->status.head.error;
1003         tag->waiting = 0;
1004         xa_release(tag, 0);
1005
1006         return error;
1007 }
1008
/*
 * Mark a tag's transaction complete and wake any thread blocked in
 * xa_wait().  Asynchronous tags are released here; synchronous tags
 * are released by the waiter after it consumes the status.
 */
static void
xa_done(xa_tag_t *tag, int wasbio)
{
	KKASSERT(tag->bio == NULL);

	tag->state = NULL;
	tag->done = 1;
	if (tag->waiting)
		wakeup(tag);
	if (tag->async)
		xa_release(tag, wasbio);
}
1021
1022 /*
1023  * Release a tag.  If everything looks ok and there are pending BIOs
1024  * (due to all tags in-use), we can use the tag to start the next BIO.
1025  * Do not try to restart if the connection is currently failed.
1026  */
1027 static
1028 void
1029 xa_release(xa_tag_t *tag, int wasbio)
1030 {
1031         xa_softc_t *sc = tag->sc;
1032         struct bio *bio;
1033
1034         if ((bio = tag->bio) != NULL) {
1035                 struct buf *bp = bio->bio_buf;
1036
1037                 bp->b_error = EIO;
1038                 bp->b_flags |= B_ERROR;
1039                 devstat_end_transaction_buf(&sc->stats, bp);
1040                 atomic_add_int(&xa_active, -1);
1041                 biodone(bio);
1042                 tag->bio = NULL;
1043         }
1044
1045         lockmgr(&sc->lk, LK_EXCLUSIVE);
1046
1047         if (wasbio && sc->open_tag &&
1048             (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1049                 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1050                 tag->bio = bio;
1051                 xa_start(tag, NULL, 1);
1052         } else {
1053                 TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
1054                 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
1055         }
1056         lockmgr(&sc->lk, LK_RELEASE);
1057 }
1058
1059 /*
1060  * Handle messages under the BLKOPEN transaction.
1061  */
1062 static int
1063 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1064 {
1065         xa_tag_t *tag = state->any.any;
1066         xa_softc_t *sc;
1067         struct bio *bio;
1068
1069         /*
1070          * If the tag has been cleaned out we already closed our side
1071          * of the transaction and we are waiting for the other side to
1072          * close.
1073          */
1074         xa_printf(1, "xa_sync_completion: tag %p msg %08x state %p\n",
1075                   tag, msg->any.head.cmd, msg->state);
1076
1077         if (tag == NULL) {
1078                 if (msg->any.head.cmd & DMSGF_CREATE)
1079                         kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
1080                 return 0;
1081         }
1082         sc = tag->sc;
1083
1084         /*
1085          * Validate the tag
1086          */
1087         lockmgr(&sc->lk, LK_EXCLUSIVE);
1088
1089         /*
1090          * Handle initial response to our open and restart any deferred
1091          * BIOs on success.
1092          *
1093          * NOTE: DELETE may also be set.
1094          */
1095         if (msg->any.head.cmd & DMSGF_CREATE) {
1096                 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1097                 case DMSG_LNK_ERROR | DMSGF_REPLY:
1098                         bzero(&tag->status, sizeof(tag->status));
1099                         tag->status.head = msg->any.head;
1100                         break;
1101                 case DMSG_BLK_ERROR | DMSGF_REPLY:
1102                         tag->status = msg->any.blk_error;
1103                         break;
1104                 }
1105                 sc->last_error = tag->status.head.error;
1106                 xa_printf(1, "blk_open completion status %d\n",
1107                           sc->last_error);
1108                 if (sc->last_error == 0) {
1109                         while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1110                                 tag = xa_setup_cmd(sc, NULL);
1111                                 if (tag == NULL)
1112                                         break;
1113                                 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1114                                 tag->bio = bio;
1115                                 xa_start(tag, NULL, 1);
1116                         }
1117                 }
1118                 sc->serializing = 0;
1119                 wakeup(sc);
1120         }
1121
1122         /*
1123          * Handle unexpected termination (or lost comm channel) from other
1124          * side.  Autonomous completion only if open_tag matches,
1125          * otherwise another thread is probably waiting on the tag.
1126          *
1127          * (see xa_close() for other interactions)
1128          */
1129         if (msg->any.head.cmd & DMSGF_DELETE) {
1130                 kdmsg_state_reply(tag->state, 0);
1131                 if (sc->open_tag == tag) {
1132                         sc->open_tag = NULL;
1133                         xa_done(tag, 0);
1134                 } else {
1135                         tag->async = 0;
1136                         xa_done(tag, 0);
1137                 }
1138         }
1139         lockmgr(&sc->lk, LK_RELEASE);
1140
1141         return (0);
1142 }
1143
1144 static int
1145 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1146 {
1147         xa_tag_t *tag = state->any.any;
1148         xa_softc_t *sc = tag->sc;
1149         struct bio *bio;
1150         struct buf *bp;
1151
1152         /*
1153          * Get the bio from the tag.  If no bio is present we just do
1154          * 'done' handling.
1155          */
1156         if ((bio = tag->bio) == NULL)
1157                 goto handle_done;
1158         bp = bio->bio_buf;
1159
1160         /*
1161          * Process return status
1162          */
1163         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1164         case DMSG_LNK_ERROR | DMSGF_REPLY:
1165                 bzero(&tag->status, sizeof(tag->status));
1166                 tag->status.head = msg->any.head;
1167                 if (tag->status.head.error)
1168                         tag->status.resid = bp->b_bcount;
1169                 else
1170                         tag->status.resid = 0;
1171                 break;
1172         case DMSG_BLK_ERROR | DMSGF_REPLY:
1173                 tag->status = msg->any.blk_error;
1174                 break;
1175         }
1176
1177         /*
1178          * If the device is open stall the bio on DMSG errors.  If an
1179          * actual I/O error occured on the remote device, DMSG_ERR_IO
1180          * will be returned.
1181          */
1182         if (tag->status.head.error &&
1183             (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
1184                 if (tag->status.head.error != DMSG_ERR_IO)
1185                         goto handle_repend;
1186         }
1187
1188         /*
1189          * Process bio completion
1190          *
1191          * For reads any returned data is zero-extended if necessary, so
1192          * the server can short-cut any all-zeros reads if it desires.
1193          */
1194         switch(bp->b_cmd) {
1195         case BUF_CMD_READ:
1196                 if (msg->aux_data && msg->aux_size) {
1197                         if (msg->aux_size < bp->b_bcount) {
1198                                 bcopy(msg->aux_data, bp->b_data, msg->aux_size);
1199                                 bzero(bp->b_data + msg->aux_size,
1200                                       bp->b_bcount - msg->aux_size);
1201                         } else {
1202                                 bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
1203                         }
1204                 } else {
1205                         bzero(bp->b_data, bp->b_bcount);
1206                 }
1207                 /* fall through */
1208         case BUF_CMD_WRITE:
1209         case BUF_CMD_FLUSH:
1210         case BUF_CMD_FREEBLKS:
1211         default:
1212                 if (tag->status.resid > bp->b_bcount)
1213                         tag->status.resid = bp->b_bcount;
1214                 bp->b_resid = tag->status.resid;
1215                 if (tag->status.head.error != 0) {
1216                         bp->b_error = EIO;
1217                         bp->b_flags |= B_ERROR;
1218                 } else {
1219                         bp->b_resid = 0;
1220                 }
1221                 devstat_end_transaction_buf(&sc->stats, bp);
1222                 atomic_add_int(&xa_active, -1);
1223                 biodone(bio);
1224                 tag->bio = NULL;
1225                 break;
1226         }
1227
1228         /*
1229          * Handle completion of the transaction.  If the bioq is not empty
1230          * we can initiate another bio on the same tag.
1231          *
1232          * NOTE: Most of our transactions will be single-message
1233          *       CREATE+DELETEs, so we won't have to terminate the
1234          *       transaction separately, here.  But just in case they
1235          *       aren't be sure to terminate the transaction.
1236          */
1237 handle_done:
1238         if (msg->any.head.cmd & DMSGF_DELETE) {
1239                 xa_done(tag, 1);
1240                 if ((state->txcmd & DMSGF_DELETE) == 0)
1241                         kdmsg_msg_reply(msg, 0);
1242         }
1243         return (0);
1244
1245         /*
1246          * Handle the case where the transaction failed due to a
1247          * connectivity issue.  The tag is put away with wasbio=0
1248          * and we put the BIO back onto the bioq for a later restart.
1249          *
1250          * probe I/Os (where the device is not open) will be failed
1251          * instead of requeued.
1252          */
1253 handle_repend:
1254         tag->bio = NULL;
1255         if (bio->bio_buf->b_flags & B_FAILONDIS) {
1256                 xa_printf(1, "xa_strategy: lost link, fail probe bp %p\n",
1257                           bio->bio_buf);
1258                 bio->bio_buf->b_error = ENXIO;
1259                 bio->bio_buf->b_flags |= B_ERROR;
1260                 biodone(bio);
1261                 bio = NULL;
1262         } else {
1263                 xa_printf(1, "xa_strategy: lost link, requeue bp %p\n",
1264                           bio->bio_buf);
1265         }
1266         xa_done(tag, 0);
1267         if ((state->txcmd & DMSGF_DELETE) == 0)
1268                 kdmsg_msg_reply(msg, 0);
1269
1270         /*
1271          * Requeue the bio
1272          */
1273         if (bio) {
1274                 lockmgr(&sc->lk, LK_EXCLUSIVE);
1275                 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
1276                 lockmgr(&sc->lk, LK_RELEASE);
1277         }
1278         return (0);
1279 }
1280
1281 /*
1282  * Restart as much deferred I/O as we can.  The serializer is set and we
1283  * eat it (clear it) when done.
1284  *
1285  * Called with sc->lk held
1286  */
1287 static
1288 void
1289 xa_restart_deferred(xa_softc_t *sc)
1290 {
1291         kdmsg_state_t *span;
1292         kdmsg_msg_t *msg;
1293         xa_tag_t *tag;
1294         int error;
1295
1296         KKASSERT(sc->serializing);
1297
1298         /*
1299          * Determine if a restart is needed.
1300          */
1301         if (sc->opencnt == 0) {
1302                 /*
1303                  * Device is not open, nothing to do, eat serializing.
1304                  */
1305                 sc->serializing = 0;
1306                 wakeup(sc);
1307         } else if (sc->open_tag == NULL) {
1308                 /*
1309                  * BLK_OPEN required before we can restart any BIOs.
1310                  * Select the best LNK_SPAN to issue the BLK_OPEN under.
1311                  *
1312                  * serializing interlocks waiting open()s.
1313                  */
1314                 error = 0;
1315                 TAILQ_FOREACH(span, &sc->spanq, user_entry) {
1316                         if ((span->rxcmd & DMSGF_DELETE) == 0)
1317                                 break;
1318                 }
1319                 if (span == NULL)
1320                         error = ENXIO;
1321
1322                 if (error == 0) {
1323                         tag = xa_setup_cmd(sc, NULL);
1324                         if (tag == NULL)
1325                                 error = ENXIO;
1326                 }
1327                 if (error == 0) {
1328                         sc->open_tag = tag;
1329                         msg = kdmsg_msg_alloc(span,
1330                                               DMSG_BLK_OPEN |
1331                                               DMSGF_CREATE,
1332                                               xa_sync_completion, tag);
1333                         msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
1334                         xa_printf(1,
1335                                   "BLK_OPEN tag %p state %p "
1336                                   "span-state %p\n",
1337                                   tag, msg->state, span);
1338                         xa_start(tag, msg, 0);
1339                 }
1340                 if (error) {
1341                         sc->serializing = 0;
1342                         wakeup(sc);
1343                 }
1344                 /* else leave serializing set until BLK_OPEN response */
1345         } else {
1346                 /* nothing to do */
1347                 sc->serializing = 0;
1348                 wakeup(sc);
1349         }
1350 }