Merge branch 'vendor/BYACC'
[dragonfly.git] / sys / dev / disk / xdisk / xdisk.c
1 /*
2  * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45  * devices.  These devices look like raw disks to the system.
46  */
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/buf.h>
50 #include <sys/conf.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
53 #include <sys/disk.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/tree.h>
60 #include <sys/udev.h>
61 #include <sys/uuid.h>
62 #include <sys/kern_syscall.h>
63
64 #include <sys/dmsg.h>
65 #include <sys/xdiskioctl.h>
66
67 #include <sys/buf2.h>
68 #include <sys/thread2.h>
69
70 struct xa_softc;
71 struct xa_softc_tree;
72 RB_HEAD(xa_softc_tree, xa_softc);
73 RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
74
/*
 * Track a BIO tag.  A fixed pool of MAXTAGS tags is preallocated per
 * device; each tag represents one in-flight BLOCK protocol command.
 */
struct xa_tag {
        TAILQ_ENTRY(xa_tag) entry;      /* on tag_freeq or tag_pendq */
        struct xa_softc *sc;            /* owning device */
        dmsg_blk_error_t status;        /* completion status from remote */
        kdmsg_state_t   *state;         /* open DMSG transaction state */
        struct bio      *bio;           /* associated BIO, if any */
        int             waiting;        /* a thread sleeps in xa_wait() */
        int             async;          /* auto-release in xa_done() */
        int             done;           /* command has completed */
};

typedef struct xa_tag   xa_tag_t;
90
/*
 * Track devices.  One xa_softc exists per unique fs_label (serial
 * number string); multiple LNK_SPANs may reference the same softc.
 */
struct xa_softc {
        struct kdmsg_state_list spanq;  /* LNK_SPAN states referencing us */
        RB_ENTRY(xa_softc) rbnode;      /* xa_device_tree, keyed on fs_label */
        cdev_t          dev;            /* /dev/xa%d block device */
        struct disk_info info;          /* geometry reported to disk layer */
        struct disk     disk;
        uuid_t          pfs_fsid;
        int             unit;           /* xa unit number */
        int             opencnt;        /* device open count */
        int             spancnt;        /* number of attached LNK_SPANs */
        uint64_t        keyid;
        int             serializing;    /* interlocks open vs attach/teardown */
        int             last_error;     /* result checked by xa_open() */
        char            cl_label[64];   /* from LNK_SPAN cl_label (host/dev) */
        char            fs_label[64];   /* from LNK_SPAN fs_label (serno str) */
        xa_tag_t        *open_tag;      /* active BLK_OPEN transaction tag */
        TAILQ_HEAD(, bio) bioq;         /* pending BIOs */
        TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
        TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
        struct lwkt_token tok;          /* per-device serialization token */
};

typedef struct xa_softc xa_softc_t;
117
/*
 * Per-connection state, one per XDISKIOCATTACH.  dummysc is used only
 * as an RB-tree lookup key when matching incoming LNK_SPAN labels
 * against existing devices (see xaio_rcvdmsg()).
 */
struct xa_iocom {
        TAILQ_ENTRY(xa_iocom) entry;    /* on xaiocomq */
        kdmsg_iocom_t   iocom;          /* kernel DMSG communications core */
        xa_softc_t      dummysc;        /* label-lookup scratch softc */
};

typedef struct xa_iocom xa_iocom_t;
125
126 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
127 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
128 static struct xa_softc_tree xa_device_tree;
129
130 #define MAXTAGS         64      /* no real limit */
131
132 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
133 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
134 static void xaio_exit(kdmsg_iocom_t *iocom);
135 static int xaio_rcvdmsg(kdmsg_msg_t *msg);
136
137 static void xa_terminate_check(struct xa_softc *sc);
138
139 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
140 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
141 static void xa_done(xa_tag_t *tag, int wasbio);
142 static void xa_release(xa_tag_t *tag, int wasbio);
143 static uint32_t xa_wait(xa_tag_t *tag);
144 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
145 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
146 static void xa_restart_deferred(xa_softc_t *sc);
147
148 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
149
150 /*
151  * Control device, issue ioctls to create xa devices.
152  */
153 static d_open_t xdisk_open;
154 static d_close_t xdisk_close;
155 static d_ioctl_t xdisk_ioctl;
156
157 static struct dev_ops xdisk_ops = {
158         { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
159         .d_open =       xdisk_open,
160         .d_close =      xdisk_close,
161         .d_ioctl =      xdisk_ioctl
162 };
163
164 /*
165  * XA disk devices
166  */
167 static d_open_t xa_open;
168 static d_close_t xa_close;
169 static d_ioctl_t xa_ioctl;
170 static d_strategy_t xa_strategy;
171 static d_psize_t xa_size;
172
173 static struct dev_ops xa_ops = {
174         { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
175         .d_open =       xa_open,
176         .d_close =      xa_close,
177         .d_ioctl =      xa_ioctl,
178         .d_read =       physread,
179         .d_write =      physwrite,
180         .d_strategy =   xa_strategy,
181         .d_psize =      xa_size
182 };
183
184 static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
185 static int xdisk_opencount;
186 static cdev_t xdisk_dev;
187 static TAILQ_HEAD(, xa_iocom) xaiocomq;
188
189 /*
190  * Module initialization
191  */
192 static int
193 xdisk_modevent(module_t mod, int type, void *data)
194 {
195         switch (type) {
196         case MOD_LOAD:
197                 TAILQ_INIT(&xaiocomq);
198                 RB_INIT(&xa_device_tree);
199                 xdisk_dev = make_dev(&xdisk_ops, 0,
200                                      UID_ROOT, GID_WHEEL, 0600, "xdisk");
201                 break;
202         case MOD_UNLOAD:
203         case MOD_SHUTDOWN:
204                 if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
205                         return (EBUSY);
206                 if (xdisk_dev) {
207                         destroy_dev(xdisk_dev);
208                         xdisk_dev = NULL;
209                 }
210                 dev_ops_remove_all(&xdisk_ops);
211                 dev_ops_remove_all(&xa_ops);
212                 break;
213         default:
214                 break;
215         }
216         return 0;
217 }
218
219 DEV_MODULE(xdisk, xdisk_modevent, 0);
220
221 static int
222 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
223 {
224         return(strcmp(sc1->fs_label, sc2->fs_label));
225 }
226
227 /*
228  * Control device
229  */
230 static int
231 xdisk_open(struct dev_open_args *ap)
232 {
233         lwkt_gettoken(&xdisk_token);
234         ++xdisk_opencount;
235         lwkt_reltoken(&xdisk_token);
236         return(0);
237 }
238
239 static int
240 xdisk_close(struct dev_close_args *ap)
241 {
242         lwkt_gettoken(&xdisk_token);
243         --xdisk_opencount;
244         lwkt_reltoken(&xdisk_token);
245         return(0);
246 }
247
248 static int
249 xdisk_ioctl(struct dev_ioctl_args *ap)
250 {
251         int error;
252
253         switch(ap->a_cmd) {
254         case XDISKIOCATTACH:
255                 error = xdisk_attach((void *)ap->a_data);
256                 break;
257         case XDISKIOCDETACH:
258                 error = xdisk_detach((void *)ap->a_data);
259                 break;
260         default:
261                 error = ENOTTY;
262                 break;
263         }
264         return error;
265 }
266
267 /************************************************************************
268  *                              DMSG INTERFACE                          *
269  ************************************************************************/
270
271 static int
272 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
273 {
274         xa_iocom_t *xaio;
275         struct file *fp;
276
277         /*
278          * Normalize ioctl params
279          */
280         kprintf("xdisk_attach1\n");
281         fp = holdfp(curproc->p_fd, xaioc->fd, -1);
282         if (fp == NULL)
283                 return EINVAL;
284         kprintf("xdisk_attach2\n");
285
286         /*
287          * See if the serial number is already present.  If we are
288          * racing a termination the disk subsystem may still have
289          * duplicate entries not yet removed so we wait a bit and
290          * retry.
291          */
292         lwkt_gettoken(&xdisk_token);
293
294         xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
295         kprintf("xdisk_attach3\n");
296         kdmsg_iocom_init(&xaio->iocom, xaio,
297                          KDMSG_IOCOMF_AUTOCONN,
298                          M_XDISK, xaio_rcvdmsg);
299         xaio->iocom.exit_func = xaio_exit;
300
301         kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");
302
303         /*
304          * Setup our LNK_CONN advertisement for autoinitiate.
305          *
306          * Our filter is setup to only accept PEER_BLOCK/SERVER
307          * advertisements.
308          *
309          * We need a unique pfs_fsid to avoid confusion.
310          */
311         xaio->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
312         xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
313         xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
314         xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
315         xaio->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
316         ksnprintf(xaio->iocom.auto_lnk_conn.fs_label,
317                   sizeof(xaio->iocom.auto_lnk_conn.fs_label),
318                   "xdisk");
319         kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1);
320
321         /*
322          * Setup our LNK_SPAN advertisement for autoinitiate
323          */
324         TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
325         kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
326         lwkt_reltoken(&xdisk_token);
327
328         return 0;
329 }
330
/*
 * XDISKIOCDETACH: not currently implemented; disconnection is driven
 * by the iocom shutting down instead (see xaio_exit()).  Always
 * returns EINVAL.
 */
static int
xdisk_detach(struct xdisk_attach_ioctl *xaioc)
{
        return EINVAL;
}
336
/*
 * Called from iocom core transmit thread upon disconnect.
 *
 * Unlink the iocom from the global list under xdisk_token and free it.
 * Devices created from this iocom's spans are torn down separately as
 * their LNK_SPAN transactions are deleted.
 */
static
void
xaio_exit(kdmsg_iocom_t *iocom)
{
        xa_iocom_t *xaio = iocom->handle;

        kprintf("xdisk_detach -xaio_exit\n");
        lwkt_gettoken(&xdisk_token);
        TAILQ_REMOVE(&xaiocomq, xaio, entry);
        lwkt_reltoken(&xdisk_token);

        kfree(xaio, M_XDISK);
}
353
354 /*
355  * Called from iocom core to handle messages that the iocom core does not
356  * handle itself and for which a state function callback has not yet been
357  * established.
358  *
359  * We primarily care about LNK_SPAN transactions here.
360  */
static int
xaio_rcvdmsg(kdmsg_msg_t *msg)
{
        kdmsg_state_t   *state = msg->state;
        xa_iocom_t      *xaio = state->iocom->handle;
        xa_softc_t      *sc;

        kprintf("xdisk_rcvdmsg %08x\n", msg->any.head.cmd);
        lwkt_gettoken(&xdisk_token);

        switch(msg->tcmd) {
        case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
                /*
                 * A LNK_SPAN transaction which is opened and closed
                 * degenerately is not useful to us, just ignore it.
                 */
                kdmsg_msg_reply(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_CREATE:
                /*
                 * Manage the tracking node for the remote LNK_SPAN.
                 *
                 * Return a streaming result, leaving the transaction open
                 * in both directions to allow sub-transactions.
                 */
                /*
                 * Copy the span's labels into the dummy softc so it can
                 * be used as an RB-tree lookup key (the tree is ordered
                 * by fs_label, see xa_softc_cmp()).  Force NUL
                 * termination since wire data may not be terminated.
                 */
                bcopy(msg->any.lnk_span.cl_label, xaio->dummysc.cl_label,
                      sizeof(xaio->dummysc.cl_label));
                xaio->dummysc.cl_label[sizeof(xaio->dummysc.cl_label) - 1] = 0;

                bcopy(msg->any.lnk_span.fs_label, xaio->dummysc.fs_label,
                      sizeof(xaio->dummysc.fs_label));
                xaio->dummysc.fs_label[sizeof(xaio->dummysc.fs_label) - 1] = 0;

                kprintf("xdisk: %s LNK_SPAN create ",
                        msg->any.lnk_span.fs_label);

                sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
                if (sc == NULL) {
                        /*
                         * First span for this serial number; create a
                         * new softc, its tag pool, and the disk device.
                         */
                        xa_softc_t *sctmp;
                        xa_tag_t *tag;
                        cdev_t dev;
                        int unit;
                        int n;

                        sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
                        kprintf("(not found - create %p)\n", sc);
                        bcopy(msg->any.lnk_span.cl_label, sc->cl_label,
                              sizeof(sc->cl_label));
                        sc->cl_label[sizeof(sc->cl_label) - 1] = 0;
                        bcopy(msg->any.lnk_span.fs_label, sc->fs_label,
                              sizeof(sc->fs_label));
                        sc->fs_label[sizeof(sc->fs_label) - 1] = 0;

                        /*
                         * Find the lowest unused unit number by linear
                         * probing the tree.  XXX FIXME O(N^2)
                         */
                        unit = -1;
                        do {
                                ++unit;
                                RB_FOREACH(sctmp, xa_softc_tree,
                                           &xa_device_tree) {
                                        if (sctmp->unit == unit)
                                                break;
                                }
                        } while (sctmp);

                        sc->unit = unit;
                        sc->serializing = 1;    /* interlock vs open */
                        sc->spancnt = 1;
                        lwkt_token_init(&sc->tok, "xa");
                        TAILQ_INIT(&sc->spanq);
                        TAILQ_INIT(&sc->bioq);
                        TAILQ_INIT(&sc->tag_freeq);
                        TAILQ_INIT(&sc->tag_pendq);
                        RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;

                        /*
                         * Setup block device
                         */
                        for (n = 0; n < MAXTAGS; ++n) {
                                tag = kmalloc(sizeof(*tag),
                                              M_XDISK, M_WAITOK|M_ZERO);
                                tag->sc = sc;
                                TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
                        }

                        if (sc->dev == NULL) {
                                dev = disk_create(unit, &sc->disk, &xa_ops);
                                dev->si_drv1 = sc;
                                sc->dev = dev;
                        }

                        /*
                         * Fill in disk geometry from the span's media
                         * parameters.  Guard against a non-positive
                         * blksize to avoid a division fault below.
                         */
                        sc->info.d_media_blksize =
                                msg->any.lnk_span.media.block.blksize;
                        if (sc->info.d_media_blksize <= 0)
                                sc->info.d_media_blksize = 1;
                        sc->info.d_media_blocks =
                                msg->any.lnk_span.media.block.bytes /
                                sc->info.d_media_blksize;
                        sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
                        sc->info.d_secpertrack = 32;
                        sc->info.d_nheads = 64;
                        sc->info.d_secpercyl = sc->info.d_secpertrack *
                                               sc->info.d_nheads;
                        sc->info.d_ncylinders = 0;
                        if (sc->fs_label[0])
                                sc->info.d_serialno = sc->fs_label;
                        disk_setdiskinfo_sync(&sc->disk, &sc->info);
                        xa_restart_deferred(sc);        /* eats serializing */
                } else {
                        /*
                         * Additional span for an existing device; just
                         * track it and restart deferred operation if
                         * nothing else is in progress.
                         */
                        kprintf("(found spancnt %d sc=%p)\n", sc->spancnt, sc);
                        ++sc->spancnt;
                        TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
                        msg->state->any.xa_sc = sc;
                        if (sc->serializing == 0 && sc->open_tag == NULL) {
                                sc->serializing = 1;
                                xa_restart_deferred(sc); /* eats serializing */
                        }
                }
                kdmsg_msg_result(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_DELETE:
        case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
                /*
                 * Manage the tracking node for the remote LNK_SPAN.
                 *
                 * Return a final result, closing our end of the transaction.
                 */
                sc = msg->state->any.xa_sc;
                kprintf("xdisk: %s LNK_SPAN terminate\n", sc->fs_label);
                msg->state->any.xa_sc = NULL;
                TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
                --sc->spancnt;
                xa_terminate_check(sc);
                kdmsg_msg_reply(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_REPLY:
                /*
                 * Ignore unimplemented streaming replies on our LNK_SPAN
                 * transaction.
                 */
                break;
        case DMSG_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm).
                 *
                 * This is a one-way packet but if not (e.g. if part of
                 * a streaming transaction), we will have already closed
                 * our end.
                 */
                kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                /*
                 * Receive one or more replies to a shell command
                 * that we sent.  Just dump it to the console.
                 *
                 * This is a one-way packet but if not (e.g. if
                 * part of a streaming transaction), we will have
                 * already closed our end.
                 */
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
                        kprintf("xdisk: DEBUGMSG: %s\n",
                                msg->aux_data);
                }
                break;
        default:
                /*
                 * Unsupported one-way message, streaming message, or
                 * transaction.
                 *
                 * Terminate any unsupported transactions with an error
                 * and ignore any unsupported streaming messages.
                 *
                 * NOTE: This case also includes DMSG_LNK_ERROR messages
                 *       which might be one-way, replying to those would
                 *       cause an infinite ping-pong.
                 */
                if (msg->any.head.cmd & DMSGF_CREATE)
                        kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        }
        lwkt_reltoken(&xdisk_token);

        return 0;
}
548
/*
 * Determine if we can destroy the xa_softc.
 *
 * Called with xdisk_token held.  The softc is destroyed only when it
 * has no opens, no spans, and no serialized operation in progress.
 */
static
void
xa_terminate_check(struct xa_softc *sc)
{
        xa_tag_t *tag;

        /*
         * Determine if we can destroy the softc.
         */
        kprintf("xdisk: terminate check xa%d (%d,%d,%d) sc=%p ",
                sc->unit,
                sc->opencnt, sc->serializing, sc->spancnt,
                sc);

        if (sc->opencnt || sc->serializing || sc->spancnt) {
                kprintf("(leave intact)\n");
                return;
        }
        kprintf("(remove from tree)\n");
        /* set serializing so a racing open backs off until we are gone */
        sc->serializing = 1;
        KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

        RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);

        if (sc->dev) {
                disk_destroy(&sc->disk);
                sc->dev->si_drv1 = NULL;
                sc->dev = NULL;
        }
        KKASSERT(sc->opencnt == 0);
        KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));

        /* free the preallocated I/O tag pool */
        while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
                tag->sc = NULL;
                kfree(tag, M_XDISK);
        }
        kfree(sc, M_XDISK);
}
593
594 /************************************************************************
595  *                         XA DEVICE INTERFACE                          *
596  ************************************************************************/
597
598 static int
599 xa_open(struct dev_open_args *ap)
600 {
601         cdev_t dev = ap->a_head.a_dev;
602         xa_softc_t *sc;
603         int error;
604
605         dev->si_bsize_phys = 512;
606         dev->si_bsize_best = 32768;
607
608         /*
609          * Interlock open with opencnt, wait for attachment operations
610          * to finish.
611          */
612         lwkt_gettoken(&xdisk_token);
613 again:
614         sc = dev->si_drv1;
615         if (sc == NULL) {
616                 lwkt_reltoken(&xdisk_token);
617                 return ENXIO;   /* raced destruction */
618         }
619         if (sc->serializing) {
620                 tsleep(sc, 0, "xarace", hz / 10);
621                 goto again;
622         }
623         sc->serializing = 1;
624
625         /*
626          * Serialize initial open
627          */
628         if (sc->opencnt++ > 0) {
629                 lwkt_reltoken(&xdisk_token);
630                 return(0);
631         }
632         lwkt_reltoken(&xdisk_token);
633
634         /*
635          * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
636          */
637         if (sc->open_tag == NULL) {
638                 xa_restart_deferred(sc); /* eats serializing */
639         } else {
640                 sc->serializing = 0;
641                 wakeup(sc);
642         }
643
644         /*
645          * Wait for completion of the BLK_OPEN
646          */
647         lwkt_gettoken(&xdisk_token);
648         while (sc->serializing)
649                 tsleep(sc, 0, "xaopen", hz);
650
651         error = sc->last_error;
652         if (error) {
653                 KKASSERT(sc->opencnt > 0);
654                 --sc->opencnt;
655                 xa_terminate_check(sc);
656                 sc = NULL;      /* sc may be invalid now */
657         }
658         lwkt_reltoken(&xdisk_token);
659
660         return (error);
661 }
662
/*
 * Close an xa device.  On last close, terminate our side of the
 * BLK_OPEN transaction and wait for the remote to close its side,
 * then run the teardown check.
 */
static int
xa_close(struct dev_close_args *ap)
{
        cdev_t dev = ap->a_head.a_dev;
        xa_softc_t *sc;
        xa_tag_t *tag;

        sc = dev->si_drv1;
        if (sc == NULL)
                return ENXIO;   /* raced destruction */
        lwkt_gettoken(&xdisk_token);
        lwkt_gettoken(&sc->tok);

        /*
         * NOTE: Clearing open_tag allows a concurrent open to re-open
         *       the device and prevents autonomous completion of the tag.
         */
        if (sc->opencnt == 1 && sc->open_tag) {
                tag = sc->open_tag;
                sc->open_tag = NULL;
                kdmsg_state_reply(tag->state, 0);       /* close our side */
                xa_wait(tag);                           /* wait on remote */
        }
        lwkt_reltoken(&sc->tok);
        KKASSERT(sc->opencnt > 0);
        --sc->opencnt;
        xa_terminate_check(sc);
        lwkt_reltoken(&xdisk_token);

        return(0);
}
694
695 static int
696 xa_strategy(struct dev_strategy_args *ap)
697 {
698         xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
699         xa_tag_t *tag;
700         struct bio *bio = ap->a_bio;
701
702         /*
703          * Allow potentially temporary link failures to fail the I/Os
704          * only if the device is not open.  That is, we allow the disk
705          * probe code prior to mount to fail.
706          */
707         if (sc->opencnt == 0) {
708                 bio->bio_buf->b_error = ENXIO;
709                 bio->bio_buf->b_flags |= B_ERROR;
710                 biodone(bio);
711                 return(0);
712         }
713
714         tag = xa_setup_cmd(sc, bio);
715         if (tag)
716                 xa_start(tag, NULL, 1);
717         return(0);
718 }
719
720 static int
721 xa_ioctl(struct dev_ioctl_args *ap)
722 {
723         return(ENOTTY);
724 }
725
726 static int
727 xa_size(struct dev_psize_args *ap)
728 {
729         struct xa_softc *sc;
730
731         if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
732                 return (ENXIO);
733         ap->a_result = sc->info.d_media_blocks;
734         return (0);
735 }
736
737 /************************************************************************
738  *                  XA BLOCK PROTOCOL STATE MACHINE                     *
739  ************************************************************************
740  *
741  * Implement tag/msg setup and related functions.
742  */
743 static xa_tag_t *
744 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
745 {
746         xa_tag_t *tag;
747
748         /*
749          * Only get a tag if we have a valid virtual circuit to the server.
750          */
751         lwkt_gettoken(&sc->tok);
752         if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
753                 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
754                 tag->bio = bio;
755                 TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
756         }
757
758         /*
759          * If we can't dispatch now and this is a bio, queue it for later.
760          */
761         if (tag == NULL && bio) {
762                 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
763         }
764         lwkt_reltoken(&sc->tok);
765
766         return (tag);
767 }
768
/*
 * Translate a tag's BIO into the matching DMSG BLOCK command and
 * transmit it, or transmit the caller-supplied msg directly.  If the
 * BIO command is unsupported, or no message could be built, the tag
 * is completed with DMSG_ERR_IO.
 *
 * NOTE(review): building a message from a BIO dereferences
 * sc->open_tag->state, i.e. assumes an active BLK_OPEN transaction —
 * confirm callers guarantee this.
 */
static void
xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
{
        xa_softc_t *sc = tag->sc;

        tag->done = 0;
        tag->async = async;

        if (msg == NULL) {
                struct bio *bio;
                struct buf *bp;

                KKASSERT(tag->bio);
                bio = tag->bio;
                bp = bio->bio_buf;

                switch(bp->b_cmd) {
                case BUF_CMD_READ:
                        msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_READ |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_read.keyid = sc->keyid;
                        msg->any.blk_read.offset = bio->bio_offset;
                        msg->any.blk_read.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_WRITE:
                        msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_WRITE |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_write.keyid = sc->keyid;
                        msg->any.blk_write.offset = bio->bio_offset;
                        msg->any.blk_write.bytes = bp->b_bcount;
                        msg->aux_data = bp->b_data;
                        msg->aux_size = bp->b_bcount;
                        break;
                case BUF_CMD_FLUSH:
                        msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_FLUSH |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_flush.keyid = sc->keyid;
                        msg->any.blk_flush.offset = bio->bio_offset;
                        msg->any.blk_flush.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_FREEBLKS:
                        msg = kdmsg_msg_alloc(sc->open_tag->state,
                                              DMSG_BLK_FREEBLKS |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
                        msg->any.blk_freeblks.keyid = sc->keyid;
                        msg->any.blk_freeblks.offset = bio->bio_offset;
                        msg->any.blk_freeblks.bytes = bp->b_bcount;
                        break;
                default:
                        /* unsupported buffer command; fail the BIO */
                        bp->b_flags |= B_ERROR;
                        bp->b_error = EIO;
                        biodone(bio);
                        tag->bio = NULL;
                        break;
                }
        }

        if (msg) {
                tag->state = msg->state;
                kdmsg_msg_write(msg);
        } else {
                tag->status.head.error = DMSG_ERR_IO;
                xa_done(tag, 1);
        }
}
841
842 static uint32_t
843 xa_wait(xa_tag_t *tag)
844 {
845         xa_softc_t *sc = tag->sc;
846         uint32_t error;
847
848         kprintf("xdisk: xa_wait  %p\n", tag);
849
850         lwkt_gettoken(&sc->tok);
851         tag->waiting = 1;
852         while (tag->done == 0)
853                 tsleep(tag, 0, "xawait", 0);
854         lwkt_reltoken(&sc->tok);
855         error = tag->status.head.error;
856         tag->waiting = 0;
857         xa_release(tag, 0);
858
859         return error;
860 }
861
/*
 * Mark a tag's transaction as completed: detach it from its kdmsg state,
 * set done, and wake any thread sleeping in xa_wait().  Asynchronous
 * tags are also released back to the softc (xa_release), passing wasbio
 * through so a deferred BIO may be restarted on the freed tag.
 *
 * The caller must already have disposed of any associated BIO (asserted).
 */
static void
xa_done(xa_tag_t *tag, int wasbio)
{
	KKASSERT(tag->bio == NULL);

	tag->state = NULL;
	/* done must be set before the wakeup so xa_wait()'s loop exits */
	tag->done = 1;
	if (tag->waiting)
		wakeup(tag);
	if (tag->async)
		xa_release(tag, wasbio);
}
874
/*
 * Return a tag to the softc.  If wasbio is set and BIOs are queued,
 * immediately reuse the tag to start the next deferred BIO instead of
 * freeing it; otherwise move the tag from the pending queue back to
 * the free queue.
 */
static
void
xa_release(xa_tag_t *tag, int wasbio)
{
	xa_softc_t *sc = tag->sc;
	struct bio *bio;

	lwkt_gettoken(&sc->tok);
	if (wasbio && (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
		TAILQ_REMOVE(&sc->bioq, bio, bio_act);
		tag->bio = bio;
		/* drop the token before dispatching the new command */
		lwkt_reltoken(&sc->tok);
		xa_start(tag, NULL, 1);
	} else {
		TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
		TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
		lwkt_reltoken(&sc->tok);
	}
}
894
895 /*
896  * Handle messages under the BLKOPEN transaction.
897  */
898 static int
899 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
900 {
901         xa_tag_t *tag = state->any.any;
902         xa_softc_t *sc;
903         struct bio *bio;
904
905         /*
906          * If the tag has been cleaned out we already closed our side
907          * of the transaction and we are waiting for the other side to
908          * close.
909          */
910         if (tag == NULL) {
911                 if (msg->any.head.cmd & DMSGF_CREATE)
912                         kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
913                 return 0;
914         }
915         sc = tag->sc;
916
917         /*
918          * Validate the tag
919          */
920         lwkt_gettoken(&sc->tok);
921
922         /*
923          * Handle initial response to our open and restart any deferred
924          * BIOs on success.
925          *
926          * NOTE: DELETE may also be set.
927          */
928         if (msg->any.head.cmd & DMSGF_CREATE) {
929                 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
930                 case DMSG_LNK_ERROR | DMSGF_REPLY:
931                         bzero(&tag->status, sizeof(tag->status));
932                         tag->status.head = msg->any.head;
933                         break;
934                 case DMSG_BLK_ERROR | DMSGF_REPLY:
935                         tag->status = msg->any.blk_error;
936                         break;
937                 }
938                 sc->last_error = tag->status.head.error;
939                 kprintf("xdisk: blk_open completion status %d\n",
940                         sc->last_error);
941                 if (sc->last_error == 0) {
942                         while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
943                                 tag = xa_setup_cmd(sc, NULL);
944                                 if (tag == NULL)
945                                         break;
946                                 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
947                                 tag->bio = bio;
948                                 xa_start(tag, NULL, 1);
949                         }
950                 }
951                 sc->serializing = 0;
952                 wakeup(sc);
953         }
954
955         /*
956          * Handle unexpected termination (or lost comm channel) from other
957          * side.  Autonomous completion only if open_tag matches,
958          * otherwise another thread is probably waiting on the tag.
959          *
960          * (see xa_close() for other interactions)
961          */
962         if (msg->any.head.cmd & DMSGF_DELETE) {
963                 kdmsg_state_reply(tag->state, 0);
964                 if (sc->open_tag == tag) {
965                         sc->open_tag = NULL;
966                         xa_done(tag, 0);
967                 } else {
968                         tag->async = 0;
969                         xa_done(tag, 0);
970                 }
971         }
972         lwkt_reltoken(&sc->tok);
973         return (0);
974 }
975
/*
 * Completion callback for per-BIO transactions (BLK_READ/WRITE/FLUSH/
 * FREEBLKS issued by xa_start()).  Copies returned data and status
 * into the BIO, completes it, and terminates/retires the tag, or
 * re-queues the BIO if the failure looks like a connectivity problem.
 */
static int
xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
{
	xa_tag_t *tag = state->any.any;
	xa_softc_t *sc = tag->sc;
	struct bio *bio;
	struct buf *bp;

	/*
	 * Get the bio from the tag.  If no bio is present we just do
	 * 'done' handling.
	 */
	if ((bio = tag->bio) == NULL)
		goto handle_done;
	bp = bio->bio_buf;

	/*
	 * Process return status
	 */
	switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
	case DMSG_LNK_ERROR | DMSGF_REPLY:
		/*
		 * Link-level reply carries no blk payload; synthesize a
		 * status from the message header.  On error assume no
		 * data was transferred.
		 */
		bzero(&tag->status, sizeof(tag->status));
		tag->status.head = msg->any.head;
		if (tag->status.head.error)
			tag->status.resid = bp->b_bcount;
		else
			tag->status.resid = 0;
		break;
	case DMSG_BLK_ERROR | DMSGF_REPLY:
		tag->status = msg->any.blk_error;
		break;
	}

	/*
	 * If the device is open stall the bio on DMSG errors.  If an
	 * actual I/O error occurred on the remote device, DMSG_ERR_IO
	 * will be returned.
	 */
	if (tag->status.head.error &&
	    (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
		if (tag->status.head.error != DMSG_ERR_IO)
			goto handle_repend;
	}

	/*
	 * Process bio completion
	 *
	 * For reads any returned data is zero-extended if necessary, so
	 * the server can short-cut any all-zeros reads if it desires.
	 */
	switch(bp->b_cmd) {
	case BUF_CMD_READ:
		if (msg->aux_data && msg->aux_size) {
			if (msg->aux_size < bp->b_bcount) {
				/* short payload: zero-extend the remainder */
				bcopy(msg->aux_data, bp->b_data, msg->aux_size);
				bzero(bp->b_data + msg->aux_size,
				      bp->b_bcount - msg->aux_size);
			} else {
				bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
			}
		} else {
			/* no payload at all == an all-zeros read */
			bzero(bp->b_data, bp->b_bcount);
		}
		/* fall through */
	case BUF_CMD_WRITE:
	case BUF_CMD_FLUSH:
	case BUF_CMD_FREEBLKS:
	default:
		/* clamp the reported residual to the buffer size */
		if (tag->status.resid > bp->b_bcount)
			tag->status.resid = bp->b_bcount;
		bp->b_resid = tag->status.resid;
		if (tag->status.head.error != 0) {
			bp->b_error = EIO;
			bp->b_flags |= B_ERROR;
		} else {
			bp->b_resid = 0;
		}
		biodone(bio);
		tag->bio = NULL;
		break;
	}

	/*
	 * Handle completion of the transaction.  If the bioq is not empty
	 * we can initiate another bio on the same tag.
	 *
	 * NOTE: Most of our transactions will be single-message
	 *       CREATE+DELETEs, so we won't have to terminate the
	 *       transaction separately, here.  But just in case they
	 *       aren't, be sure to terminate the transaction.
	 */
handle_done:
	if (msg->any.head.cmd & DMSGF_DELETE) {
		xa_done(tag, 1);
		if ((state->txcmd & DMSGF_DELETE) == 0)
			kdmsg_msg_reply(msg, 0);
	}
	return (0);

	/*
	 * Handle the case where the transaction failed due to a
	 * connectivity issue.  The tag is put away with wasbio=0
	 * and we put the BIO back onto the bioq for a later restart.
	 */
handle_repend:
	lwkt_gettoken(&sc->tok);
	kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
	tag->bio = NULL;
	xa_done(tag, 0);
	if ((state->txcmd & DMSGF_DELETE) == 0)
		kdmsg_msg_reply(msg, 0);

	/*
	 * Requeue the bio
	 */
	TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);

	lwkt_reltoken(&sc->tok);
	return (0);
}
1096
1097 /*
1098  * Restart as much deferred I/O as we can.  The serializer is set and we
1099  * eat it (clear it) when done.
1100  *
1101  * Called with sc->tok held
1102  */
1103 static
1104 void
1105 xa_restart_deferred(xa_softc_t *sc)
1106 {
1107         kdmsg_state_t *span;
1108         kdmsg_msg_t *msg;
1109         xa_tag_t *tag;
1110         int error;
1111
1112         KKASSERT(sc->serializing);
1113
1114         /*
1115          * Determine if a restart is needed.
1116          */
1117         if (sc->opencnt == 0) {
1118                 /*
1119                  * Device is not open, nothing to do, eat serializing.
1120                  */
1121                 sc->serializing = 0;
1122                 wakeup(sc);
1123         } else if (sc->open_tag == NULL) {
1124                 /*
1125                  * BLK_OPEN required before we can restart any BIOs.
1126                  * Select the best LNK_SPAN to issue the BLK_OPEN under.
1127                  *
1128                  * serializing interlocks waiting open()s.
1129                  */
1130                 error = 0;
1131                 TAILQ_FOREACH(span, &sc->spanq, user_entry) {
1132                         if ((span->rxcmd & DMSGF_DELETE) == 0)
1133                                 break;
1134                 }
1135                 if (span == NULL)
1136                         error = ENXIO;
1137
1138                 if (error == 0) {
1139                         tag = xa_setup_cmd(sc, NULL);
1140                         if (tag == NULL)
1141                                 error = ENXIO;
1142                 }
1143                 if (error == 0) {
1144                         kprintf("xdisk: BLK_OPEN\n");
1145                         sc->open_tag = tag;
1146                         msg = kdmsg_msg_alloc(span,
1147                                               DMSG_BLK_OPEN |
1148                                               DMSGF_CREATE,
1149                                               xa_sync_completion, tag);
1150                         msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
1151                         xa_start(tag, msg, 0);
1152                 }
1153                 if (error) {
1154                         sc->serializing = 0;
1155                         wakeup(sc);
1156                 }
1157                 /* else leave serializing set until BLK_OPEN response */
1158         } else {
1159                 /* nothing to do */
1160                 sc->serializing = 0;
1161                 wakeup(sc);
1162         }
1163 }