xdisk - Flesh out support
[dragonfly.git] / sys/dev/disk/xdisk/xdisk.c
1 /*
2  * Copyright (c) 2012-2014 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
41  * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device; issue ioctl()s against it to create the
45  * /dev/xa%d devices.  These devices look like raw disks to the system.
46  */
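/*
 * Hedged userland sketch (illustration only, not part of this module):
 * attach a communications descriptor via the control device.  Only the
 * XDISKIOCATTACH command and the 'fd' member of struct xdisk_attach_ioctl
 * are taken from this file; any additional members the ioctl expects
 * would have to be filled in from <sys/xdiskioctl.h>.
 */
#if 0
#include <sys/ioctl.h>
#include <sys/xdiskioctl.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <err.h>

int
main(int argc, char **argv)
{
	struct xdisk_attach_ioctl xaioc;
	int ctlfd;
	int commfd;

	if (argc != 2)
		errx(1, "usage: xattach <descriptor-number>");

	/* descriptor already speaking the DMSG protocol (socket or pipe) */
	commfd = atoi(argv[1]);

	ctlfd = open("/dev/xdisk", O_RDWR);
	if (ctlfd < 0)
		err(1, "open(/dev/xdisk)");

	memset(&xaioc, 0, sizeof(xaioc));
	xaioc.fd = commfd;
	if (ioctl(ctlfd, XDISKIOCATTACH, &xaioc) < 0)
		err(1, "XDISKIOCATTACH");
	return 0;
}
#endif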
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/buf.h>
50 #include <sys/conf.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
53 #include <sys/disk.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/tree.h>
60 #include <sys/udev.h>
61 #include <sys/uuid.h>
62 #include <sys/kern_syscall.h>
63
64 #include <sys/dmsg.h>
65 #include <sys/xdiskioctl.h>
66
67 #include <sys/buf2.h>
68 #include <sys/thread2.h>
69
70 struct xa_softc;
71 struct xa_softc_tree;
72 RB_HEAD(xa_softc_tree, xa_softc);
73 RB_PROTOTYPE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
74
75 static int xa_active;
76 SYSCTL_INT(_debug, OID_AUTO, xa_active, CTLFLAG_RW, &xa_active, 0,
77            "Number of active xdisk IOs");
78 static uint64_t xa_last;
79 SYSCTL_ULONG(_debug, OID_AUTO, xa_last, CTLFLAG_RW, &xa_last, 0,
80            "Offset of last xdisk IO");
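/*
 * The two counters above surface as debug.xa_active and debug.xa_last.
 * A hedged userland sketch (illustration only) for sampling them:
 */
#if 0
#include <sys/types.h>
#include <sys/sysctl.h>
#include <stdint.h>
#include <stdio.h>

static void
dump_xa_counters(void)
{
	int active;
	uint64_t last;
	size_t len;

	len = sizeof(active);
	if (sysctlbyname("debug.xa_active", &active, &len, NULL, 0) == 0)
		printf("active xdisk IOs: %d\n", active);
	len = sizeof(last);
	if (sysctlbyname("debug.xa_last", &last, &len, NULL, 0) == 0)
		printf("last xdisk IO offset: %ju\n", (uintmax_t)last);
}
#endif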
81
82 /*
83  * Track a BIO tag
84  */
85 struct xa_tag {
86         TAILQ_ENTRY(xa_tag) entry;
87         struct xa_softc *sc;
88         dmsg_blk_error_t status;
89         kdmsg_state_t   *state;
90         struct bio      *bio;
91         int             waiting;
92         int             async;
93         int             done;
94 };
95
96 typedef struct xa_tag   xa_tag_t;
97
98 /*
99  * Track devices.
100  */
101 struct xa_softc {
102         struct kdmsg_state_list spanq;
103         RB_ENTRY(xa_softc) rbnode;
104         cdev_t          dev;
105         struct devstat  stats;
106         struct disk_info info;
107         struct disk     disk;
108         uuid_t          pfs_fsid;
109         int             unit;
110         int             opencnt;
111         int             spancnt;
112         uint64_t        keyid;
113         int             serializing;
114         int             last_error;
115         char            cl_label[64];   /* from LNK_SPAN cl_label (host/dev) */
116         char            fs_label[64];   /* from LNK_SPAN fs_label (serno str) */
117         xa_tag_t        *open_tag;
118         TAILQ_HEAD(, bio) bioq;         /* pending BIOs */
119         TAILQ_HEAD(, xa_tag) tag_freeq; /* available I/O tags */
120         TAILQ_HEAD(, xa_tag) tag_pendq; /* running I/O tags */
121         struct lwkt_token tok;
122 };
123
124 typedef struct xa_softc xa_softc_t;
125
126 struct xa_iocom {
127         TAILQ_ENTRY(xa_iocom) entry;
128         kdmsg_iocom_t   iocom;
129         xa_softc_t      dummysc;
130 };
131
132 typedef struct xa_iocom xa_iocom_t;
133
134 static int xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2);
135 RB_GENERATE(xa_softc_tree, xa_softc, rbnode, xa_softc_cmp);
136 static struct xa_softc_tree xa_device_tree;
137
138 #define MAXTAGS         64      /* no real limit */
139
140 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
141 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
142 static void xaio_exit(kdmsg_iocom_t *iocom);
143 static int xaio_rcvdmsg(kdmsg_msg_t *msg);
144
145 static void xa_terminate_check(struct xa_softc *sc);
146
147 static xa_tag_t *xa_setup_cmd(xa_softc_t *sc, struct bio *bio);
148 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async);
149 static void xa_done(xa_tag_t *tag, int wasbio);
150 static void xa_release(xa_tag_t *tag, int wasbio);
151 static uint32_t xa_wait(xa_tag_t *tag);
152 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
153 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
154 static void xa_restart_deferred(xa_softc_t *sc);
155
156 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
157
158 /*
159  * Control device, issue ioctls to create xa devices.
160  */
161 static d_open_t xdisk_open;
162 static d_close_t xdisk_close;
163 static d_ioctl_t xdisk_ioctl;
164
165 static struct dev_ops xdisk_ops = {
166         { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
167         .d_open =       xdisk_open,
168         .d_close =      xdisk_close,
169         .d_ioctl =      xdisk_ioctl
170 };
171
172 /*
173  * XA disk devices
174  */
175 static d_open_t xa_open;
176 static d_close_t xa_close;
177 static d_ioctl_t xa_ioctl;
178 static d_strategy_t xa_strategy;
179 static d_psize_t xa_size;
180
181 static struct dev_ops xa_ops = {
182         { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
183         .d_open =       xa_open,
184         .d_close =      xa_close,
185         .d_ioctl =      xa_ioctl,
186         .d_read =       physread,
187         .d_write =      physwrite,
188         .d_strategy =   xa_strategy,
189         .d_psize =      xa_size
190 };
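/*
 * Hedged userland sketch (illustration only): once attached, the xa
 * devices behave like raw disks.  The unit number 0 below is an
 * assumption for the example, not something this file guarantees.
 */
#if 0
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <err.h>

static void
read_first_sector(void)
{
	char buf[512];
	int fd;

	fd = open("/dev/xa0", O_RDONLY);
	if (fd < 0)
		err(1, "open(/dev/xa0)");
	if (pread(fd, buf, sizeof(buf), 0) != (ssize_t)sizeof(buf))
		err(1, "pread");
	close(fd);
}
#endif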
191
192 static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
193 static int xdisk_opencount;
194 static cdev_t xdisk_dev;
195 static TAILQ_HEAD(, xa_iocom) xaiocomq;
196
197 /*
198  * Module initialization
199  */
200 static int
201 xdisk_modevent(module_t mod, int type, void *data)
202 {
203         switch (type) {
204         case MOD_LOAD:
205                 TAILQ_INIT(&xaiocomq);
206                 RB_INIT(&xa_device_tree);
207                 xdisk_dev = make_dev(&xdisk_ops, 0,
208                                      UID_ROOT, GID_WHEEL, 0600, "xdisk");
209                 break;
210         case MOD_UNLOAD:
211         case MOD_SHUTDOWN:
212                 if (xdisk_opencount || TAILQ_FIRST(&xaiocomq))
213                         return (EBUSY);
214                 if (xdisk_dev) {
215                         destroy_dev(xdisk_dev);
216                         xdisk_dev = NULL;
217                 }
218                 dev_ops_remove_all(&xdisk_ops);
219                 dev_ops_remove_all(&xa_ops);
220                 break;
221         default:
222                 break;
223         }
224         return 0;
225 }
226
227 DEV_MODULE(xdisk, xdisk_modevent, 0);
228
229 static int
230 xa_softc_cmp(xa_softc_t *sc1, xa_softc_t *sc2)
231 {
232         return(strcmp(sc1->fs_label, sc2->fs_label));
233 }
234
235 /*
236  * Control device
237  */
238 static int
239 xdisk_open(struct dev_open_args *ap)
240 {
241         lwkt_gettoken(&xdisk_token);
242         ++xdisk_opencount;
243         lwkt_reltoken(&xdisk_token);
244         return(0);
245 }
246
247 static int
248 xdisk_close(struct dev_close_args *ap)
249 {
250         lwkt_gettoken(&xdisk_token);
251         --xdisk_opencount;
252         lwkt_reltoken(&xdisk_token);
253         return(0);
254 }
255
256 static int
257 xdisk_ioctl(struct dev_ioctl_args *ap)
258 {
259         int error;
260
261         switch(ap->a_cmd) {
262         case XDISKIOCATTACH:
263                 error = xdisk_attach((void *)ap->a_data);
264                 break;
265         case XDISKIOCDETACH:
266                 error = xdisk_detach((void *)ap->a_data);
267                 break;
268         default:
269                 error = ENOTTY;
270                 break;
271         }
272         return error;
273 }
274
275 /************************************************************************
276  *                              DMSG INTERFACE                          *
277  ************************************************************************/
278
279 static int
280 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
281 {
282         xa_iocom_t *xaio;
283         struct file *fp;
284
285         /*
286          * Normalize ioctl params
287          */
288         kprintf("xdisk_attach1\n");
289         fp = holdfp(curproc->p_fd, xaioc->fd, -1);
290         if (fp == NULL)
291                 return EINVAL;
292         kprintf("xdisk_attach2\n");
293
294         /*
295          * See if the serial number is already present.  If we are
296  * racing a termination, the disk subsystem may still have
297  * duplicate entries not yet removed, so we wait a bit and
298          * retry.
299          */
300         lwkt_gettoken(&xdisk_token);
301
302         xaio = kmalloc(sizeof(*xaio), M_XDISK, M_WAITOK | M_ZERO);
303         kprintf("xdisk_attach3\n");
304         kdmsg_iocom_init(&xaio->iocom, xaio,
305                          KDMSG_IOCOMF_AUTOCONN,
306                          M_XDISK, xaio_rcvdmsg);
307         xaio->iocom.exit_func = xaio_exit;
308
309         kdmsg_iocom_reconnect(&xaio->iocom, fp, "xdisk");
310
311         /*
312          * Setup our LNK_CONN advertisement for autoinitiate.
313          *
314  * Our filter is set up to accept only PEER_BLOCK/SERVER
315          * advertisements.
316          *
317          * We need a unique pfs_fsid to avoid confusion.
318          */
319         xaio->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
320         xaio->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
321         xaio->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
322         xaio->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
323         xaio->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
324         ksnprintf(xaio->iocom.auto_lnk_conn.fs_label,
325                   sizeof(xaio->iocom.auto_lnk_conn.fs_label),
326                   "xdisk");
327         kern_uuidgen(&xaio->iocom.auto_lnk_conn.pfs_fsid, 1);
328
329         /*
330          * Setup our LNK_SPAN advertisement for autoinitiate
331          */
332         TAILQ_INSERT_TAIL(&xaiocomq, xaio, entry);
333         kdmsg_iocom_autoinitiate(&xaio->iocom, NULL);
334         lwkt_reltoken(&xdisk_token);
335
336         return 0;
337 }
338
339 static int
340 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
341 {
342         return EINVAL;
343 }
344
345 /*
346  * Called from iocom core transmit thread upon disconnect.
347  */
348 static
349 void
350 xaio_exit(kdmsg_iocom_t *iocom)
351 {
352         xa_iocom_t *xaio = iocom->handle;
353
354         kprintf("xdisk_detach -xaio_exit\n");
355         lwkt_gettoken(&xdisk_token);
356         TAILQ_REMOVE(&xaiocomq, xaio, entry);
357         lwkt_reltoken(&xdisk_token);
358
359         kdmsg_iocom_uninit(&xaio->iocom);
360
361         kfree(xaio, M_XDISK);
362 }
363
364 /*
365  * Called from iocom core to handle messages that the iocom core does not
366  * handle itself and for which a state function callback has not yet been
367  * established.
368  *
369  * We primarily care about LNK_SPAN transactions here.
370  */
371 static int
372 xaio_rcvdmsg(kdmsg_msg_t *msg)
373 {
374         kdmsg_state_t   *state = msg->state;
375         xa_iocom_t      *xaio = state->iocom->handle;
376         xa_softc_t      *sc;
377
378         kprintf("xdisk_rcvdmsg %08x (state cmd %08x)\n",
379                 msg->any.head.cmd, msg->tcmd);
380         lwkt_gettoken(&xdisk_token);
381
382         switch(msg->tcmd) {
383         case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
384                 /*
385                  * A LNK_SPAN transaction which is opened and closed
386                  * degenerately is not useful to us, just ignore it.
387                  */
388                 kdmsg_msg_reply(msg, 0);
389                 break;
390         case DMSG_LNK_SPAN | DMSGF_CREATE:
391                 /*
392                  * Manage the tracking node for the remote LNK_SPAN.
393                  *
394                  * Return a streaming result, leaving the transaction open
395                  * in both directions to allow sub-transactions.
396                  */
397                 bcopy(msg->any.lnk_span.cl_label, xaio->dummysc.cl_label,
398                       sizeof(xaio->dummysc.cl_label));
399                 xaio->dummysc.cl_label[sizeof(xaio->dummysc.cl_label) - 1] = 0;
400
401                 bcopy(msg->any.lnk_span.fs_label, xaio->dummysc.fs_label,
402                       sizeof(xaio->dummysc.fs_label));
403                 xaio->dummysc.fs_label[sizeof(xaio->dummysc.fs_label) - 1] = 0;
404
405                 kprintf("xdisk: %s LNK_SPAN create state=%p ",
406                         msg->any.lnk_span.fs_label,
407                         msg->state);
408
409                 sc = RB_FIND(xa_softc_tree, &xa_device_tree, &xaio->dummysc);
410                 if (sc == NULL) {
411                         xa_softc_t *sctmp;
412                         xa_tag_t *tag;
413                         cdev_t dev;
414                         int unit;
415                         int n;
416
417                         sc = kmalloc(sizeof(*sc), M_XDISK, M_WAITOK | M_ZERO);
418                         kprintf("(not found - create %p)\n", sc);
419                         bcopy(msg->any.lnk_span.cl_label, sc->cl_label,
420                               sizeof(sc->cl_label));
421                         sc->cl_label[sizeof(sc->cl_label) - 1] = 0;
422                         bcopy(msg->any.lnk_span.fs_label, sc->fs_label,
423                               sizeof(sc->fs_label));
424                         sc->fs_label[sizeof(sc->fs_label) - 1] = 0;
425
426                         /* XXX FIXME O(N^2) */
427                         unit = -1;
428                         do {
429                                 ++unit;
430                                 RB_FOREACH(sctmp, xa_softc_tree,
431                                            &xa_device_tree) {
432                                         if (sctmp->unit == unit)
433                                                 break;
434                                 }
435                         } while (sctmp);
436
437                         sc->unit = unit;
438                         sc->serializing = 1;
439                         sc->spancnt = 1;
440                         lwkt_token_init(&sc->tok, "xa");
441                         TAILQ_INIT(&sc->spanq);
442                         TAILQ_INIT(&sc->bioq);
443                         TAILQ_INIT(&sc->tag_freeq);
444                         TAILQ_INIT(&sc->tag_pendq);
445                         RB_INSERT(xa_softc_tree, &xa_device_tree, sc);
446                         TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
447                         msg->state->any.xa_sc = sc;
448
449                         /*
450                          * Setup block device
451                          */
452                         for (n = 0; n < MAXTAGS; ++n) {
453                                 tag = kmalloc(sizeof(*tag),
454                                               M_XDISK, M_WAITOK|M_ZERO);
455                                 tag->sc = sc;
456                                 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
457                         }
458
459                         if (sc->dev == NULL) {
460                                 dev = disk_create(unit, &sc->disk, &xa_ops);
461                                 dev->si_drv1 = sc;
462                                 sc->dev = dev;
463                                 devstat_add_entry(&sc->stats, "xa", unit,
464                                                   DEV_BSIZE,
465                                                   DEVSTAT_NO_ORDERED_TAGS,
466                                                   DEVSTAT_TYPE_DIRECT |
467                                                   DEVSTAT_TYPE_IF_OTHER,
468                                                   DEVSTAT_PRIORITY_OTHER);
469                         }
470
471                         sc->info.d_media_blksize =
472                                 msg->any.lnk_span.media.block.blksize;
473                         if (sc->info.d_media_blksize <= 0)
474                                 sc->info.d_media_blksize = 1;
475                         sc->info.d_media_blocks =
476                                 msg->any.lnk_span.media.block.bytes /
477                                 sc->info.d_media_blksize;
478                         sc->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
479                         sc->info.d_secpertrack = 32;
480                         sc->info.d_nheads = 64;
481                         sc->info.d_secpercyl = sc->info.d_secpertrack *
482                                                sc->info.d_nheads;
483                         sc->info.d_ncylinders = 0;
484                         if (sc->fs_label[0])
485                                 sc->info.d_serialno = sc->fs_label;
486                         /*
487                          * WARNING! disk_setdiskinfo() must be asynchronous
488                          *          because we are in the rxmsg thread.  If
489                          *          it is synchronous and issues more disk
490                          *          I/Os, we will deadlock.
491                          */
492                         disk_setdiskinfo(&sc->disk, &sc->info);
493                         xa_restart_deferred(sc);        /* eats serializing */
494                 } else {
495                         kprintf("(found spancnt %d sc=%p)\n", sc->spancnt, sc);
496                         ++sc->spancnt;
497                         TAILQ_INSERT_TAIL(&sc->spanq, msg->state, user_entry);
498                         msg->state->any.xa_sc = sc;
499                         if (sc->serializing == 0 && sc->open_tag == NULL) {
500                                 sc->serializing = 1;
501                                 xa_restart_deferred(sc); /* eats serializing */
502                         }
503                 }
504                 kdmsg_msg_result(msg, 0);
505                 break;
506         case DMSG_LNK_SPAN | DMSGF_DELETE:
507                 /*
508                  * Manage the tracking node for the remote LNK_SPAN.
509                  *
510                  * Return a final result, closing our end of the transaction.
511                  */
512                 sc = msg->state->any.xa_sc;
513                 kprintf("xdisk: %s LNK_SPAN terminate state=%p\n",
514                         sc->fs_label, msg->state);
515                 msg->state->any.xa_sc = NULL;
516                 TAILQ_REMOVE(&sc->spanq, msg->state, user_entry);
517                 --sc->spancnt;
518                 xa_terminate_check(sc);
519                 kdmsg_msg_reply(msg, 0);
520                 break;
521         case DMSG_LNK_SPAN | DMSGF_DELETE | DMSGF_REPLY:
522         case DMSG_LNK_SPAN | DMSGF_REPLY:
523                 /*
524                  * Ignore unimplemented streaming replies on our LNK_SPAN
525                  * transaction.
526                  */
527                 break;
528         case DMSG_DBG_SHELL:
529                 /*
530                  * Execute shell command (not supported atm).
531                  *
532                  * This is a one-way packet but if not (e.g. if part of
533                  * a streaming transaction), we will have already closed
534                  * our end.
535                  */
536                 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
537                 break;
538         case DMSG_DBG_SHELL | DMSGF_REPLY:
539                 /*
540                  * Receive one or more replies to a shell command
541                  * that we sent.  Just dump it to the console.
542                  *
543                  * This is a one-way packet but if not (e.g. if
544                  * part of a streaming transaction), we will have
545                  * already closed our end.
546                  */
547                 if (msg->aux_data) {
548                         msg->aux_data[msg->aux_size - 1] = 0;
549                         kprintf("xdisk: DEBUGMSG: %s\n",
550                                 msg->aux_data);
551                 }
552                 break;
553         default:
554                 /*
555                  * Unsupported one-way message, streaming message, or
556                  * transaction.
557                  *
558                  * Terminate any unsupported transactions with an error
559                  * and ignore any unsupported streaming messages.
560                  *
561                  * NOTE: This case also includes DMSG_LNK_ERROR messages
562                  *       which might be one-way, replying to those would
563                  *       cause an infinite ping-pong.
564                  */
565                 if (msg->any.head.cmd & DMSGF_CREATE)
566                         kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
567                 break;
568         }
569         lwkt_reltoken(&xdisk_token);
570
571         return 0;
572 }
573
574 /*
575  * Determine if we can destroy the xa_softc.
576  *
577  * Called with xdisk_token held.
578  */
579 static
580 void
581 xa_terminate_check(struct xa_softc *sc)
582 {
583         xa_tag_t *tag;
584
585         /*
586          * Determine if we can destroy the softc.
587          */
588         kprintf("xdisk: terminate check xa%d (%d,%d,%d) sc=%p ",
589                 sc->unit,
590                 sc->opencnt, sc->serializing, sc->spancnt,
591                 sc);
592
593         if (sc->opencnt || sc->serializing || sc->spancnt) {
594                 kprintf("(leave intact)\n");
595                 return;
596         }
597         kprintf("(remove from tree)\n");
598         sc->serializing = 1;
599         KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
600
601         RB_REMOVE(xa_softc_tree, &xa_device_tree, sc);
602
603         if (sc->dev) {
604                 disk_destroy(&sc->disk);
605                 devstat_remove_entry(&sc->stats);
606                 sc->dev->si_drv1 = NULL;
607                 sc->dev = NULL;
608         }
609         KKASSERT(sc->opencnt == 0);
610         KKASSERT(TAILQ_EMPTY(&sc->tag_pendq));
611
612         while ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
613                 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
614                 tag->sc = NULL;
615                 kfree(tag, M_XDISK);
616         }
617
618         kfree(sc, M_XDISK);
619 }
620
621 /************************************************************************
622  *                         XA DEVICE INTERFACE                          *
623  ************************************************************************/
624
625 static int
626 xa_open(struct dev_open_args *ap)
627 {
628         cdev_t dev = ap->a_head.a_dev;
629         xa_softc_t *sc;
630         int error;
631
632         dev->si_bsize_phys = 512;
633         dev->si_bsize_best = 32768;
634
635         /*
636          * Interlock open with opencnt, wait for attachment operations
637          * to finish.
638          */
639         lwkt_gettoken(&xdisk_token);
640 again:
641         sc = dev->si_drv1;
642         if (sc == NULL) {
643                 lwkt_reltoken(&xdisk_token);
644                 return ENXIO;   /* raced destruction */
645         }
646         if (sc->serializing) {
647                 tsleep(sc, 0, "xarace", hz / 10);
648                 goto again;
649         }
650         sc->serializing = 1;
651
652         /*
653          * Serialize initial open
654          */
655         if (sc->opencnt++ > 0) {
656                 sc->serializing = 0;
657                 wakeup(sc);
658                 lwkt_reltoken(&xdisk_token);
659                 return(0);
660         }
661         lwkt_reltoken(&xdisk_token);
662
663         /*
664          * Issue BLK_OPEN if necessary.  ENXIO is returned if we have trouble.
665          */
666         if (sc->open_tag == NULL) {
667                 xa_restart_deferred(sc); /* eats serializing */
668         } else {
669                 sc->serializing = 0;
670                 wakeup(sc);
671         }
672
673         /*
674          * Wait for completion of the BLK_OPEN
675          */
676         lwkt_gettoken(&xdisk_token);
677         while (sc->serializing)
678                 tsleep(sc, 0, "xaopen", hz);
679
680         error = sc->last_error;
681         if (error) {
682                 KKASSERT(sc->opencnt > 0);
683                 --sc->opencnt;
684                 xa_terminate_check(sc);
685                 sc = NULL;      /* sc may be invalid now */
686         }
687         lwkt_reltoken(&xdisk_token);
688
689         return (error);
690 }
691
692 static int
693 xa_close(struct dev_close_args *ap)
694 {
695         cdev_t dev = ap->a_head.a_dev;
696         xa_softc_t *sc;
697         xa_tag_t *tag;
698
699         sc = dev->si_drv1;
700         if (sc == NULL)
701                 return ENXIO;   /* raced destruction */
702         lwkt_gettoken(&xdisk_token);
703         lwkt_gettoken(&sc->tok);
704
705         /*
706          * NOTE: Clearing open_tag allows a concurrent open to re-open
707          *       the device and prevents autonomous completion of the tag.
708          */
709         if (sc->opencnt == 1 && sc->open_tag) {
710                 tag = sc->open_tag;
711                 sc->open_tag = NULL;
712                 kdmsg_state_reply(tag->state, 0);       /* close our side */
713                 xa_wait(tag);                           /* wait on remote */
714         }
715         lwkt_reltoken(&sc->tok);
716         KKASSERT(sc->opencnt > 0);
717         --sc->opencnt;
718         xa_terminate_check(sc);
719         lwkt_reltoken(&xdisk_token);
720
721         return(0);
722 }
723
724 static int
725 xa_strategy(struct dev_strategy_args *ap)
726 {
727         xa_softc_t *sc = ap->a_head.a_dev->si_drv1;
728         xa_tag_t *tag;
729         struct bio *bio = ap->a_bio;
730
731         /*
732          * Only BUF_CMD_READ is allowed (for probes) if opencnt is zero.
733          * Otherwise a BLK_OPEN transaction is required.
734          */
735         if (sc->opencnt == 0 && bio->bio_buf->b_cmd != BUF_CMD_READ) {
736                 bio->bio_buf->b_error = ENXIO;
737                 bio->bio_buf->b_flags |= B_ERROR;
738                 biodone(bio);
739                 return(0);
740         }
741         devstat_start_transaction(&sc->stats);
742         atomic_add_int(&xa_active, 1);
743         xa_last = bio->bio_offset;
744
745         lwkt_gettoken(&sc->tok);
746         tag = xa_setup_cmd(sc, bio);
747         if (tag)
748                 xa_start(tag, NULL, 1);
749         lwkt_reltoken(&sc->tok);
750         return(0);
751 }
752
753 static int
754 xa_ioctl(struct dev_ioctl_args *ap)
755 {
756         return(ENOTTY);
757 }
758
759 static int
760 xa_size(struct dev_psize_args *ap)
761 {
762         struct xa_softc *sc;
763
764         if ((sc = ap->a_head.a_dev->si_drv1) == NULL)
765                 return (ENXIO);
766         ap->a_result = sc->info.d_media_blocks;
767         return (0);
768 }
769
770 /************************************************************************
771  *                  XA BLOCK PROTOCOL STATE MACHINE                     *
772  ************************************************************************
773  *
774  * Implement tag/msg setup and related functions.
775  */
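/*
 * Hedged sketch (not called anywhere in the driver) of how the helpers
 * below combine for a synchronous command, mirroring the BUF_CMD_FLUSH
 * message construction in xa_start() and the async=0/xa_wait() pattern
 * used for BLK_OPEN in xa_restart_deferred().  Locking and error
 * propagation are elided; this only illustrates the call sequence.
 */
#if 0
static void
xa_sync_flush_example(xa_softc_t *sc)
{
	xa_tag_t *tag;
	kdmsg_msg_t *msg;

	tag = xa_setup_cmd(sc, NULL);	/* reserve a tag, no bio attached */
	if (tag == NULL)
		return;
	msg = kdmsg_msg_alloc(sc->open_tag->state,
			      DMSG_BLK_FLUSH | DMSGF_CREATE | DMSGF_DELETE,
			      xa_bio_completion, tag);
	msg->any.blk_flush.keyid = sc->keyid;
	msg->any.blk_flush.offset = 0;	/* 0,0 assumed here to mean a */
	msg->any.blk_flush.bytes = 0;	/* full-device flush (illustrative) */
	xa_start(tag, msg, 0);		/* async=0: caller will wait */
	xa_wait(tag);			/* blocks until DELETE, recycles tag */
}
#endif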
776 static xa_tag_t *
777 xa_setup_cmd(xa_softc_t *sc, struct bio *bio)
778 {
779         xa_tag_t *tag;
780
781         /*
782          * Only get a tag if we have a valid virtual circuit to the server.
783          */
784         lwkt_gettoken(&sc->tok);
785         if ((tag = TAILQ_FIRST(&sc->tag_freeq)) != NULL) {
786                 TAILQ_REMOVE(&sc->tag_freeq, tag, entry);
787                 tag->bio = bio;
788                 TAILQ_INSERT_TAIL(&sc->tag_pendq, tag, entry);
789         }
790
791         /*
792          * If we can't dispatch now and this is a bio, queue it for later.
793          */
794         if (tag == NULL && bio) {
795                 TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
796         }
797         lwkt_reltoken(&sc->tok);
798
799         return (tag);
800 }
801
802 static void
803 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg, int async)
804 {
805         xa_softc_t *sc = tag->sc;
806
807         tag->done = 0;
808         tag->async = async;
809
810         if (msg == NULL) {
811                 struct bio *bio;
812                 struct buf *bp;
813
814                 KKASSERT(tag->bio);
815                 bio = tag->bio;
816                 bp = bio->bio_buf;
817
818                 switch(bp->b_cmd) {
819                 case BUF_CMD_READ:
820                         if (sc->opencnt == 0 || sc->open_tag == NULL) {
821                                 kdmsg_state_t *span;
822
823                                 TAILQ_FOREACH(span, &sc->spanq, user_entry) {
824                                         if ((span->rxcmd & DMSGF_DELETE) == 0)
825                                                 break;
826                                 }
827                                 if (span == NULL)
828                                         break;
829                                 msg = kdmsg_msg_alloc(span,
830                                                       DMSG_BLK_READ |
831                                                       DMSGF_CREATE |
832                                                       DMSGF_DELETE,
833                                                       xa_bio_completion, tag);
834                         } else {
835                                 msg = kdmsg_msg_alloc(sc->open_tag->state,
836                                                       DMSG_BLK_READ |
837                                                       DMSGF_CREATE |
838                                                       DMSGF_DELETE,
839                                                       xa_bio_completion, tag);
840                         }
841                         msg->any.blk_read.keyid = sc->keyid;
842                         msg->any.blk_read.offset = bio->bio_offset;
843                         msg->any.blk_read.bytes = bp->b_bcount;
844                         break;
845                 case BUF_CMD_WRITE:
846                         msg = kdmsg_msg_alloc(sc->open_tag->state,
847                                               DMSG_BLK_WRITE |
848                                               DMSGF_CREATE | DMSGF_DELETE,
849                                               xa_bio_completion, tag);
850                         msg->any.blk_write.keyid = sc->keyid;
851                         msg->any.blk_write.offset = bio->bio_offset;
852                         msg->any.blk_write.bytes = bp->b_bcount;
853                         msg->aux_data = bp->b_data;
854                         msg->aux_size = bp->b_bcount;
855                         break;
856                 case BUF_CMD_FLUSH:
857                         msg = kdmsg_msg_alloc(sc->open_tag->state,
858                                               DMSG_BLK_FLUSH |
859                                               DMSGF_CREATE | DMSGF_DELETE,
860                                               xa_bio_completion, tag);
861                         msg->any.blk_flush.keyid = sc->keyid;
862                         msg->any.blk_flush.offset = bio->bio_offset;
863                         msg->any.blk_flush.bytes = bp->b_bcount;
864                         break;
865                 case BUF_CMD_FREEBLKS:
866                         msg = kdmsg_msg_alloc(sc->open_tag->state,
867                                               DMSG_BLK_FREEBLKS |
868                                               DMSGF_CREATE | DMSGF_DELETE,
869                                               xa_bio_completion, tag);
870                         msg->any.blk_freeblks.keyid = sc->keyid;
871                         msg->any.blk_freeblks.offset = bio->bio_offset;
872                         msg->any.blk_freeblks.bytes = bp->b_bcount;
873                         break;
874                 default:
875                         bp->b_flags |= B_ERROR;
876                         bp->b_error = EIO;
877                         devstat_end_transaction_buf(&sc->stats, bp);
878                         atomic_add_int(&xa_active, -1);
879                         biodone(bio);
880                         tag->bio = NULL;
881                         break;
882                 }
883         }
884
885         if (msg) {
886                 tag->state = msg->state;
887                 kdmsg_msg_write(msg);
888         } else {
889                 tag->status.head.error = DMSG_ERR_IO;
890                 xa_done(tag, 1);
891         }
892 }
893
894 static uint32_t
895 xa_wait(xa_tag_t *tag)
896 {
897         xa_softc_t *sc = tag->sc;
898         uint32_t error;
899
900         kprintf("xdisk: xa_wait  %p\n", tag);
901
902         lwkt_gettoken(&sc->tok);
903         tag->waiting = 1;
904         while (tag->done == 0)
905                 tsleep(tag, 0, "xawait", 0);
906         lwkt_reltoken(&sc->tok);
907         error = tag->status.head.error;
908         tag->waiting = 0;
909         xa_release(tag, 0);
910
911         return error;
912 }
913
914 static void
915 xa_done(xa_tag_t *tag, int wasbio)
916 {
917         KKASSERT(tag->bio == NULL);
918
919         tag->state = NULL;
920         tag->done = 1;
921         if (tag->waiting)
922                 wakeup(tag);
923         if (tag->async)
924                 xa_release(tag, wasbio);
925 }
926
927 /*
928  * Release a tag.  If everything looks ok and there are pending BIOs
929  * (due to all tags in-use), we can use the tag to start the next BIO.
930  * Do not try to restart if the connection is currently failed.
931  */
932 static
933 void
934 xa_release(xa_tag_t *tag, int wasbio)
935 {
936         xa_softc_t *sc = tag->sc;
937         struct bio *bio;
938
939         lwkt_gettoken(&sc->tok);
940         if (wasbio && sc->open_tag &&
941             (bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
942                 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
943                 tag->bio = bio;
944                 xa_start(tag, NULL, 1);
945                 lwkt_reltoken(&sc->tok);
946         } else {
947                 TAILQ_REMOVE(&sc->tag_pendq, tag, entry);
948                 TAILQ_INSERT_TAIL(&sc->tag_freeq, tag, entry);
949                 lwkt_reltoken(&sc->tok);
950         }
951 }
952
953 /*
954  * Handle messages under the BLKOPEN transaction.
955  */
956 static int
957 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
958 {
959         xa_tag_t *tag = state->any.any;
960         xa_softc_t *sc;
961         struct bio *bio;
962
963         /*
964          * If the tag has been cleaned out we already closed our side
965          * of the transaction and we are waiting for the other side to
966          * close.
967          */
968         if (tag == NULL) {
969                 if (msg->any.head.cmd & DMSGF_CREATE)
970                         kdmsg_state_reply(state, DMSG_ERR_LOSTLINK);
971                 return 0;
972         }
973         sc = tag->sc;
974
975         /*
976          * Validate the tag
977          */
978         lwkt_gettoken(&sc->tok);
979
980         /*
981          * Handle initial response to our open and restart any deferred
982          * BIOs on success.
983          *
984          * NOTE: DELETE may also be set.
985          */
986         if (msg->any.head.cmd & DMSGF_CREATE) {
987                 switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
988                 case DMSG_LNK_ERROR | DMSGF_REPLY:
989                         bzero(&tag->status, sizeof(tag->status));
990                         tag->status.head = msg->any.head;
991                         break;
992                 case DMSG_BLK_ERROR | DMSGF_REPLY:
993                         tag->status = msg->any.blk_error;
994                         break;
995                 }
996                 sc->last_error = tag->status.head.error;
997                 kprintf("xdisk: blk_open completion status %d\n",
998                         sc->last_error);
999                 if (sc->last_error == 0) {
1000                         while ((bio = TAILQ_FIRST(&sc->bioq)) != NULL) {
1001                                 tag = xa_setup_cmd(sc, NULL);
1002                                 if (tag == NULL)
1003                                         break;
1004                                 TAILQ_REMOVE(&sc->bioq, bio, bio_act);
1005                                 tag->bio = bio;
1006                                 xa_start(tag, NULL, 1);
1007                         }
1008                 }
1009                 sc->serializing = 0;
1010                 wakeup(sc);
1011         }
1012
1013         /*
1014          * Handle unexpected termination (or lost comm channel) from other
1015          * side.  Autonomous completion only if open_tag matches,
1016          * otherwise another thread is probably waiting on the tag.
1017          *
1018          * (see xa_close() for other interactions)
1019          */
1020         if (msg->any.head.cmd & DMSGF_DELETE) {
1021                 kdmsg_state_reply(tag->state, 0);
1022                 if (sc->open_tag == tag) {
1023                         sc->open_tag = NULL;
1024                         xa_done(tag, 0);
1025                 } else {
1026                         tag->async = 0;
1027                         xa_done(tag, 0);
1028                 }
1029         }
1030         lwkt_reltoken(&sc->tok);
1031         return (0);
1032 }
1033
1034 static int
1035 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
1036 {
1037         xa_tag_t *tag = state->any.any;
1038         xa_softc_t *sc = tag->sc;
1039         struct bio *bio;
1040         struct buf *bp;
1041
1042         /*
1043          * Get the bio from the tag.  If no bio is present we just do
1044          * 'done' handling.
1045          */
1046         if ((bio = tag->bio) == NULL)
1047                 goto handle_done;
1048         bp = bio->bio_buf;
1049
1050         /*
1051          * Process return status
1052          */
1053         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1054         case DMSG_LNK_ERROR | DMSGF_REPLY:
1055                 bzero(&tag->status, sizeof(tag->status));
1056                 tag->status.head = msg->any.head;
1057                 if (tag->status.head.error)
1058                         tag->status.resid = bp->b_bcount;
1059                 else
1060                         tag->status.resid = 0;
1061                 break;
1062         case DMSG_BLK_ERROR | DMSGF_REPLY:
1063                 tag->status = msg->any.blk_error;
1064                 break;
1065         }
1066
1067         /*
1068  * If the device is open, stall the bio on DMSG errors.  If an
1069  * actual I/O error occurred on the remote device, DMSG_ERR_IO
1070          * will be returned.
1071          */
1072         if (tag->status.head.error &&
1073             (msg->any.head.cmd & DMSGF_DELETE) && sc->opencnt) {
1074                 if (tag->status.head.error != DMSG_ERR_IO)
1075                         goto handle_repend;
1076         }
1077
1078         /*
1079          * Process bio completion
1080          *
1081          * For reads any returned data is zero-extended if necessary, so
1082          * the server can short-cut any all-zeros reads if it desires.
1083          */
1084         switch(bp->b_cmd) {
1085         case BUF_CMD_READ:
1086                 if (msg->aux_data && msg->aux_size) {
1087                         if (msg->aux_size < bp->b_bcount) {
1088                                 bcopy(msg->aux_data, bp->b_data, msg->aux_size);
1089                                 bzero(bp->b_data + msg->aux_size,
1090                                       bp->b_bcount - msg->aux_size);
1091                         } else {
1092                                 bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
1093                         }
1094                 } else {
1095                         bzero(bp->b_data, bp->b_bcount);
1096                 }
1097                 /* fall through */
1098         case BUF_CMD_WRITE:
1099         case BUF_CMD_FLUSH:
1100         case BUF_CMD_FREEBLKS:
1101         default:
1102                 if (tag->status.resid > bp->b_bcount)
1103                         tag->status.resid = bp->b_bcount;
1104                 bp->b_resid = tag->status.resid;
1105                 if (tag->status.head.error != 0) {
1106                         bp->b_error = EIO;
1107                         bp->b_flags |= B_ERROR;
1108                 } else {
1109                         bp->b_resid = 0;
1110                 }
1111                 devstat_end_transaction_buf(&sc->stats, bp);
1112                 atomic_add_int(&xa_active, -1);
1113                 biodone(bio);
1114                 tag->bio = NULL;
1115                 break;
1116         }
1117
1118         /*
1119          * Handle completion of the transaction.  If the bioq is not empty
1120          * we can initiate another bio on the same tag.
1121          *
1122          * NOTE: Most of our transactions will be single-message
1123          *       CREATE+DELETEs, so we won't have to terminate the
1124  *       transaction separately here.  But just in case they
1125  *       aren't, be sure to terminate the transaction.
1126          */
1127 handle_done:
1128         if (msg->any.head.cmd & DMSGF_DELETE) {
1129                 xa_done(tag, 1);
1130                 if ((state->txcmd & DMSGF_DELETE) == 0)
1131                         kdmsg_msg_reply(msg, 0);
1132         }
1133         return (0);
1134
1135         /*
1136          * Handle the case where the transaction failed due to a
1137          * connectivity issue.  The tag is put away with wasbio=0
1138          * and we put the BIO back onto the bioq for a later restart.
1139          */
1140 handle_repend:
1141         lwkt_gettoken(&sc->tok);
1142         kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
1143         tag->bio = NULL;
1144         xa_done(tag, 0);
1145         if ((state->txcmd & DMSGF_DELETE) == 0)
1146                 kdmsg_msg_reply(msg, 0);
1147
1148         /*
1149          * Requeue the bio
1150          */
1151         TAILQ_INSERT_TAIL(&sc->bioq, bio, bio_act);
1152
1153         lwkt_reltoken(&sc->tok);
1154         return (0);
1155 }
1156
1157 /*
1158  * Restart as much deferred I/O as we can.  The serializer is set and we
1159  * eat it (clear it) when done.
1160  *
1161  * Called with sc->tok held
1162  */
1163 static
1164 void
1165 xa_restart_deferred(xa_softc_t *sc)
1166 {
1167         kdmsg_state_t *span;
1168         kdmsg_msg_t *msg;
1169         xa_tag_t *tag;
1170         int error;
1171
1172         KKASSERT(sc->serializing);
1173
1174         /*
1175          * Determine if a restart is needed.
1176          */
1177         if (sc->opencnt == 0) {
1178                 /*
1179                  * Device is not open, nothing to do, eat serializing.
1180                  */
1181                 sc->serializing = 0;
1182                 wakeup(sc);
1183         } else if (sc->open_tag == NULL) {
1184                 /*
1185                  * BLK_OPEN required before we can restart any BIOs.
1186                  * Select the best LNK_SPAN to issue the BLK_OPEN under.
1187                  *
1188                  * serializing interlocks waiting open()s.
1189                  */
1190                 error = 0;
1191                 TAILQ_FOREACH(span, &sc->spanq, user_entry) {
1192                         if ((span->rxcmd & DMSGF_DELETE) == 0)
1193                                 break;
1194                 }
1195                 if (span == NULL)
1196                         error = ENXIO;
1197
1198                 if (error == 0) {
1199                         tag = xa_setup_cmd(sc, NULL);
1200                         if (tag == NULL)
1201                                 error = ENXIO;
1202                 }
1203                 if (error == 0) {
1204                         kprintf("xdisk: BLK_OPEN\n");
1205                         sc->open_tag = tag;
1206                         msg = kdmsg_msg_alloc(span,
1207                                               DMSG_BLK_OPEN |
1208                                               DMSGF_CREATE,
1209                                               xa_sync_completion, tag);
1210                         msg->any.blk_open.modes = DMSG_BLKOPEN_RD;
1211                         xa_start(tag, msg, 0);
1212                 }
1213                 if (error) {
1214                         sc->serializing = 0;
1215                         wakeup(sc);
1216                 }
1217                 /* else leave serializing set until BLK_OPEN response */
1218         } else {
1219                 /* nothing to do */
1220                 sc->serializing = 0;
1221                 wakeup(sc);
1222         }
1223 }